zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Spark 读写 Ceph S3入门学习总结

学习Spark入门 总结 读写 ceph S3
2023-06-13 09:13:44 时间

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住给大家分享一下。点击跳转到网站:https://www.captainai.net/dongkelun

前言

首先安装好Ceph,可以参考我上篇文章Ceph分布式集群安装配置

版本

spark: 2.4.5 hadoop: hdp版本 3.1.1.3.1.0.0-78

spark-shell读写S3

jar包配置

hadoop-aws-3.1.1.3.1.0.0-78.jar 注意版本要和hadoop版本对应
aws-java-sdk-s3-1.12.22.jar
aws-java-sdk-core-1.12.22.jar
aws-java-sdk-dynamodb-1.12.22.jar
可能还需要:
hadoop-client-api-3.1.1.3.1.0.0-78.jar
hadoop-client-runtime-3.1.1.3.1.0.0-78.jar

将上面的jar包拷贝到$SPARK_HOME/jars

spark-shell读

s3cmd创建测试文件

创建Bucket

s3cmd mb s3://txt
Bucket 's3://txt/' created

本地生成测试txt

vi test.txt
1
2
3
4

将test.txt上传到s3://txt

s3cmd put test.txt s3://txt
upload: 'test.txt' -> 's3://txt/test.txt'  [1 of 1]
 8 of 8   100% in    0s    45.82 B/s  done

代码

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "iGn9bmqKAArUqiMIohYDmF3WSPi0YAyVO3J9WnxZ")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "10.110.105.162:7480")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")

val rdd = spark.read.text("s3a://txt/test.txt")
rdd.count
rdd.foreach(println(_))

结果

scala> rdd.count
res4: Long = 4

scala> rdd.foreach(println(_))
[1]
[2]
[3]
[4]

spark-shell写

s3cmd创建用于写的Bucket

s3cmd mb s3://test-s3-write
Bucket 's3://test-s3-write/' created

代码

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "iGn9bmqKAArUqiMIohYDmF3WSPi0YAyVO3J9WnxZ")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "10.110.105.162:7480")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")
import spark.implicits._

val df = Seq((1, "a1", 10, 1000, "2022-09-27")).toDF("id", "name", "value", "ts", "dt")
df.write.mode("overwrite").parquet("s3a://test-s3-write/test_df")

spark.read.parquet("s3a://test-s3-write/test_df").show

验证

我们在上面的代码里已经通过读来验证了一次

spark.read.parquet("s3a://test-s3-write/test_df").show
+---+----+-----+----+----------+
| id|name|value|  ts|        dt|
+---+----+-----+----+----------+
|  1|  a1|   10|1000|2022-09-27|
+---+----+-----+----+----------+

接下来再用s3cmd命令验证对应的s3路径下是否有对应的我们用代码写的parquet文件

s3cmd ls s3://test-s3-write/test_df/
2022-09-28 07:35            0  s3://test-s3-write/test_df/_SUCCESS
2022-09-28 07:35         1222  s3://test-s3-write/test_df/part-00000-f1f23a4e-bb07-424c-853e-59281af2920c-c000.snappy.parquet

IDEA Spark代码读写S3

pom依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.1.1.3.1.0.0-78</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-core</artifactId>
    <version>1.12.22</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.12.22</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-dynamodb</artifactId>
    <version>1.12.22</version>
</dependency>

代码

import org.apache.spark.sql.SparkSession

object SparkS3Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().
      master("local[*]").
      appName("SparkS3Demo").
      getOrCreate()
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "iGn9bmqKAArUqiMIohYDmF3WSPi0YAyVO3J9WnxZ")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "10.110.105.162:7480")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", "false")

    testReadTxt(spark)
    testWriteAndReadParquet(spark)

    spark.stop()
  }

  /**
   * 测试Spark读S3txt
   * 需要先创建Bucket s3cmd mb s3://txt
   * 再上传test.txt s3cmd put test.txt s3://txt
   * 最后用Spark读
   */
  def testReadTxt(spark: SparkSession) = {
    val rdd = spark.read.text("s3a://txt/test.txt")
    println(rdd.count)
    rdd.foreach(println(_))
  }

  /**
   * s3cmd mb s3://test-s3-write
   * 需要先创建Bucket
   */
  def testWriteAndReadParquet(spark: SparkSession) = {
    import spark.implicits._

    val df = Seq((1, "a1", 10, 1000, "2022-09-27")).toDF("id", "name", "value", "ts", "dt")
    df.write.mode("overwrite").parquet("s3a://test-s3-write/test_df")

    spark.read.parquet("s3a://test-s3-write/test_df").show
  }
}

完整代码

完整代码已上传到GitHub,有需要的同学可自行下载:https://github.com/dongkelun/S3_Demo

s3,s3a,s3n 的区别

引用自https://m.imooc.com/wenda/detail/606708

  • S3本机文件系统(URI方案:s3n)用于在S3上读写常规文件的本机文件系统。该文件系统的优点是您可以访问S3上用其他工具编写的文件。相反,其他工具可以访问使用Hadoop编写的文件。缺点是S3施加的文件大小限制为5GB。
  • S3A(URI方案:s3a)S3a:系统是S3本机s3n fs的后继产品,它使用Amazon的库与S3进行交互。这使S3a支持更大的文件(没有更多的5GB限制),更高性能的操作等等。该文件系统旨在替代S3本机/替代S3本机:从s3n:// URL访问的所有对象也应该仅通过替换URL架构就可以从s3a访问。
  • S3块文件系统(URI方案:s3)由S3支持的基于块的文件系统。文件存储为块,就像它们在HDFS中一样。这样可以有效地执行重命名。此文件系统要求您为文件系统专用存储桶-您不应使用包含文件的现有存储桶,也不应将其他文件写入同一存储桶。该文件系统存储的文件可以大于5GB,但不能与其他S3工具互操作。

异常解决

缺jar包

  • 异常:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found 缺包:hadoop-aws-3.1.1.3.1.0.0-78.jar
  • 异常:java.lang.NoClassDefFoundError: com/amazonaws/services/s3/model/MultiObjectDeleteException 缺包:aws-java-sdk-s3-1.12.22.jar
  • 异常:java.lang.NoClassDefFoundError: com/amazonaws/AmazonServiceException java.lang.NoClassDefFoundError: com/amazonaws/SdkBaseException 缺包:aws-java-sdk-core-1.12.22.jar
  • 异常:java.lang.NoClassDefFoundError: com/amazonaws/services/dynamodbv2/model/AmazonDynamoDBException 缺包:aws-java-sdk-dynamodb-1.12.22.jar
  • 异常:java.lang.NoClassDefFoundError: org/apache/hadoop/fs/statistics/IOStatisticsSource 缺包:hadoop-client-api-3.1.1.3.1.0.0-78.jar
  • 异常:java.lang.NoClassDefFoundError: org/apache/hadoop/thirdparty/com/google/common/base/Preconditions 缺包:hadoop-client-runtime-3.1.1.3.1.0.0-78

Status Code: 403; Error Code: 403 Forbidden

java.nio.file.AccessDeniedException: s3a://txt/test.txt: getFileStatus on s3a://txt/test.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 7WGTHV5104XV9QG1; S3 Extended Request ID: foP4XEGSFN258IhbdV8NolM8Rmn8pESxAIK8LCwxFWxjL3Bd5Cm+kJJSjOODxeQ2cnTnqbxaXjg=; Proxy: null), S3 Extended Request ID: foP4XEGSFN258IhbdV8NolM8Rmn8pESxAIK8LCwxFWxjL3Bd5Cm+kJJSjOODxeQ2cnTnqbxaXjg=:403 Forbidden
  ... 49 elided
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 7WGTHV5104XV9QG1; S3 Extended Request ID: foP4XEGSFN258IhbdV8NolM8Rmn8pESxAIK8LCwxFWxjL3Bd5Cm+kJJSjOODxeQ2cnTnqbxaXjg=; Proxy: null)
  ... 66 more

原因是我们的fs.s3a.endpoint设置的值不对,网上很多网站配置的值都是s3.cn-north-1.amazonaws.com.cn,比如这篇文章:https://blog.csdn.net/zhouyan8603/article/details/77640643,这样作为新手的我很容易被带进坑里出不来~,正确的值应该是ip:7480,设置Nginx代理的8080端口也不行,原因未知,另外在同一个spark-shell里设置了错误的endpoint值,我们再重新设置该值还是会报同样的错误,必须退出spark-shell重新设置才可以。

No AWS Credentials provided by BasicAWSCredentialsProvider

22/09/29 19:22:27 WARN InstanceMetadataServiceResourceFetcher: Fail to retrieve token
com.amazonaws.SdkClientException: Failed to connect to service endpoint:
org.apache.hadoop.fs.s3a.AWSClientIOException: doesBucketExist on txt: com.amazonaws.AmazonClientException: No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: : No AWS Credentials provided by BasicAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:

这是因为我们没有正确的设置fs.s3a.access.keyfs.s3a.secret.key,有一些文章上写的key为spark.hadoop.fs.s3a.access.key和和spark.hadoop.fs.s3a.secret.key,也就是所有的key都多了前缀spark.hadoop,经过验证,这样配置是不生效的,以下几种方式都不对:

spark.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.access.key","XLSMNHY9Z4ML094IBGOY")
spark.conf.set("fs.s3a.access.key", "XLSMNHY9Z4ML094IBGOY")
spark.conf.set("spark.hadoop.fs.s3a.access.key","XLSMNHY9Z4ML094IBGOY")

以上就是我遇到的几个主要的问题及解决思路,当然大家一开始按照我总结的正确的方式是不会有这些问题的。记录在这里主要是为了备忘和为其他一开始没有按照我写的文章进行配置导致出现同样问题的同学提供解决思路,网上资料太少了~

总结

本文主要总结了Spark读写Ceph S3文件的配置和代码示例,以及一些异常的解决方法,希望能对大家有所帮助。

本文由 董可伦 发表于 伦少的博客 ,采用署名-非商业性使用-禁止演绎 3.0进行许可。

非商业转载请注明作者及出处。商业转载请联系作者本人。

本文标题:Spark 读写 Ceph S3入门学习总结

本文链接:https://dongkelun.com/2022/09/30/saprkS3/