Spark修炼之道 (Advanced Series), Spark from Beginner to Expert: Section 16, Spark Streaming and Kafka
Spark Streaming and Kafka WordCount Example (1)
Spark Streaming and Kafka WordCount Example (2)
1. Spark Streaming and Kafka WordCount Example (1)
Start the Kafka broker on each node:

```
root@sparkslave02:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
root@sparkmaster:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-server-start.sh config/server.properties
```
Start a console producer to send messages to the Kafka cluster:
```
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
```
```scala
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{Logging, SparkConf}

object KafkaWordCount {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Checkpointing is required by the stateful reduceByKeyAndWindow below
    ssc.checkpoint("checkpoint")

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Create a ReceiverInputDStream; each record is a (key, message) pair,
    // and we keep only the message
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L))
      .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```
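The reduceByKeyAndWindow call above uses the variant that takes an inverse function: as the 10-minute window slides forward every 2 seconds, Spark adds the counts of the batch entering the window and subtracts the counts of the batch leaving it, instead of re-reducing all ten minutes of data; this is why checkpointing must be enabled, and the final argument, 2, is the number of reduce tasks. The plain-Scala sketch below is our own illustration of that incremental update on ordinary maps (WindowedCountSketch and slide are hypothetical names, not part of the original example):

```scala
object WindowedCountSketch {
  // Incrementally update word counts as one batch leaves the window and
  // one enters, mirroring reduceByKeyAndWindow(_ + _, _ - _, ...) semantics.
  def slide(current: Map[String, Long],
            leaving: Map[String, Long],
            entering: Map[String, Long]): Map[String, Long] = {
    val afterSubtract = leaving.foldLeft(current) { case (acc, (w, n)) =>
      acc.updated(w, acc.getOrElse(w, 0L) - n)
    }
    entering.foldLeft(afterSubtract) { case (acc, (w, n)) =>
      acc.updated(w, acc.getOrElse(w, 0L) + n)
    }.filter(_._2 > 0) // drop words whose count fell to zero
  }

  def main(args: Array[String]): Unit = {
    val window = Map("spark" -> 3L, "test" -> 2L)
    val leaving = Map("test" -> 2L)                   // oldest batch slides out
    val entering = Map("spark" -> 1L, "kafka" -> 1L)  // newest batch slides in
    println(slide(window, leaving, entering))         // Map(spark -> 4, kafka -> 1)
  }
}
```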
Configure the run arguments for KafkaWordCount as follows:
```
sparkmaster:2181 test-consumer-group kafkatopictest 1
```
- sparkmaster:2181 is the ZooKeeper listen address
- test-consumer-group is the consumer group name; it must match the group.id set in $KAFKA_HOME/config/consumer.properties
- kafkatopictest is the topic name
- 1 is the number of consumer threads (see the note after this list)
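One caveat worth noting: in the receiver-based createStream API, the thread count sets the number of consumer threads inside a single receiver; it does not add receivers or increase the parallelism of the resulting RDDs. If one receiver cannot keep up, a common pattern is to create several input DStreams and union them. A minimal sketch, reusing ssc, zkQuorum, group, and topicMap from the example above (numReceivers is our own illustrative name):

```scala
// Sketch: multiple Kafka receivers unioned into one DStream (assumes the
// same ssc, zkQuorum, group, and topicMap as in KafkaWordCount above)
val numReceivers = 2 // illustrative value, not from the original post
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
}
val lines = ssc.union(kafkaStreams)
```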
After starting KafkaWordCount, type the following into the producer console:
```
root@sparkslave01:/hadoopLearning/kafka_2.10-0.8.2.1# bin/kafka-console-producer.sh --broker-list sparkslave01:9092 --sync --topic kafkatopictest
[2015-11-04 03:25:39,666] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Spark Spark TEST TEST Spark Streaming
```
KafkaWordCount then prints the word counts for these messages in its console output.
2. Spark Streaming and Kafka WordCount Example (2)

In the previous example the producer was driven by Kafka's console script; this example shows a producer generated by a program we write ourselves.
```scala
import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

object KafkaWordCountProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }

    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args

    // Kafka broker connection properties (the broker list, not ZooKeeper)
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // Create the KafkaProducer
    val producer = new KafkaProducer[String, String](props)

    // Send messages to the Kafka cluster: messagesPerSec messages every second,
    // each containing wordsPerMessage random single-digit "words"
    while (true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
      Thread.sleep(1000)
    }
  }
}
```
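Note that producer.send is asynchronous: it returns a java.util.concurrent.Future, and the loop above never checks whether the broker actually accepted the record. A hedged variant (our addition, not in the original post) that blocks on the Future so send failures surface immediately:

```scala
// Synchronous send: block until the broker acknowledges the record,
// then report where it landed (assumes the same producer and topic as above)
val record = new ProducerRecord[String, String](topic, null, "test message")
try {
  val metadata = producer.send(record).get() // get() waits for the ack
  println(s"sent to partition ${metadata.partition()} at offset ${metadata.offset()}")
} catch {
  case e: Exception => System.err.println(s"send failed: ${e.getMessage}")
}
```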
Set the run arguments for KafkaWordCountProducer as follows:
```
sparkmaster:9092 kafkatopictest 5 8
```
- sparkmaster:9092 is the broker list
- kafkatopictest is the topic name
- 5 is the number of messages sent per second
- 8 is the number of words in each message

With these arguments the producer emits about 5 × 8 = 40 random single-digit words per second, drawn from only ten distinct keys (0 through 9), so the windowed counts grow quickly.
Run KafkaWordCountProducer first, then run KafkaWordCount; the windowed word counts are printed to the driver console.