kafka 结合springboot实战--第二节
SpringBootKafka 实战 -- 结合 第二节
2023-06-13 09:16:10 时间
生产者事务
Spring-kafka自动注册的KafkaTemplate实例是不具有事务消息发送能力的。需要在 application.properties
配置属性:
spring.kafka.producer.acks=-1
spring.kafka.producer.transaction-id-prefix=kafka_tx
当激活事务时 kafkaTemplate 就只能发送事务消息了,发送非事务的消息会报异常。发送事务消息的方法有两种,一种是通过 kafkaTemplate.executeInTransaction 实现,一种是通过 spring的注解 @Transactional
来实现,代码示例:
@Scheduled(cron = "*/15 * * * * ?")
public void sendTrans() {
kafkaTemplate.executeInTransaction(t ->{
t.send("xxxxx","test1");
t.send("xxxxx","test2");
return true;
}
);
}
@Scheduled(cron = "*/15 * * * * ?")
@Transactional(rollbackFor = Exception.class)
public void sendFoo() {
kafkaTemplate.send("topic_input", "test");
}
消费者Ack
消费者消息消息可以自动确认,也可以通过手动确认,开启手动首先需要关闭自动提交,然后设置下consumer的消费模式:
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
配置完成之后我们需要对消费者监听器做一点小改动:
@KafkaListener( topics = "topic_input")
public void listen(ConsumerRecord<?, String> record, Acknowledgment ack) {
System.out.println(record.value());
ack.acknowledge();
}
如你所见,我们可以通过 Acknowledgment.acknowledge()
来手动的确认消息的消费,不确认就不算消费成功,监听器会再次收到这个消息。对于某些业务场景这个功能还是很必要的,比如消费消息的同时导致写库异常,数据库回滚,那么消息也不应该被ack。
消费者监听器生命周期控制
消费者监听器有三个生命周期:启动、停止、继续;如果我们想控制消费者监听器生命周期,需要修改@KafkaListener
的 autoStartup
属性为false, 并给监听器 id 属性赋值 然后通过KafkaListenerEndpointRegistry
控制id 对应的监听器的启动停止继续:
import org.springframework.stereotype.Service;
@Service
public class test {
@Autowired
KafkaListenerEndpointRegistry listenerRegistry;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(cron = "*/15 * * * * ?")
@Transactional
public void testListener(){
if (i==20){
listenerRegistry.getListenerContainer("listener1").start();
}
System.out.println("生产者生产消息"+i++);
kafkaTemplate.send("test","xxx"+i);
}
@KafkaListener( id = "listener1",topics = "test",autoStartup ="false" )
public void testStart(ConsumerRecord<?, String> record){
System.out.println(record.value());
}
}
通过观察窗口输出就能看到,生产者生产了20条数据后消费者监听器才开始启动消费。
相关文章
- springboot启动流程详解_网页解析的详细过程
- springboot事物oracle,SpringBoot 事务管理
- springboot jpa 中使用逻辑删除[通俗易懂]
- springboot 日志跟踪(zipkin)
- 如何在SpringBoot应用中实现跨域访问资源和消息通信?
- SpringBoot整合Dubbo学习总结【概述,快速入门,高级特性,案例所敲代码】
- MQ备份交换机springboot
- SpringBoot事件监听机制及观察者/发布订阅模式详解
- SpringBoot 集成Redis 过程
- SpringBoot之thymeleaf(Q)详解编程语言
- Linux环境下部署Kafka服务器实践(linux kafka)
- 比较Redis vs Kafka(redis还是kafka)