zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

《KAFKA官方文档》第三章:快速入门(二)

Kafka文档官方入门 快速 第三章
2023-09-11 14:16:09 时间
第八步:使用Kafka流(Kafka Streams)处理数据

Kafka流是一个针对存储于Kafka brokers上的数据进行实时流处理和分析的客户端类库。快速入门中的示例将展示如何使用这个类库实现一个数据流处理应用。下面是其中的WordCountDemo数单词示例代码片段(转换成Java8的lambda表达式更便于阅读)。

“` // 字符串和长整型的序列化器与反序列化器(serde) final Serde stringSerde = Serdes.String(); final Serde longSerde = Serdes.Long();

// 从streams-file-input主题构造一个KStream,消息值代表了文本的每一行(这里我们忽略消息key中存储的数据) KStream String, String textLines = builder.stream(stringSerde, stringSerde, “streams-file-input”);

KTable String, Long wordCounts = textLines // 按空格拆分每个文本行为多个单独的词 .flatMapValues(value - Arrays.asList(value.toLowerCase().split(“\W+”)))

// 将单词分组后作为消息的key

.groupBy((key, value) - value)

// 统计每个单词出现的次数(即消息的key)

.count("Counts")

// 将运行结果作为变更记录流发送到输出主题 wordCounts.to(stringSerde, longSerde, “streams-wordcount-output”); “`

上面的代码实现了数单词算法(WordCount algorithm),即计算了输入文本中每一个单词出现的次数。不同于我们常见的计算给定数量文本的数单词算法,这个示例被设计来操作一个无限的、不确定数据量的数据流。跟有界的情况类似,这是一个有状态的算法,会跟踪和更新单词的数目。此算法必须假定输入的数据是没有边界的,这样因为不知道什么时候处理完所有的数据,所以每当处理了新输入的数据时,上述代码会随时输出当前的状态和处理结果。

我们先准备发送到Kafka主题的输入数据,这些数据将被Kafka流程序依次处理。

echo -e “all streams lead to kafka\nhello kafka streams\njoin kafka summit” file-input.txt

在Windows上执行:

echo all streams lead to kafka file-input.txt

echo hello kafka streams file-input.txt

echo|set /p=join kafka summit file-input.txt

接着,我们控制台上使用生产者脚本将这些数据发送到一个叫streams-file-input的主题,此脚本会从标准输入(STDIN)一行行的读取数据,然后把每一行内容作为一个单独的、key为null、值为字符串格式的Kafka消息,发送到这个主题(在实际应用中,只要应用程序一直运行,数据就可以一直持续的流向Kafka):

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic streams-file-input

bin/kafka-console-producer.sh –broker-list localhost:9092 –topic streams-file-input file-input.txt

此时我们可以运行数单词示例程序来处理输入数据:

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

示例程序将从streams-file-input主题读入数据,然后用数单词算法计算每一个消息数据,持续将当前计算结果写入到streams-wordcount-output主题。到目前为止,我们在任何命令行的标准输出上看不到这些结果,因为这些结果数据都被写入到Kafka输出主题streams-wordcount-output了。这个示例程序也将会一直运行,不像常见的流处理程序会在处理完以后自动结束。

现在我们可以通过读取Kafka输出主题来查看数单词示例程序的输出:

bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic streams-wordcount-output –from-beginning –formatter kafka.tools.DefaultMessageFormatter –property print.key=true –property print.value=true –property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer –property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

可以看到在控制台上输出如下:

all 1

lead 1

to 1

hello 1

streams 2

join 1

kafka 3

summit 1

第一列是java.lang.String类型的Kafka消息key,第二列是java.lang.Long类型的消息值。注意到输出的实际上是一个持续更新的流,每一个数据记录(上面输出的每一行)代表一个单词(比如kafka)每次更新的计数。对于同一个key,会有多个记录,每一个后面的记录代表之前记录的更新结果。

下面的两幅图展示了这个场景下到底发生了什么。图上的第一列展示了计算单词出现次数的KTable String, Long 的当前状态演化。第二列展示了状态更新导致的KTable变化记录,也是被发送到Kafka输出主题streams-wordcount-output中的数据。

“all stream lead to kafka”这行文本先被处理。随着每一个新单词作为一个表格中的新项(绿色背景高亮显示)加入到KTable,KTable表格逐渐增长,同时相应的变化记录被发送到下面的KStream中。

当第二行文本“hello kafka streams”被处理后,我们可以看到与此同时在KTable中已经存在的项立即被更新(即kafak和streams)。同样的变化记录也被发送到输出主题。

第三行处理也类似,我们暂且略去。这就解释了为何输出主题中的内容如上所,因为它包含了全部的变化记录。

如果我们跳出这个具体的示例来看,Kafka流所做的事情就是表和变更记录流之间的相互作用(这里表指的是KTable,变更记录流指的是下面的KStream):表中的每一个变化记录会发送到流中,当然如果我们从头至尾的消费一个完整的变更记录流,则可以重建这个表的全部内容。

现在我们可以写入更多消息到stream-file-input主题,然后观察这些消息被添加到streams-wordcount-output主题,这些消息反映出了更新的单词计数(可以在控制台使用上面提及的生产者脚本和消费者脚本来操作)。

最后我们可以在控制台使用Ctrl-C快捷键来结束消费者。

 

 

转载自 并发编程网 - ifeve.com


kafka官方文档学习笔记1--基本概念了解 什么是kafka? kafka是一个分布式流式平台,能够通过订阅-发布模式在应用之间实时传递数据流,同时保证可靠性和容错性;简单理解,就是一个分布式消息队列。 kafka涉及的3基本概念 kafka服务:既可以以单点方式运行,也可以通过多个节点组成集群运行; record:kafka中的每条记录称.
《KAFKA官方文档》第三章:快速入门(一) 快速入门 翻译者:kimmking@163.com 原文:kafka.apache.org/quickstart 本教程假设读者完全从零开始,电脑上没有已经存在的Kafka和Zookeeper环境。
《KAFKA官方文档》设计与实现(二) 5.4 消息格式 * 1. 消息的4字节CRC32 * 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1 * 3. 一个字节的 identifier属性,允许消息的注释与版本无关 * 位 0 ~ 2 : 压缩编解码 * 0 : 无压缩 * 1 :
《KAFKA官方文档》设计与实现(一) 5.设计与实现(IMPLEMENTATION) 5.1 API 设计 生产者 APIS 生产者API包含2个producers-kafka.producer.SyncProducer和kafka.producer.async.AsyncProducer。