OUT了吧,Kafka能实现消息延时了
摘要:本文讲述如何在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。
本文分享自华为云社区《Kafka也能实现消息延时了?》,作者:HuaweiCloudDeveloper 。
1、背景
Kafka是一个拥有高吞吐、可持久化、可水平扩展,支持流式数据处理等多种特性的分布式消息流处理中间件,采用分布式消息发布与订阅机制,在日志收集、流式数据传输、在线/离线系统分析、实时监控等领域有广泛的应用,Kafka它虽有以上这么多的应用场景和优点,但也具备其缺陷,比如在延时消息场景下,Kafka就不具备这种能力,因此希望能在保存Kafka特有能力的情况下给Kafka扩充一个具有能处理延时消息场景的能力。
2、开发环境
![](https://pic2.zhimg.com/80/v2-ccb453f95d95282ffe0a84840d122ba1_720w.jpg)
3、云服务介绍
分布式消息服务Kafka版: 华为云分布式消息服务Kafka版是一款基于开源社区版Kafka提供的消息队列服务,向用户提供计算、存储和带宽资源独占式的Kafka专享实例。使用华为云分布式消息服务Kafka版,资源按需申请,按需配置Topic的分区与副本数量,即买即用,您将有更多精力专注于业务快速开发,不用考虑部署和运维。
4、方案设计
i、方案简述
此方案实现,需要借助两个Topic来进行实现,一个Topic用于及时接收生产者们所产生的消息,另一个Topic则用于消费者拉取消息进行消费。另外在这两个Topic之间加上一个队列用于做延时的逻辑判断,如果消息满足了延时的条件,则将队列中的消息生产至我们的消费者需要拉取的Topic中。
ii、方案架构图
Kafka消息延时方案架构图
![](https://pic4.zhimg.com/80/v2-df37df6823e3ffb9d0e22950c98197e3_720w.jpg)
Kafka消息延时实现思路
- 生产者将生产消息存入topic_delay主题中进行存储。
- 将topic_delay主题中的所有消息拉取至ConcurrentLinkedQueue队列中。
- 取值判断是否满足延时要求。
a. 如果满足延时要求,则将消息生产至topic_out主题中,并将queue队列中的值移除。
b. 如果不满足延时要求,则等待自定义时间后重试判断。 - 消费者最终从topic_out主题中拉取消息进行消费。
iii、方案时序图
Kafka消息延时方案时序图
![](https://pic2.zhimg.com/80/v2-06177ccd2b3a2a02f065a8744a46a7e1_720w.jpg)
5、代码参数指南
本项目中起到延时作用的类Delay.java其余类为官方提供用于测试生产和消费消息,如需使用官方测试的使用的生产消费代码相关配置介绍可以参考https://support.huaweicloud.com/devg-kafka/how-to-connect-kafka.html 。 如需使用自己配置的生产者消费者,只配置Delay.java中的参数即可。
Delay.java参数详情
- delay:自定义延时时间,单位ms。
- topic_delay变量:用于临时存储消息的topic名称。
- topic_out变量:用于消费者拉取消息消费的topic名称。
- 关于消费者和生产者配置可按需配置,可参考Kafka官方文档:https://kafka.apache.org/documentation/#producerconfigs
6、代码实现
实现代码可参考Kafka消息延时
package com.dms.delay; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.time.Duration; import java.util.Arrays; import java.util.Date; import java.util.Properties; import java.util.concurrent.ConcurrentLinkedQueue; /** * Hello world! * */ public class Delay { //缓存队列 public static ConcurrentLinkedQueue<ConsumerRecord<String, String>> link = new ConcurrentLinkedQueue(); //延迟时间(20秒),可根据需要设置延迟大小 public static long delay = 20000L; /** *入口 * @param args */ public static void main( String[] args ) { //延时主题(用于控制延时缓冲) String topic_delay = "topic_delay"; //输出主题(直接供消费者消费) String topic_out = "topic_out"; /* 消费线程 */ new Thread(new Runnable() { @Override public void run() { //消费者配置。请根据需要自行设置Kafka配置 Properties props = new Properties(); props.setProperty("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.setProperty("group.id", "test"); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); //指定消费主题 consumer.subscribe(Arrays.asList(topic_delay)); while (true) { //轮询消费 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10)); //遍历当前轮询批次拉取到的消息 for (ConsumerRecord<String, String> record : records){ System.out.println(record); //将消息添加到缓存队列 link.add(record); } } } }).start(); /* 生产线程 */ new Thread(new Runnable() { @Override public void run() { //生产者配置(请根据需求自行配置) Properties props = new Properties(); props.put("bootstrap.servers", "192.168.0.59:9092,192.168.0.185:9092,192.168.0.4:9092"); props.put("linger.ms", 1); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //创建生产者 Producer<String, String> producer = new KafkaProducer<>(props); //持续从缓存队列中获取消息 while(true){ //如果缓存队列为空则放缓取值速度 if(link.isEmpty()){ try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } continue; } //获取缓存队列栈顶消息 ConsumerRecord<String, String> record = link.peek(); //获取该消息时间戳 long timestamp = record.timestamp(); Date now = new Date(); long nowTime = now.getTime(); if(timestamp+ Delay.delay <nowTime){ //获取消息值 String value = record.value(); //生产者发送消息到输出主题 producer.send(new ProducerRecord<String, String>(topic_out, "",value)); //从缓存队列中移除该消息 link.poll(); }else { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }).start(); } }
7、结果反馈
![](https://pic1.zhimg.com/80/v2-a747cd8f6030d51f6cd23c559338c9a0_720w.jpg)
相关文章
- 使用 ASP.NET Core MVC 创建 Web API(五)
- 使用 ASP.NET Core MVC 创建 Web API(四)
- 使用 ASP.NET Core MVC 创建 Web API(三)
- 使用 ASP.NET Core MVC 创建 Web API(二)
- 使用 ASP.NET Core MVC 创建 Web API(一)
- 学习ASP.NET Core Razor 编程系列十九——分页
- 学习ASP.NET Core Razor 编程系列十八——并发解决方案
- 学习ASP.NET Core Razor 编程系列十七——分组
- 学习ASP.NET Core Razor 编程系列十六——排序
- 学习ASP.NET Core Razor 编程系列十五——文件上传功能(三)
- 学习ASP.NET Core Razor 编程系列十四——文件上传功能(二)
- 学习ASP.NET Core Razor 编程系列十三——文件上传功能(一)
- 学习ASP.NET Core Razor 编程系列十二——在页面中增加校验
- 学习ASP.NET Core Razor 编程系列十一——把新字段更新到数据库
- 学习ASP.NET Core Razor 编程系列十——添加新字段
- 学习ASP.NET Core Razor 编程系列九——增加查询功能
- 学习ASP.NET Core Razor 编程系列八——并发处理
- 学习ASP.NET Core Razor 编程系列七——修改列表页面
- 学习ASP.NET Core Razor 编程系列六——数据库初始化
- 学习ASP.NET Core Razor 编程系列五——Asp.Net Core Razor新建模板页面