Spark 读写 Ceph S3入门学习总结
前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun
前言
首先安装好Ceph,可以参考我上篇文章Ceph分布式集群安装配置
版本
spark: 2.4.5 hadoop: hdp版本 3.1.1.3.1.0.0-78
spark-shell读写S3
jar包配置
hadoop-aws-3.1.1.3.1.0.0-78.jar 注意版本要和hadoop版本对应
aws-java-sdk-s3-1.12.22.jar
aws-java-sdk-core-1.12.22.jar
aws-java-sdk-dynamodb-1.12.22.jar
可能还需要:
hadoop-client-api-3.1.1.3.1.0.0-78.jar
hadoop-client-runtime-3.1.1.3.1.0.0-78.jar
将上面的jar包拷贝到$SPARK_HOME/jars
spark-shell读
s3cmd创建测试文件
创建Bucket
s3cmd mb s3://txt
Bucket 's3://txt/' created
本地生成测试txt
vi test.txt
1
2
3
4
将test.txt上传到s3://txt
s3cmd put test.txt s3://txt
upload: 'test.txt' -> 's3://txt/test.txt' [1 of 1]
8 of 8 100% in 0s 45.82 B/s done
代码
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "iGn9bmqKAArUqiMIohYDmF3WSPi0YAyVO3J9WnxZ")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "10.110.105.162:7480")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
val rdd = spark.read.text("s3a://txt/test.txt")
rdd.count
rdd.foreach(println(_))
结果
scala> rdd.count
res4: Long = 4
scala> rdd.foreach(println(_))
[1]
[2]
[3]
[4]
spark-shell写
s3cmd创建用于写的Bucket
s3cmd mb s3://test-s3-write
Bucket 's3://test-s3-write/' created
代码
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "iGn9bmqKAArUqiMIohYDmF3WSPi0YAyVO3J9WnxZ")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "10.110.105.162:7480")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
import spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2022-09-27")).toDF("id", "name", "value", "ts", "dt")
df.write.mode("overwrite").parquet("s3a://test-s3-write/test_df")
spark.read.parquet("s3a://test-s3-write/test_df").show
验证
我们在上面的代码里已经通过读来验证了一次
spark.read.parquet("s3a://test-s3-write/test_df").show
+---+----+-----+----+----------+
| id|name|value| ts| dt|
+---+----+-----+----+----------+
| 1| a1| 10|1000|2022-09-27|
+---+----+-----+----+----------+
接下来再用s3cmd命令验证对应的s3路径下是否有对应的我们用代码写的parquet文件
s3cmd ls s3://test-s3-write/test_df/
2022-09-28 07:35 0 s3://test-s3-write/test_df/_SUCCESS
2022-09-28 07:35 1222 s3://test-s3-write/test_df/part-00000-f1f23a4e-bb07-424c-853e-59281af2920c-c000.snappy.parquet
IDEA Spark代码读写S3
pom依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.1.1.3.1.0.0-78</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.12.22</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.12.22</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>1.12.22</version>
</dependency>
代码
import org.apache.spark.sql.SparkSession
object SparkS3Demo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().
master("local[*]").
appName("SparkS3Demo").
getOrCreate()
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "iGn9bmqKAArUqiMIohYDmF3WSPi0YAyVO3J9WnxZ")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "10.110.105.162:7480")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
testReadTxt(spark)
testWriteAndReadParquet(spark)
spark.stop()
}
/**
* 测试Spark读S3txt
* 需要先创建Bucket s3cmd mb s3://txt
* 再上传test.txt s3cmd put test.txt s3://txt
* 最后用Spark读
*/
def testReadTxt(spark: SparkSession) = {
val rdd = spark.read.text("s3a://txt/test.txt")
println(rdd.count)
rdd.foreach(println(_))
}
/**
* s3cmd mb s3://test-s3-write
* 需要先创建Bucket
*/
def testWriteAndReadParquet(spark: SparkSession) = {
import spark.implicits._
val df = Seq((1, "a1", 10, 1000, "2022-09-27")).toDF("id", "name", "value", "ts", "dt")
df.write.mode("overwrite").parquet("s3a://test-s3-write/test_df")
spark.read.parquet("s3a://test-s3-write/test_df").show
}
}
完整代码
完整代码已上传到GitHub,有需要的同学可自行下载:https://github.com/dongkelun/S3_Demo
s3,s3a,s3n 的区别
引用自https://m.imooc.com/wenda/detail/606708
- S3本机文件系统(URI方案:s3n)用于在S3上读写常规文件的本机文件系统。该文件系统的优点是您可以访问S3上用其他工具编写的文件。相反,其他工具可以访问使用Hadoop编写的文件。缺点是S3施加的文件大小限制为5GB。
- S3A(URI方案:s3a)S3a:系统是S3本机s3n fs的后继产品,它使用Amazon的库与S3进行交互。这使S3a支持更大的文件(没有更多的5GB限制),更高性能的操作等等。该文件系统旨在替代S3本机/替代S3本机:从s3n:// URL访问的所有对象也应该仅通过替换URL架构就可以从s3a访问。
- S3块文件系统(URI方案:s3)由S3支持的基于块的文件系统。文件存储为块,就像它们在HDFS中一样。这样可以有效地执行重命名。此文件系统要求您为文件系统专用存储桶-您不应使用包含文件的现有存储桶,也不应将其他文件写入同一存储桶。该文件系统存储的文件可以大于5GB,但不能与其他S3工具互操作。
异常解决
缺jar包
- 异常:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found 缺包:hadoop-aws-3.1.1.3.1.0.0-78.jar
- 异常:java.lang.NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException 缺包:aws-java-sdk-s3-1.12.22.jar
- 异常:java.lang.NoClassDefFoundError: com/amazonaws/AmazonServiceException java.lang.NoClassDefFoundError: com/amazonaws/SdkBaseException 缺包:aws-java-sdk-core-1.12.22.jar
- 异常:java.lang.NoClassDefFoundError: com/amazonaws/services/dynamodbv2/model/AmazonDynamoDBException 缺包:aws-java-sdk-dynamodb-1.12.22.jar
- 异常:java.lang.NoClassDefFoundError: org/apache/hadoop/fs/statistics/IOStatisticsSource 缺包:hadoop-client-api-3.1.1.3.1.0.0-78.jar
- 异常:java.lang.NoClassDefFoundError: org/apache/hadoop/thirdparty/com/google/common/base/Preconditions 缺包:hadoop-client-runtime-3.1.1.3.1.0.0-78
Status Code: 403; Error Code: 403 Forbidden
java.nio.file.AccessDeniedException: s3a://txt/test.txt: getFileStatus on s3a://txt/test.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 7WGTHV5104XV9QG1; S3 Extended Request ID: foP4XEGSFN258IhbdV8NolM8Rmn8pESxAIK8LCwxFWxjL3Bd5Cm+kJJSjOODxeQ2cnTnqbxaXjg=; Proxy: null), S3 Extended Request ID: foP4XEGSFN258IhbdV8NolM8Rmn8pESxAIK8LCwxFWxjL3Bd5Cm+kJJSjOODxeQ2cnTnqbxaXjg=:403 Forbidden
... 49 elided
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 7WGTHV5104XV9QG1; S3 Extended Request ID: foP4XEGSFN258IhbdV8NolM8Rmn8pESxAIK8LCwxFWxjL3Bd5Cm+kJJSjOODxeQ2cnTnqbxaXjg=; Proxy: null)
... 66 more
原因是我们的fs.s3a.endpoint设置的值不对,网上很多网站配置的值都是s3.cn-north-1.amazonaws.com.cn
,比如这篇文章:https://blog.csdn.net/zhouyan8603/article/details/77640643,这样作为新手的我很容易被带进坑里出不来~,正确的值应该是ip:7480,设置Nginx代理的8080端口也不行,原因未知,另外在同一个spark-shell里设置了错误的endpoint值,我们再重新设置该值还是会报同样的错误,必须退出spark-shell重新设置才可以。
No AWS Credentials provided by BasicAWSCredentialsProvider
22/09/29 19:22:27 WARN InstanceMetadataServiceResourceFetcher: Fail to retrieve token
com.amazonaws.SdkClientException: Failed to connect to service endpoint:
org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on txt: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: : No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
这是因为我们没有正确的设置fs.s3a.access.key
和fs.s3a.secret.key
,有一些文章上写的key为spark.hadoop.fs.s3a.access.key
和和spark.hadoop.fs.s3a.secret.key
,也就是所有的key都多了前缀spark.hadoop
,经过验证,这样配置是不生效的,以下几种方式都不对:
spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.access.key","XLSMNHY9Z4ML094IBGOY")
spark.conf.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
spark.conf.set("spark.hadoop.fs.s3a.access.key","XLSMNHY9Z4ML094IBGOY")
以上就是我遇到的几个主要的问题及解决思路,当然大家一开始按照我总结的正确的方式是不会有这些问题的。记录在这里主要是为了备忘和为其他一开始没有按照我写的文章进行配置导致出现同样问题的同学提供解决思路,网上资料太少了~
总结
本文主要总结了Spark读写Ceph S3文件的配置和代码示例,以及一些异常的解决方法,希望能对大家有所帮助。
本文由 董可伦 发表于 伦少的博客 ,采用署名-非商业性使用-禁止演绎 3.0进行许可。
非商业转载请注明作者及出处。商业转载请联系作者本人。
相关文章
- Hudi Spark SQL源码学习总结-Create Table
- 3.Python3基础入门学习笔记(三)
- 视觉卷不动了,来看看分子领域?全球首个分子图像自监督学习框架ImageMol来了
- 机器学习之简单线性回归详解程序员
- Spark学习笔记4:数据读取与保存详解大数据
- Spark学习笔记6:Spark调优与调试详解大数据
- Spark源码分析之spark-submit详解大数据
- TensorFlow学习笔记(四)图像识别与卷积神经网络详解大数据
- Spark-Sql源码解析之五 Spark Planner:optimized logical plan –> spark plan详解大数据
- 用Spark轻松写入Redis(spark写入redis)
- 学习笔记:Linux汇编语法入门(linux汇编语法)
- Linux:学习难度是否值得一试?(linux好学么)
- 跟Hoevo缪庆学习VR/AR/MR的空间定位技术 | 雷锋网公开课
- Oracle 学习之旅:收获与心得体会(oracle心得体会)
- Linux学习之路:掌握网盘的技巧(linux教程网盘)
- Spark构建Redis数据按照高效实时处理(spark连接redis)
- 利用Spark加速访问Redis(spark访问redis)
- 激发火花,Spark整合Redis(spark整合redis)
- 基于Spark实现Redis数据库查询(spark查询redis)
- Spark构建实时应用存储分析引擎Redis(spark存储redis)
- 从零开始学习MySQL下载和启动入门教程(mysql下载启动教程)
- 从零学习Redis源码,迈向开发大门的第一步(从零开始学redis源码)
- 解答Oracle书后题实现更透彻的学习(oracle书后题)
- Oracle Les01学习初步,掌握基础知识(oracle les01)
- PrototypeString对象学习