pyspark streaming简介 和 消费 kafka示例
Kafka 示例 简介 消费 Streaming Pyspark
2023-06-13 09:15:32 时间
# 简介
并不是真正的实时处理框架,只是按照时间进行微批处理进行,时间可以设置的尽可能的小。
将不同的额数据源的数据经过SparkStreaming 处理之后将结果输出到外部文件系统
- 特点
低延时 能从错误中搞笑的恢复: fault-tolerant 能够运行在成百上千的节点 能够将批处理、机器学习、图计算等自框架和Spark Streaming 综合起来使用
- 粗粒度
Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理。
- 细粒度
- 数据源 kafka提供了两种数据源。
- 基础数据源,可以直接通过streamingContext API实现。如
文件系统
和socket连接
- 高级的数据源,如Kafka, Flume, Kinesis等等. 可以通过额外的类库去实现。
# 基础数据源
- 使用官方的案例
/spark/examples/src/main/python/streaming
nc -lk 6789
- 处理socket数据
示例代码如下: 读取socket中的数据进行流处理
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# local 必须设为2
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
测试
nc -lk 9999
- 处理文件系统数据
文件系统(fileStream(that is, HDFSM S3, NFS))暂不支持python,python仅支持文本文件(textFileStream)
示例如下,但未成功,找不到该文件。
lines = ssc.textFileStream("hdfs://txz-data0:9820/user/jim/workflow/crash/python/crash_2_hdfs.py")
- streaming context
- DStreams
持续化的数据流 对DStream操作算子, 比如map/flatMap,其实底层会被翻译为对DStream中的每个RDD都做相同的操作,因为一个DStream是由不同批次的RDD所
- Input DStreams and Receivers
# 高级数据源
# Spark Streaming 和 kafka 整合
两种模式
- receiver 模式
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
sc.setLogLevel("OFF")
ssc = StreamingContext(sc, 1)
# 创建Kafka streaming
line = KafkaUtils.createStream(ssc, "192.168.0.208:2181", 'test', {"jim_test": 1})
# 分词
words = line.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
- no receiver
根据上面的代码替换掉createStream即可。
line = KafkaUtils.createDirectStream(ssc, ["jim_test"], {"metadata.broker.list": "192.168.0.208:9092"})
运行:
spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar test_spark_stream.py
需要下载相应的jar包.下载地址如下,搜索。 https://search.maven.org
jar版本会在运行程序时报错提醒。
相关文章
- 盘点一下我用kafka两年以来踩过的一些非比寻常的坑
- kafka学习之Kafka 的简介(一)
- docker部署Kafka_kafka容器部署
- kafka应用场景有哪些_kafka顺序性的消费
- OGG-15051|OGG 同步 Oracle 到 Kafka 时遇到的一个错误
- php 生产kafka 不生效问题
- 微系列:5、在Centos系统中,搭建Kafka集群
- Kafka常见面试题
- 不背锅运维:消息队列概念、kafka入门、Kafka Golang客户端库
- Spring Cloud Stream与Kafka集成示例
- kafka之三 Kafka 高可用详解大数据
- 欢迎购买《Kafka源码解析与实战》详解编程语言
- kafka源码解析之十一KafkaApis详解编程语言
- Oracle 数据流轻松集成 Kafka 服务:提高数据传输效率(oracle到kafka)
- Kafka连接Oracle数据库提高数据处理能力(kafka连oracle)
- 使用Kafka连接Oracle数据库(kafka到oracle)
- Oracle与Kafka新一代数据处理技术(oracle与kafka)
- Oracle与Kafka的联合应用突破传统数据处理极限(oracle与kafka)