Spark Streaming: reading from Kafka and writing to Hive tables
2023-09-14 08:57:20
Spark Streaming job:
package hive

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
 * Spark consumes data from multiple Kafka topics and writes each topic's data to a different Hive table.
 */
object SparkToHive {
  def main(args: Array[String]): Unit = {
    // Reduce log noise from Spark, Jetty and the Kafka consumer
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)

    // Hive warehouse location on HDFS (use the URI directly; wrapping it in java.io.File
    // and calling getAbsolutePath would mangle the hdfs:// scheme)
    val warehouseLocation = "hdfs://cluster/hive/warehouse"

    @transient val spark = SparkSession
      .builder()
      .appName("Spark SQL To Hive")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      // The serializer must be set before the SparkContext exists, so configure it
      // on the builder rather than via spark.conf.set afterwards
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()

    @transient val sc = spark.sparkContext
    val scc = new StreamingContext(sc, Seconds(1))

    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest", // latest or earliest
      "value.deserializer" -> classOf[StringDeserializer],
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> "10.200.10.24:6667,10.200.10.26:6667,10.200.10.29:6667",
      "group.id" -> "test_jason",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )

    val topics = Array("test", "test1", "test2")
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val cache_rdd = rdd.map(_.value()).cache()
        // Records for table a
        val a = cache_rdd.filter(_.contains("hello"))
        // Records for table b
        val b = cache_rdd.filter(_.contains("jason"))
        // Printing is enough to verify both streams here; the actual Hive write is omitted,
        // see the other post on writing to Hive for the details.
        a.foreach(println)
        b.foreach(println)
      }
    }

    scc.start()
    scc.awaitTermination()
  }
}
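The code above only prints the two filtered streams and defers the Hive write to another post. As a rough sketch of that missing step (the table names ods.table_a / ods.table_b and their single string column line are assumptions for illustration, not from the original post), each filtered RDD can be turned into a DataFrame and appended to a pre-created Hive table inside foreachRDD:

    // Hedged sketch of the Hive write, replacing the a.foreach(println) / b.foreach(println)
    // lines above. Assumes the target tables already exist, e.g.
    //   CREATE TABLE ods.table_a (line STRING) STORED AS ORC;
    import spark.implicits._

    stream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val cache_rdd = rdd.map(_.value()).cache()

        // Convert each filtered RDD to a single-column DataFrame
        val aDF = cache_rdd.filter(_.contains("hello")).toDF("line")
        val bDF = cache_rdd.filter(_.contains("jason")).toDF("line")

        // Append into the existing Hive tables (requires enableHiveSupport() on the session)
        aDF.write.mode("append").insertInto("ods.table_a")
        bDF.write.mode("append").insertInto("ods.table_b")

        cache_rdd.unpersist()
      }
    }

An alternative is saveAsTable, which creates the table if it does not exist; insertInto keeps full control of the table definition on the Hive side but requires the schema to match the existing table.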