zl程序教程

您现在的位置是:首页 >  Java

当前栏目

kafka 结合springboot实战--第二节

2023-02-18 16:39:46 时间

生产者事务

Spring-kafka自动注册的KafkaTemplate实例是不具有事务消息发送能力的。需要在 application.properties 配置属性:

spring.kafka.producer.acks=-1
spring.kafka.producer.transaction-id-prefix=kafka_tx

当激活事务时 kafkaTemplate 就只能发送事务消息了,发送非事务的消息会报异常。发送事务消息的方法有两种,一种是通过 kafkaTemplate.executeInTransaction 实现,一种是通过 spring的注解 @Transactional 来实现,代码示例:

    @Scheduled(cron = "*/15 * * * * ?")
    public void sendTrans() {
      kafkaTemplate.executeInTransaction(t ->{
          t.send("xxxxx","test1");
          t.send("xxxxx","test2");
          return true;
      }
          );
    }

    @Scheduled(cron = "*/15 * * * * ?")
    @Transactional(rollbackFor = Exception.class)
    public void sendFoo() {
        kafkaTemplate.send("topic_input", "test");

    }

消费者Ack

消费者消息消息可以自动确认,也可以通过手动确认,开启手动首先需要关闭自动提交,然后设置下consumer的消费模式:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

配置完成之后我们需要对消费者监听器做一点小改动:

    @KafkaListener( topics = "topic_input")
    public void listen(ConsumerRecord<?, String> record, Acknowledgment ack) {
        System.out.println(record.value());
        ack.acknowledge();
    }

如你所见,我们可以通过 Acknowledgment.acknowledge() 来手动的确认消息的消费,不确认就不算消费成功,监听器会再次收到这个消息。对于某些业务场景这个功能还是很必要的,比如消费消息的同时导致写库异常,数据库回滚,那么消息也不应该被ack。

消费者监听器生命周期控制

消费者监听器有三个生命周期:启动、停止、继续;如果我们想控制消费者监听器生命周期,需要修改@KafkaListenerautoStartup 属性为false, 并给监听器 id 属性赋值 然后通过KafkaListenerEndpointRegistry 控制id 对应的监听器的启动停止继续:

import org.springframework.stereotype.Service;
@Service
public class test {
    @Autowired
    KafkaListenerEndpointRegistry listenerRegistry;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(cron = "*/15 * * * * ?")
    @Transactional
    public void testListener(){
        if (i==20){
            listenerRegistry.getListenerContainer("listener1").start();
        }
        System.out.println("生产者生产消息"+i++);
        kafkaTemplate.send("test","xxx"+i);
    }

     @KafkaListener( id = "listener1",topics = "test",autoStartup ="false" )
    public void testStart(ConsumerRecord<?, String> record){
        System.out.println(record.value());
    }


}

通过观察窗口输出就能看到,生产者生产了20条数据后消费者监听器才开始启动消费。