kafka-生产者发送流程
生产者整体架构:
image.png
发送之前会经历 拦截器, 序列化器, 分区器.
发送过程: 由两个线程完成. 主线程和sender线程.
主线程: 负责将消息发送到消息累加器(RecordAccumulator) .
Sender线程: 负责将消息累加器(RecordAccumulator)中获取消息并发送到Broker.
RecordAccumulator: 主要用来缓存消息, Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能 RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B ,即 32M, 如果生产者发送消息的速度超过发送到服务器的速度 ,则会导致生产者空间不足,这个时候 KafkaProducer send() 方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max .block.ms 的配置,此参数的默认值为60000, 即60秒.
主线程中发送过来的消息都会被加到 RecordAccumulator 的某个双端队列( Deque )中, RecordAccumulator 的内部为每个分区都维护了 个双端队列,队列中的内容就是 Producer Batch ,即 Deque ProducerBatch。消息写入缓存时,追加到双端队列的尾部:Sender读取消息时, 从队列头部读取.
在 RecordAccumulator 的内部还有一个 BufferPool, 它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用 。不 BufferPool 只针对特定大ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小 batch.size 参数来指定,默认值为 16384B ,即 16KB 我们可以适当地调大 batch.size 参数 以便多缓存一些消息。
总结:kafka是微批发送消息的,不是实时发送。每个批次的大小为batch.size;
rocketmq是实时发送.
重要参数
参数名 | 说明 |
---|---|
max.request.size | 客户端能发送消息的最大值, 默认1 M |
retries | 重试次数 |
retry.backoff.ms | 两次重试之间的间隔 |
compression.type | 消息压缩, 默认为none, 压缩后减少IO, 但是会加大时延. |
liner.ms | 生产者客户端会在 ProducerBatch 填满或等待时间超过 linger.ms 值时发送出去。 |
receive.buffer.bytes | socket接收消息的缓冲区, 默认32Kb, producer与broker处于不同的机房,适当调高该值 |
send.buffer.bytes | 发送消息的socket缓冲区, 默认128KB. |
相关文章
- MyBatis逆向工程 Generator
- Centos7 安装 Redis 6.0.8 遇坑记(Redis 编译安装)
- MyBatis使用PageHelper实现分页查询
- MySQL表结构导出Excel、导出Word
- MySql通过父id递归向下查询子节点
- Elasticsearch笔记(集群插件、kibana、什么是倒排索引)
- SpringData集成Elasticsearch
- Mysql修改时区(时间差8小时)
- R绘图 | 表达矩阵画箱线图
- R包|数据I/O界的瑞士军刀rio
- SQL 多表联合查询的几种方式
- MySql字符串拆分实现split功能(字段分割转列、转行)
- 小样本利器4. 正则化+数据增强 Mixup Family代码实现
- Bert不完全手册9. 长文本建模 BigBird & Longformer & Reformer & Performer
- 小样本利器2.文本对抗+半监督 FGSM & VAT & FGM代码实现
- 中文NER的那些事儿4. 数据增强在NER的尝试
- 无所不能的Embedding6 - 跨入Transformer时代~模型详解&代码实现
- AB实验的高端玩法系列4- 实验渗透低?用户未被触达?CACE/LATE
- 因果推理的春天系列序 - 数据挖掘中的Confounding, Collidar, Mediation Bias
- AB实验的高端玩法系列3 - AB组不随机?观测试验?Propensity Score