zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

SpringBoot3集成Kafka优雅实现信息消费发送

2023-04-18 16:20:05 时间

前言

       首先,你的JDK是否已经是8+了呢?
       其次,你是否已经用上SpringBoot3了呢?
       最后,这次分享的是SpringBoot3下的kafka发信息与消费信息。


一、场景说明

       这次的场景是springboot3+多数据源的数据交换中心(数仓)需要消费Kafka里的上游推送信息,这里做数据解析处理入库TDengine。

二、使用步骤

1.引入库

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

       简简单单,就这一个依赖就够了。

2.配置

spring:
  #kafka配置
  kafka:
    #bootstrap-servers: 192.168.200.72:9092,192.168.200.73:9092
    #bootstrap-servers: 192.168.200.83:9092,192.168.200.84:9092
    bootstrap-servers: localhost:9092
    client-id: dc-device-flow-analyze
    consumer:
      group-id: dc-device-flow-analyze-consumer-group
      max-poll-records: 10
      #Kafka中没有初始偏移或如果当前偏移在服务器上不再存在时,默认区最新 ,有三个选项 【latest, earliest, none】
      auto-offset-reset: earliest
      #是否开启自动提交
      enable-auto-commit: false
      #自动提交的时间间隔
      auto-commit-interval: 1000
    producer:
      acks: 1
      batch-size: 4096
      buffer-memory: 40960000
      client-id: dc-device-flow-analyze-producer
      compression-type: zstd
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 3
      properties:
        spring.json.add.type.headers: false
        max.request.size: 126951500
    listener:
      ack-mode: MANUAL_IMMEDIATE
      concurrency: 1  #推荐设置为topic的分区数
      type: BATCH #开启批量监听

#消费topic配置
xiaotian:
  analyze:
    device:
      flow:
        topic:
          consumer: device-flow

3.消费


import com.xiaotian.datagenius.service.DataTransService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * 消费者listener
 *
 * @author zhengwen
 **/
@Slf4j
@Component
public class KafkaListenConsumer {

    @Autowired
    private DataTransService dataTransService;

    /**
     * 设备流水listenner
     *
     * @param records 消费信息
     * @param ack     Ack机制
     */
    @KafkaListener(topics = "${xiaotian.analyze.device.flow.topic.consumer}")
    public void deviceFlowListen(List<ConsumerRecord> records, Acknowledgment ack) {
        log.debug("=====设备流水deviceFlowListen消费者接收信息====");
        try {

            for (ConsumerRecord record : records) {
                log.debug("---开启线程解析设备流水数据:{}", record.toString());
                //具体service里取做逻辑
                dataTransService.deviceFlowTransSave(record);
            }

        } catch (Exception e) {
            log.error("----设备流水数据消费者解析数据异常:{}", e.getMessage(), e);
        } finally {
            //手动提交偏移量
            ack.acknowledge();
        }
    }


}

       消费与SpringBoot2的写法一样,没有任何改变。

4.发布信息


import cn.hutool.json.JSON;
import cn.hutool.json.JSONUtil;
import com.easylinkin.datagenius.core.Result;
import com.easylinkin.datagenius.core.ResultGenerator;
import com.easylinkin.datagenius.vo.KafkaMessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/**
 * kafka信息管理
 *
 * @author zhengwen
 **/
@Slf4j
@RestController
@RequestMapping("/kafka/push")
public class KafkaPushController {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * kafka的信息push发送
     *
     * @param kafkaMessageVo kafka信息对象
     * @return 推送结果
     */
    @PostMapping("/sendMsg")
    public Result sendMsg(@RequestBody KafkaMessageVo kafkaMessageVo) {

        String topic = kafkaMessageVo.getTopic();
        String msg = kafkaMessageVo.getMessage();
        log.debug(msg);
      
        JSON msgJson = JSONUtil.parseObj(msg);
        /* springboot2的写法
        ListenableFuture<SendResult<String, Object>> listenableFuture =  kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

        //发送成功后回调
        SuccessCallback successCallback = new SuccessCallback() {
            @Override
            public void onSuccess(Object result) {
                log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
            }
        };
        //发送失败回调
        FailureCallback failureCallback = new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), ex);
            }
        };
        listenableFuture.addCallback(successCallback, failureCallback);
        */
        //SpringBoot3的写法
        CompletableFuture<SendResult<String, Object>> completableFuture = kafkaTemplate.send(topic, UUID.randomUUID().toString(), msgJson);

        //执行成功回调
        completableFuture.thenAccept(result -> {
            log.debug("发送成功:{}", JSONUtil.toJsonStr(kafkaMessageVo));
        });
        //执行失败回调
        completableFuture.exceptionally(e -> {
            log.error("发送失败", JSONUtil.toJsonStr(kafkaMessageVo), e);
            return null;
        });

        return ResultGenerator.genSuccessResult();
    }
}

       这个发送信息就与springBoot2的写法一致了。原ListenableFuture类已过时了,现在SpringBoot3、JDK8+用CompletableFuture监听信息发送结果。


总结

1、SpringBoot3真香
2、Kafka的集成已经非常成熟了,资料也多。
       我这里这个SpringBoot3集成Kafka发送信息目前觉得是独家,你能找到的应该都还是使用的ListenableFuture类。
       好了,就写到这里,希望能帮到大家,uping!!!