Commit cannot be completed since the group has already rebalanced and assign
The and Cannot be has group already commit
2023-09-11 14:14:47 时间
报错信息
Commit cannot be completed since the group has already rebalanced and assigned the partitions
如何理解
这里是说提交commit失败, 因为这个组已经重新分配了
产生原因
正常情况下, kafka会有一个配置用于设置一条消息的过期时间, 在规定时间内, 如果消费者提交了消费完成的信息, 那么就可以正常的分配下一条记录给消费者, 并且将当前记录的状态记为"已消费"状态, 对消息队列做一个标识, 避免重复消费
如何解决
kafka中配置的规定返回消息时间, 默认是300s, 也就是5分钟, 但是有一些业务逻辑处理起来比较复杂, 数据量又比较庞大, 那么5分钟是肯定处理不完的, 比如导入一个5G的文件, 然后逐条插入数据库, 这就需要消耗很长时间, 所以需要设置一下kafka的最大间隔时间
在application-dev.yml文件中配置如下
也就是配置
spring:
kafka:
consumer:
properties:
max.poll.interval.ms: 86400000
86400000是一天的毫秒数, 我这个业务需求有一天一夜足矣
至此, 问题完美修复!
其它参考方案
- 调大max.poll.interval.ms(两次poll方法最大时间间隔),默认时间为300000ms
- 调小max.poll.records(一次最多处理的记录数量),默认500
- 启动多个线程并行处理数据,但要注意处理完一批消息后才能提交offset,然后进行下次的poll(会用到CountDownLatch)
修改配置参数,调大间隔,调小一次处理的最大任务数量
props.put("max.poll.records", 8);
props.put("max.poll.interval.ms", "30000");
props.put("session.timeout.ms", "30000");
使用多线程并行处理
@Scheduled(fixedRate = 5000)
public void processing()
{
//如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
//如果队列中有消息,立即消费消息,每次消费的消息的多少
//可以通过max.poll.records配置
ConsumerRecords<String, String> records = consumer.poll(3000);
if (records.count() == 0)
{
return;
}
Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
CountDownLatch countDownLatch = new CountDownLatch(records.count());
ConsumerRecord array[] = new ConsumerRecord[records.count()];
int i;
for (i = 0; i < records.count(); ++i)
{
array[i] = iterator.next();
}
for (i = 0; i < records.count(); ++i){
final int id = i;
if (id < records.count() - 1)
{
new Thread(()-> {
disposeOneRecord(array[id],false);
countDownLatch.countDown();
}).start();
}
else
{
new Thread(()-> {
disposeOneRecord(array[id],true);
countDownLatch.countDown();
}).start();
}
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.commitAsync();
logger.info(String.format("Successfully processing %d records", records.count()));
}
private void disposeOneRecord(ConsumerRecord<String, String> record, boolean saveInRedis)
{
String[] split;
DCSPoint point;
String rowKey, qualifier, value;
List<Put> putList = new ArrayList<>();
Map<String,Object> tagAndValue = JSONObject.parseObject(record.value()).getInnerMap();
for (String tag : tagAndValue.keySet()) {
split = tag.split("_");
if (split.length != 2)
{
continue;
}
try {
point = DCSPoint.valueOf(split[1].toUpperCase());
}catch (IllegalArgumentException e){
continue;
}
if (point.getSection() == Section.UNKNOWN || point.getDataType() != DataType.REAL)
{
continue;
}
value = tagAndValue.get(tag).toString();
if (saveInRedis)
{
RedisConfig.masterRedis.set(tag, value);
}
rowKey = split[0] + "_" + record.key();
qualifier = split[1];
putList.add(HBaseDaoUtil.cellPut(rowKey, HBaseConfig.FAMILY,qualifier,value));
}
hBaseDao.adds(HBaseConfig.TABLE_NAME, putList);
}
相关文章
- Codeforces div.2 B. The Child and Set
- The three top-paying tech roles in 2022 and the skills you need to land them
- What do 'lazy' and 'greedy' mean in the context of regular expressions?
- the user account is not authorized for remote login
- What are the Web.Debug.config and Web.Release.Config files for?
- What is the difference between CORS and CSPs?
- What's the difference between dependencies, devDependencies and peerDependencies in npm package.json file?
- X-Pack for the Elastic Stack [6.2] » Securing the Elastic Stack »Setting Up User Authentication
- BZOJ1991 : Pku2422 The Wolves and the Sheep
- GCJ1C09C - Bribe the Prisoners
- Vue - DevTools Open DevTools and look for the Vue panel(控制台没有 Vue 选项卡)
- Oracle Database Hang While Loading 3rd party SBT Library And After This Nobody Can Access The Database (windows login 登陆hang )
- Linux 图形界面的显示原理是什么?---the graphical server and the window manager
- swift的static和class修饰符---What is the difference between static func and class func in Swift?
- The connection to adb is down, and a severe error has occured完整解决办法
- Dynamic CRM 2013学习笔记(三十)Linq使用报错 A proxy type with the name account has been defined by another assembly
- How to deploy a Delphi OSX project from the command line
- you need to upgrade the working copy first
- Basic Data Structures and Algorithms in the Linux Kernel--reference
- BEA-150021 - The admin server failed to authenticate the identity of the user username starting the managed server.
- [Android Studio]布局文件错误“No orientation specified, and the default is horizontal.This is a common ...“
- 《PersFormer:3D Lane Detection via Perspective Transformer and the OpenLane Benchmark》论文笔记
- leetcode 287. Find the Duplicate Number 寻找重复数 (中等)