Kafka中生产消息时的三种分区分配策略
🔥《Kafka运维管控平台LogiKM》🔥 ✏️更强大的管控能力✏️ 🎾更高效的问题定位能力🎾 🌅更便捷的集群运维能力🌅 🎼更专业的资源治理🎼 🌞更友好的运维生态🌞
文章目录
KafkaProducer在发送消息的时候,需要指定发送到哪个分区, 那么这个分区策略都有哪些呢?
我们今天来看一下
使用分区策略的配置:
属性 | 描述 | 默认值 |
---|---|---|
partitioner.class | 消息的分区分配策略 | org.apache.kafka.clients.producer.internals.DefaultPartitioner |
1. DefaultPartitioner 默认分区策略
全路径类名:org.apache.kafka.clients.producer.internals.DefaultPartitioner
- 如果消息中指定了分区,则使用它
- 如果未指定分区但存在key,则根据序列化key使用murmur2哈希算法对分区数取模。
- 如果不存在分区或key,则会使用粘性分区策略,关于粘性分区请参阅 KIP-480。
粘性分区Sticky Partitioner
为什么会有粘性分区的概念?
首先,我们知道,Producer在发送消息的时候,会将消息放到一个ProducerBatch中, 这个Batch可能包含多条消息,然后再将Batch打包发送。关于这一块可以看看我之前的文章 图解Kafka Producer 消息缓存模型
这样做的好处就是能够提高吞吐量,减少发起请求的次数。
但是有一个问题就是, 因为消息的发送它必须要你的一个Batch满了或者linger.ms
时间到了,才会发送(当当然具体的条件会更多)。如果生产的消息比较少的话,迟迟难以让Batch塞满,那么就意味着更高的延迟。
在之前的消息发送中,就将消息轮询到各个分区的, 本来消息就少,你还给所有分区遍历的分配,那么每个ProducerBatch都很难满足条件。
那么假如我先让一个ProducerBatch塞满了之后,再给其他的分区分配是不是可以降低这个延迟呢?
详细的可以看看下面这张图、
这张图的前提是:
Topic1 有3分区, 此时给Topic1 发9条无key的消息, 这9条消息加起来都不超过batch.size
.
那么以前的分配方式和粘性分区的分配方式如下
可以看到,使用粘性分区之后,至少是先把一个Batch填满了发送然后再去填充另一个Batch。不至于向之前那样,虽然平均分配了,但是导致一个Batch都没有放满,不能立即发送。这不就增大了延迟了吗(只能通过linger.ms
时间到了才发送)
划重点:
-
当一个Batch发送之后,需要选择一个新的粘性分区的时候
①. 可用分区<1 ;那么选择分区的逻辑是在所有分区中随机选择。
②. 可用分区=1; 那么直接选择这个分区。
③. 可用分区>1 ; 那么在所有可用分区中随机选择。 -
当选择下一个粘性分区的时候,不是按照分区平均的原则来分配。而是随机原则(当然不能跟上一次的分区相同)
例如刚刚发送到的Batch是 1号分区,等Batch满了,发送之后,新的消息可能会发到2或者3, 如果选择的是2,等2的Batch满了之后,下一次选择的Batch仍旧可能是1,而不是说为了平均,选择3分区。
2.UniformStickyPartitioner 纯粹的粘性分区策略
全路径类名:org.apache.kafka.clients.producer.internals.UniformStickyPartitioner
他跟DefaultPartitioner 分区策略的唯一区别就是。
DefaultPartitionerd 如果有key的话,那么它是按照key来决定分区的,这个时候并不会使用粘性分区
UniformStickyPartitioner 是不管你有没有key, 统一都用粘性分区来分配。
3. RoundRobinPartitioner 分区策略
全路径类名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner
- 如果消息中指定了分区,则使用它
- 将消息平均的分配到每个分区中。
- 与key无关
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (!availablePartitions.isEmpty()) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
}
上面是具体代码。有个地方需要注意;
- 当可用分区是0的话,那么就是遍历的是所有分区中的。
- 当有可用分区的话,那么遍历的是所有可用分区的。
相关文章
- Kafka、RabbitMQ、RocketMQ消息中间件的对比—— 消息发送性能
- 使用kafka消息队列解决分布式事务(可靠消息最终一致性方案-本地消息服务)
- Kafka Consumer Lag Monitoring
- Kafka压测— 搞垮kafka的方法(转)
- Kafka实战-简单示例
- 大叔经验分享(137)kafka开启压缩
- Canal订阅binlog变更并结合kafka实现消息缓冲
- flume+kafka+hdfs详解
- Kafka 消息监控 - Kafka Eagle
- Kafka消息队列
- kafka架构
- kafka基本操作:创建topic、生产/消费消息(同一消费组均分消息;不同消费组订阅消息)
- kafka生产者、消费者消息操作命令
- kafka describe topic
- kafka如何实现高并发存储-如何找到一条需要消费的数据(阿里)
- kafka 基础知识梳理-kafka是一种高吞吐量的分布式发布订阅消息系统
- kafka详解一、Kafka简介
- 消息队列和Kafka
- y151.第八章 Servless和Knative从入门到精通 -- Kafka 与Eventing(十五)
- kafka消息队列的简单理解
- linux 下安装 php kafka 扩展
- kafka消息顺序与重复
- Kafka acks参数对消息持久化的影响
- Filebeat+Kafka+Logstash+ElasticSearch+Kibana搭建完整版
- Kafka核心概念
- 解开Kafka神秘的面纱(五):kafka优雅应用
- 解开Kafka神秘的面纱(一):kafka架构与应用场景
- 面试题系列:Kafka 夺命11问,你能扛到第几问?