SpringBoot3集成Kafka优雅实现信息消费发送
2023-04-18 16:20:05 时间
前言
首先,你的JDK是否已经是8+了呢?
其次,你是否已经用上SpringBoot3了呢?
最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。
一、场景说明
这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据解析处理入库TDengine。
二、使用步骤
1.引入库
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
简简单单,就这一个依赖就够了。
2.配置
spring:
#kafka配置
kafka:
#bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092
#bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092
bootstrap-servers: localhost:9092
client-id: dc-device-flow-analyze
consumer:
group-id: dc-device-flow-analyze-consumer-group
max-poll-records: 10
#Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
auto-offset-reset: earliest
#是否开启自动提交
enable-auto-commit: false
#自动提交的时间间隔
auto-commit-interval: 1000
producer:
acks: 1
batch-size: 4096
buffer-memory: 40960000
client-id: dc-device-flow-analyze-producer
compression-type: zstd
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 3
properties:
spring.json.add.type.headers: false
max.request.size: 126951500
listener:
ack-mode: MANUAL_IMMEDIATE
concurrency: 1 #推荐设置为topic的分区数
type: BATCH #开启批量监听
#消费topic配置
xiaotian:
analyze:
device:
flow:
topic:
consumer: device-flow
3.消费
import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* 消费者listener
*
* @author zhengwen
**/
@Slf4j
@Component
public class KafkaListenConsumer {
@Autowired
private DataTransService dataTransService;
/**
* 设备流水listenner
*
* @param records 消费信息
* @param ack Ack机制
*/
@KafkaListener(topics = "${xiaotian.analyze.device.flow.topic.consumer}")
public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {
log.debug("=====设备流水deviceFlowListen消费者接收信息====");
try {
for (ConsumerRecord record : records) {
log.debug("---开启线程解析设备流水数据:{}", record.toString());
//具体service里取做逻辑
dataTransService.deviceFlowTransSave(record);
}
} catch (Exception e) {
log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
} finally {
//手动提交偏移量
ack.acknowledge();
}
}
}
消费与SpringBoot2的写法一样,没有任何改变。
4.发布信息
import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.easylinkin.datagenius.core.Result;
import com.easylinkin.datagenius.core.ResultGenerator;
import com.easylinkin.datagenius.vo.KafkaMessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
/**
* kafka信息管理
*
* @author zhengwen
**/
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* kafka的信息push发送
*
* @param kafkaMessageVo kafka信息对象
* @return 推送结果
*/
@PostMapping("/sendMsg")
public Result sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {
String topic = kafkaMessageVo.getTopic();
String msg = kafkaMessageVo.getMessage();
log.debug(msg);
JSON msgJson = JSONUtil.parseObj(msg);
/* springboot2的写法
ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);
//发送成功后回调
SuccessCallback successCallback = new SuccessCallback() {
@Override
public void onSuccess(Object result) {
log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
}
};
//发送失败回调
FailureCallback failureCallback = new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), ex);
}
};
listenableFuture.addCallback(successCallback, failureCallback);
*/
//SpringBoot3的写法
CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);
//执行成功回调
completableFuture.thenAccept(result -> {
log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
});
//执行失败回调
completableFuture.exceptionally(e -> {
log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), e);
return null;
});
return ResultGenerator.genSuccessResult();
}
}
这个发送信息就与springBoot2的写法一致了。原ListenableFuture类已过时了,现在SpringBoot3、JDK8+用CompletableFuture监听信息发送结果。
总结
1、SpringBoot3真香
2、Kafka的集成已经非常成熟了,资料也多。
我这里这个SpringBoot3集成Kafka发送信息目前觉得是独家,你能找到的应该都还是使用的ListenableFuture类。
好了,就写到这里,希望能帮到大家,uping!!!
相关文章
- 直接在代码里面对list集合进行分页
- .NET Framework 4.5新特性详解
- 大数据的简要介绍
- 大数据的由来
- 高斯混合模型的自然梯度变量推理
- timing-wheel 仿Kafka实现的时间轮算法
- 使用Navicat软件连接自建数据库(Linux系统)
- 那一天,我被Redis主从架构支配的恐惧
- Redis 深入了解键的过期时间
- C#使用委托调用实现用户端等待闪屏
- 基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
- GRAND | 转录调控网络预测数据库
- JFreeChart API中文文档
- 临床相关突变查询数据库
- TIGER | 人类胰岛基因变化查询数据库
- 视频边缘计算网关EasyNVR在视频整体监控解决方案中的应用分析
- Apache Arrow - 大数据在数据湖后的下一个风向标
- 常见的电商数据指标体系
- AKShare-艺人数据-艺人流量价值
- MySQL中多表联合查询与子查询的这些区别,你可能不知道!