【kafka异常】使用Spring-kafka遇到的坑
查看一下压缩策略
bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic SHI_TOPIC1
Topic:SHI_TOPIC1 PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: SHI_TOPIC1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Configs:cleanup.policy=compact
:
然后再检查一下自己发送消息的时候是不是没有传 key
[参考链接](()
问题堆栈信息
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed;
nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument,
the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
问题原因
解决方案
问题堆栈信息
Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE
问题原因
不能再配置中既配置
kafka.consumer.enable-auto-commit=true
自动提交; 然后又在监听器中使用手动提交
例如:
kafka.consumer.enable-auto-commit=true
@Autowired
private ConsumerFactory consumerFactory;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
/**
-
手动ack 提交记录
-
@param data
-
@param ack
-
@throws InterruptedException
*/
@KafkaListener(id = “consumer-id2”,topics = “SHI_TOPIC1”,concurrency = “1”,
clientIdPrefix = “myClientId2”,containerFactory = “kafkaManualAckListenerContainerFactory”)
public void consumer2(String data, Acknowledgment ack) {
log.info(“consumer-id2-手动ack,提交记录,data:{}”,data);
ack.acknowledge();
}
解决方法:
将自动提交关掉,或者去掉手动提交;
如果你想他们都同时存在,某些情况自动提交;某些情况手动提交; 那你创建 一个新的
consumerFactory
将它的是否自动提交设置为false;比如
@Configuration
@EnableKafka
public class KafkaConfig {
@Autowired
private KafkaProperties properties;
/**
-
创建一个新的消费者工厂
-
创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下
-
@return
*/
@Bean
public ConsumerFactory<Object, Object> kafkaConsumerFactory() {
Map<String, Object> map = properties.buildConsumerProperties();
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
return factory;
}
/**
-
创建一个新的消费者工厂
-
但是修改为不自动提交
-
@return
*/
@Bean
public ConsumerFactory<Object, Object> kafkaManualConsumerFactory() {
Map<String, Object> map = properties.buildConsumerProperties();
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
return factory;
}
/**
-
手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)
-
@return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaManualConsumerFactory());
//设置 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
相关文章
- 直接在代码里面对list集合进行分页
- .NET Framework 4.5新特性详解
- 大数据的简要介绍
- 大数据的由来
- 高斯混合模型的自然梯度变量推理
- timing-wheel 仿Kafka实现的时间轮算法
- 使用Navicat软件连接自建数据库(Linux系统)
- 那一天,我被Redis主从架构支配的恐惧
- Redis 深入了解键的过期时间
- C#使用委托调用实现用户端等待闪屏
- 基于流计算 Oceanus 和 Elasticsearch Service 构建百亿级实时监控系统
- GRAND | 转录调控网络预测数据库
- JFreeChart API中文文档
- 临床相关突变查询数据库
- TIGER | 人类胰岛基因变化查询数据库
- 视频边缘计算网关EasyNVR在视频整体监控解决方案中的应用分析
- Apache Arrow - 大数据在数据湖后的下一个风向标
- 常见的电商数据指标体系
- AKShare-艺人数据-艺人流量价值
- MySQL中多表联合查询与子查询的这些区别,你可能不知道!