zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

[Spark] 05 - Apache Kafka

2023-09-27 14:23:24 时间

前言


Ref: kafka中文教程

作为消息中间件,其他组件先跟Kafka交流,然后再有Kafka统一跟Hadoop沟通。

 

一、kafka名词解释

producer:生产者,就是它来生产“鸡蛋”的。

consumer:消费者,生出的“鸡蛋”它来消费。

topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。

broker:就是篮子了。


二、与 Storm 比较

除了Kafka Streams,还有Apache Storm和Apache Samza可选择。

Ref: 大牛总结分享:大数据技术Storm 区别 Kafka 哪些场景更适合

可见更多的是“集成合作关系”。

 

三、Zookeeper安装

要与kafka文件夹中自带的zk的版本要一样:https://archive.apache.org/dist/zookeeper/zookeeper-3.4.8/

Ref: ZooKeeper的安装与部署(集群模式)

Ref: How to Setup Apache ZooKeeper Cluster on Ubuntu 18.04 LTS(单机模式下实践没问题)

Ref: 报错:WARN [WorkerSender[myid=1]:QuorumCnxManager@584] - Cannot open channel to 2 at election address /x.x.x.x:3888

ipv6的坑,记得直接关掉就好了。

$ sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1
$ sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1

 

四、Kafka安装

Ref: Spark2.1.0+入门:Apache Kafka作为DStream数据源(Python版) 

Ref: Kafka集群部署 (守护进程启动)

Ref: centos7下kafka集群安装部署

Ref: Zookeeper+Kafka集群部署(已测、可用)

 

 

 

 

高级输入源


Kafka 基本知识

一、Kafka 注册信息

Broker 和 Topic

消息具有类别 (Topic) 属性。一个 topic 的消息可能保存在一个或多个broker上。

分区 (Partition) 是物理上的概念,每个 topic 包含一个或多个 partition。

 

生产者 和 消费者

Producer --> Kafka Broker --> Consumer (Spark Streaming)

每一个消费者只属于某一个组 (Consumer Group),没指定就在默认的组。

 

 

二、ZooKeeper

Kafka的运行依赖于 ZooKeeper,其 "注册信息" 都在其中。

所以,先启动 ZooKeeper,再启动 Kafka。

 

安装Kafka

参考资料:Spark2.1.0+入门:Apache Kafka作为DStream数据源(Python版) 

不同的版本兼容不同的spark,例如:Kafka_2.11 - 0.8.2.2.tgz,2.11是scala版本号。

根据spark配置Kafka,过程在此略,详见 “课时64”。

记得下载对应的jar包以及/usr/local/kafka/libs下的内容,一并拷贝到/usr/local/spark/jars/kafka子目录。

在spark-env.sh设置:

 

启动Kafka

# 打开第一个终端,先启动 zookeeper
$ cd /usr/local/kafka
$ ./bin/zookeeper-server-start.sh  config/zookeeper.properties

# 打开第二个终端,再启动 kafka
$ cd /usr/local/kafka
$ bin/kafka-server-start.sh config/server.properties

# 打开第三个终端
# 创建一个topic:wordsendertest
$ cd /usr/local/kafka
$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
>--replication-factor 1 --partitions 1 --topic wordsendertest

# 列出所有创建的Topic,验证是否创建成功
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181

# 创建生产者给topic扔数据,可以在当前终端输入一些测试文字
./bin/kafka-console-producer.sh --broker-list localhost:9092 \
> --topic wordsendertest

# 打开第四个终端
# 创建消费者接收topic的数据,接收到“以上输入的文字”
$ cd /usr/local/kafka
$ ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
> --topic wordsendertest --from-beginning

 

 

Spark Streaming

将以上 ”第四个终端“ 换为如下自定义的 ”消费者程序“。

localhost:9092 ----> Kafka 作为数据源 ----> localhost:2181

from __future__ import print_function
 
import sys
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
 
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
 
# 准备参数 sc
= SparkContext(appName="PythonStreamingKafkaWordCount") ssc = StreamingContext(sc, 1) zkQuorum, topic = sys.argv[1:]

kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})


# 至此,kafka作为数据源,开始“转换”

lines
= kvs.map(lambda x: x[1]) counts = lines.flatMap(lambda line: line.split(" ")) \ .map(lambda word: (word, 1)) \ .reduceByKey(lambda a, b: a+b) counts.pprint() ssc.start() ssc.awaitTermination()

运行程序:

/usr/local/spark/bin/spark-submit ./KafkaWordCount.py  localhost:2181  wordsendertest

 

  

Structured Streaming

一、准备期间

例子逻辑

生产者每0.1秒生成2个单词并写入此topic。

消费者订阅 wordcount-topic 收到单词,并每隔8秒钟进行一次统计。

统计结果发送给另一个主题:wordcount-result-topic

 

启动 Kafka

# (1) 启动 Zookeeper 服务
bin/zookeeper-server-start.sh config/zookeeper.properties
  
# (2) 启动 Kafka 服务
bin/kafka-server-start.sh config/server.properties
  
# (3) 监督"输入"终端 
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic
 
# (4) 监督"输出"终端
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-result-topic 

 

 

二、编写 "生产者"

# spark_ss_kafka_producer.py

import string
import random
import time
from kafka import KafkaProducer

if __name__ = "__main__":
  # broker服务器的位置9092
  producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

  while True: 
    s2 = (random.choice(string.ascii_lowercase) for _ in range(2))  # 随机生成两个小写字母
    word = ''.join(s2)                # 拼接起来
    value = bytearray(word, 'utf-8')  # 字节序列

    producer.send('wordcount-topic', value=value).get(timeout=10)  # 生产者向该主题发送出去,循环发送
    time.sleep(0.1)

运行代码 

sudo apt-get install pip3
sudo pip3 install kafka-python

python3 spark_ss_kafka_producer.py

 

  

三、编写 "消费者"

从 topic: wordcount-topic 获得消息,然后再往 topic: wordcount-result-topic 中投入消息。

# spark_ss_kafka_consumer.py

from pyspark.sql import SparkSession

if __name__ == "__main__":

  spark = SparkSessoin.builder.appName("StructuredKafkaWordCount").getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", 'wordcount-topic').load().selectExpr("CAST(value AS STRING)")  # 转化为字符串类型

  wordCounts = lines.groupBy("value").count()

运行代码

/usr/local/spark/bin/spark-submit  \
> --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 
> spark_ss_kafka_consumer.py

 

 

 

 

Kafka Cluster


终端版本

一、topic基本操作

创建、显示等。

hadoop@node-master$ bin/kafka-topics.sh --create --zookeeper node-master:2181, node1:2181, node3:2181, node3:2181 --replication-factor 3 --partitions 3 --topic mytopic
Created topic "mytopic".
hadoop@node-master$
bin/kafka-topics.sh --describe --zookeeper node-master:2181, node1:2181, node3:2181, node3:2181 --topic mytopic
Topic:mytopic PartitionCount:3 ReplicationFactor:3 Configs: Topic: mytopic Partition: 0 Leader: none Replicas: 1,0,2 Isr: Topic: mytopic Partition: 1 Leader: none Replicas: 2,1,3 Isr: Topic: mytopic Partition: 2 Leader: none Replicas: 3,2,0 Isr:
hadoop@node-master$
bin/kafka-topics.sh --list --zookeeper node-master:2181, node1:2181, node3:2181, node3:2181
__consumer_offsets mytopic test test_topic topicName

 

二、“生产者” 喂 “消费者”

一个topic相当于建立好的一条虚拟管道。

生产者接连发送“hello" "world"。

hadoop@node-master$ bin/kafka-console-producer.sh --broker-list node-master:9092, node1:9092, node3:9092, node3:9092   -topic mytopic

hello
world

消费者依次收到“hello" "world"。

hadoop@node-master$ bin/kafka-console-consumer.sh --zookeeper node-master:2181, node1:2181, node3:2181, node3:2181 -topic mytopic --from-beginning
Using the ConsoleConsumer with old consumer
is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. hello world

 

三、关闭服务

删除 topic 和关闭服务

bin/kafka-topics.sh --delete --zookeeper node-master:2181, node1:2181, node3:2181, node3:2181 --topic mytopic

启动服务:

bin/kafka-server-start.sh config/server.properties &

停止服务:

bin/kafka-server-stop.sh

至此Zookeeper+Kafka集群配置成功.

 

 

代码版本

一、生产者

node-master作为生产者。然后,kafka cluster中的成员都可以监听。

# spark_ss_kafka_producer.py

import string
import random
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['node-master:9092'])

while True: 
    s2 = (random.choice(string.ascii_lowercase) for _ in range(2))
    word  = ''.join(s2)
    value = bytearray(word, 'utf-8')

    producer.send('wordcount-topic', value=value).get(timeout=10)

    time.sleep(0.1)

 

 

二、终端消费者

通过./kafka-console-consumer.sh通过终端监听内容。

hadoop@node-master$ bin/kafka-console-consumer.sh --zookeeper node-master:2181 -topic wordcount-topic
Using the ConsoleConsumer with old consumer
is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. lf dk uf yn ns ux

 

 

三、代码消费者

通过代码 接收msg,然后“做处理”,再发到另一个topic (wordcount-result-topic),通过以下终端方式打印出结果。

hadoop@node-master$ bin/kafka-console-consumer.sh --zookeeper node-master:2181  -topic wordcount-result-topic
Using the ConsoleConsumer with old consumer
is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. eg:3 pr:3 ei:2 rf:7 ss:7 ct:2 ld:4 mj:2 ey:5 pu:6 pi:2 wg:7
 

Init

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

import datetime
In [2]:
def fnGetAppName():

    currentSecond=datetime.datetime.now().second
    currentMinute=datetime.datetime.now().minute
    currentHour=datetime.datetime.now().hour

    currentDay=datetime.datetime.now().day
    currentMonth=datetime.datetime.now().month
    currentYear=datetime.datetime.now().year
    
    return "{}-{}-{}_{}-{}-{}".format(currentYear, currentMonth, currentDay, currentHour, currentMinute, currentSecond)
In [3]:
def fn_timer(a_func):

    def wrapTheFunction():
        import time
        time_start=time.time()
        
        a_func()
        
        time_end=time.time()
        print('totally cost {} sec'.format(time_end-time_start))
 
    return wrapTheFunction
In [4]:
appName = fnGetAppName()
print("appName: {}".format(appName))

conf = SparkConf().setMaster("spark://node-master:7077").setAppName(appName)
# conf = SparkConf().setMaster("local").setAppName(appName)
 
appName: 2019-11-20_14-16-27
 

Spark Context

In [5]:
sc = SparkContext(conf = conf)
 

Spark Session

In [6]:
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
 

Spark Stream

In [7]:
ssc = StreamingContext(sc, 1)
 

Let's Go

In [8]:
spark = SparkSession.builder.appName(appName).getOrCreate()
In [9]:
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "node-master:9092").option("subscribe", 'wordcount-topic').load().selectExpr("CAST(value AS STRING)")
In [10]:
wordCounts = lines.groupBy("value").count()
In [14]:
query = wordCounts \
    .selectExpr("CAST(value AS STRING) as key", "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") \
    .writeStream \
    .outputMode("complete") \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "node-master:9092") \
    .option("topic", "wordcount-result-topic") \
    .option("checkpointLocation", "file:///tmp/kafka-sink-cp") \
    .trigger(processingTime="9 seconds") \
    .start()
In [12]:
query.awaitTermination()
 
---------------------------------------------------------------------------
会报错,但手动重复执行query即可。
In [ ]:
 

End.