zl程序教程

您现在的位置是:首页 >  其他

当前栏目

Spark报错记录:Overloaded method foreachBatch with alternatives

2023-02-26 09:48:04 时间

Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

目录:0. 写在前面 | 1. 报错 | 2. 代码及报错信息 | 3. 原因及纠错 | 4. 参考链接


Structured Streaming报错记录:Overloaded method foreachBatch with alternatives

0. 写在前面

  • Spark : Spark3.0.0
  • Scala : Scala2.12

1. 报错

overloaded method value foreachBatch with alternatives:

2. 代码及报错信息

Error:(48, 12) overloaded method value foreachBatch with alternatives: (function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and> (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]) .foreachBatch((df, batchId) => {

 import java.util.Properties
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 /**
  * Example that reproduces the "overloaded method value foreachBatch with alternatives"
  * compile error: it reads lines from a socket and, inside `foreachBatch`, computes a
  * per-batch word count that is written to MySQL and to a JSON directory.
  *
  * This object is intentionally left in its failing form.
  */
 object ForeachBatchSink1 {
     /** Builds the local session, the socket stream and the `foreachBatch` sink, then blocks until the query ends. */
     def main(args: Array[String]): Unit = {
         val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink1")
            .getOrCreate()
         // Brings the encoders needed by `.as[String]` / `flatMap` below into scope.
         import spark.implicits._
         
         val lines: DataFrame = spark.readStream
            .format("socket") // set the data source
            .option("host", "cluster01")
            .option("port", 10000)
            .load
         
         // JDBC credentials handed to the MySQL writer.
         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")
         
         val query: StreamingQuery = lines.writeStream
            .outputMode("update")
            // The lambda parameters carry no type annotations and the last expression is
            // `result.unpersist()`; the compiler output quoted with this snippet reports the
            // argument as `(Dataset[Row], Any) => Dataset[Row]`, which fits neither overload.
            .foreachBatch((df, batchId) => {
                 val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
               
                 // Cached because the same result is written to two sinks.
                 result.persist()
               result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
                 result.write.mode("overwrite").json("./foreach1")
                 result.unpersist()
            })
 //           .trigger(Trigger.ProcessingTime(0))
            // NOTE(review): Spark's documentation says foreachBatch relies on micro-batch
            // execution and is not available with a continuous trigger - confirm before reuse.
            .trigger(Trigger.Continuous(10))
            .start
         query.awaitTermination()
       
    }
 }


/**

  • Error:(43, 12) overloaded method value foreachBatch with alternatives:
  • (function:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] <and>
  • (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row]
  • cannot be applied to ((org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], Any) => org.apache.spark.sql.DataFrame)
  • .foreachBatch((df, batchId) => {*/
 import java.util.Properties
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 /**
  * Second example that reproduces the "overloaded method value foreachBatch with
  * alternatives" compile error, this time with output mode "complete".
  *
  * This object is intentionally left in its failing form.
  */
 object ForeachBatchSink {
     /** Builds the local session, the socket stream and the `foreachBatch` sink, then blocks until the query ends. */
     def main(args: Array[String]): Unit = {
         val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("ForeachSink")
            .getOrCreate()
         import spark.implicits._
         
         val lines: DataFrame = spark.readStream
            .format("socket") // set the data source
            .option("host", "cluster01")
            .option("port", 10000)
            .load
         
         // JDBC credentials handed to the MySQL writer.
         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")
         
         val query: StreamingQuery = lines.writeStream
            .outputMode("complete")
            // The lambda parameters carry no type annotations and the last expression is
            // `result.unpersist()`; the compiler output quoted above this snippet reports the
            // argument as `(Dataset[Row], Any) => DataFrame`, which fits neither overload.
            // NOTE(review): `result` is not defined anywhere in this object as shown - a line
            // defining it appears to be missing from the snippet.
            .foreachBatch((df, batchId) => {          
                 result.persist()
                 result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
                 result.write.mode("overwrite").json("./foreach")
                 result.unpersist()
            })
            .start
         query.awaitTermination()
       
    }
 }

3. 原因及纠错

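Scala2.12版本和2.11版本的不同,导致foreachBatch()方法的重载解析结果不一样:Scala 2.12 支持把 lambda 自动转换为 Java 的 SAM 接口,因此 foreachBatch 的两个重载(Scala 函数 `(Dataset[Row], Long) => Unit` 和 Java 的 `VoidFunction2[Dataset[Row], java.lang.Long]`)都会成为候选。当 lambda 的参数没有显式标注类型、且最后一个表达式返回 Dataset 而不是 Unit 时,编译器无法在两个重载之间做出选择,于是报错。

解决办法:为 lambda 参数显式标注类型 `(df: Dataset[Row], batchId: Long)`,并保证函数体的返回值是 Unit(例如把处理逻辑抽取到一个返回 Unit 的方法中)。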

正确代码如下

 import java.util.Properties
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 
 /**
  * Streaming word count whose result table is written to MySQL and to a JSON directory
  * on every micro-batch through `foreachBatch`.
  *
  * The lambda passed to `foreachBatch` annotates both parameter types and delegates to
  * [[myFun]], which returns `Unit`, so the call resolves to the Scala overload
  * `(Dataset[Row], Long) => Unit`.
  */
 object ForeachBatchSink {

     /**
      * Writes one micro-batch to the `wc` MySQL table and to a JSON directory.
      *
      * Empty batches are skipped. The batch is cached while it is written to both sinks
      * and the cache is released even if one of the writes fails.
      *
      * @param df      rows of the current micro-batch
      * @param batchId identifier of the micro-batch, only printed
      * @param props   JDBC connection properties (user / password)
      */
     def myFun(df: Dataset[Row], batchId: Long, props: Properties): Unit = {
         println("BatchId" + batchId)
         // isEmpty only has to find one row; count() would scan the whole batch.
         if (!df.isEmpty) {
             // The batch is written twice, so cache it instead of recomputing it per sink.
             df.persist()
             try {
                 df.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
                 df.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink")
             } finally {
                 // Always release the cached batch, including when a write throws.
                 df.unpersist()
             }
         }
     }

     /** Builds the session and the socket word-count query, then blocks until the query ends. */
     def main(args: Array[String]): Unit = {

         val spark: SparkSession = SparkSession
             .builder()
             .master("local[2]")
             .appName("ForeachBatchSink")
             .getOrCreate()
         // Encoders required by `.as[String]` and `flatMap` below.
         import spark.implicits._

         val lines: DataFrame = spark.readStream
             .format("socket") // TODO set the data source
             .option("host", "cluster01")
             .option("port", 10000)
             .load

         // Resulting columns: value, count
         val wordCount: DataFrame = lines.as[String]
             .flatMap(_.split("\\W+"))
             .groupBy("value")
             .count()

         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")

         val query: StreamingQuery = wordCount.writeStream
             .outputMode("complete")
             // Explicit parameter types plus a Unit-returning body select the Scala overload.
             .foreachBatch((df: Dataset[Row], batchId: Long) => {
                 myFun(df, batchId, props)
             })
             .start

         query.awaitTermination()

     }
 }

 import java.util.Properties
 
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 
 /**
  * Streaming job that reads lines from a socket and, for every micro-batch, computes a
  * word count inside `foreachBatch` and writes it to MySQL and to a JSON directory.
  *
  * The lambda passed to `foreachBatch` annotates both parameter types and delegates to
  * [[myFun]], which returns `Unit`, so the call resolves to the Scala overload
  * `(Dataset[Row], Long) => Unit`.
  */
 object ForeachBatchSink1 {

     /**
      * Counts the words of one micro-batch and writes the counts to the `wc` MySQL table
      * and to a JSON directory.
      *
      * Empty batches are skipped. The counts only cover the rows of this batch, and each
      * write uses mode "overwrite". The cached result is released even if a write fails.
      *
      * @param df      raw lines of the current micro-batch (single string column)
      * @param batchId identifier of the micro-batch, only printed
      * @param props   JDBC connection properties (user / password)
      * @param spark   session whose implicits provide the encoders used here
      */
     def myFun(df: Dataset[Row], batchId: Long, props: Properties, spark : SparkSession): Unit = {
         import spark.implicits._
         println("BatchId = " + batchId)
         // isEmpty only has to find one row; count() would scan the whole batch.
         if (!df.isEmpty) {
             val result = df.as[String].flatMap(_.split("\\W+")).groupBy("value").count()
             // The result is written twice, so cache it instead of recomputing it per sink.
             result.persist()
             try {
                 result.write.mode("overwrite").jdbc("jdbc:mysql://cluster01:3306/test","wc", props)
                 result.write.mode("overwrite").json("./StructedStreaming_sink-ForeachBatchSink1")
             } finally {
                 // Always release the cached result, including when a write throws.
                 result.unpersist()
             }
         }
     }

     /** Builds the session and the socket query, then blocks until the query ends. */
     def main(args: Array[String]): Unit = {

         val spark: SparkSession = SparkSession
             .builder()
             .master("local[2]")
             .appName("ForeachBatchSink1")
             .getOrCreate()

         val lines: DataFrame = spark.readStream
             .format("socket") // TODO set the data source
             .option("host", "cluster01")
             .option("port", 10000)
             .load

         val props = new Properties()
         props.setProperty("user", "root")
         props.setProperty("password", "1234")

         val query: StreamingQuery = lines.writeStream
             .outputMode("update")
             // Explicit parameter types plus a Unit-returning body select the Scala overload.
             .foreachBatch((df: Dataset[Row], batchId: Long) => {
                 myFun(df, batchId, props, spark)
             })
             // foreachBatch depends on micro-batch execution and is rejected with a
             // continuous trigger, so a processing-time (micro-batch) trigger is used.
             .trigger(Trigger.ProcessingTime(0))
             .start
         query.awaitTermination()

     }
 }

4. 参考链接

https://blog.csdn.net/Shockang/article/details/120961968

小手一点关注,文章提前阅读