kafka事务:入门篇
之前搜索网上kafka事务相关文章,要么不清不楚,要么过于深奥。最近难得一闲,啃了一下kip-98,终于如愿能系统地总结一下kafka事务的原理与实现。
kafka事务
kafka官方设计文档 kip-98。大家可以先阅读我这篇文章,再去啃kip原文。
Kafka 的事务可以看作Kafka 中最难的知识点之一!
一、基础概念
一般而言,消息中间件的消息传输保障有3个层级:
- at most once:至多一次。消息可能会丢失,但绝对不会重复传输
- at least once:最少一次。消息绝不会丢失,但可能会重复传输
- exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次
Kafka 的消息传输保障机制非常直观。当生产者向 Kafka 发送消息时,一旦消息被成功提交到日志文件,由于多副本机制的存在,这条消息就不会丢失。
Kafka从0.11.0.0版本开始引入了幂等和事务这两个特性,以此来实现EOS(exactly oncesemantics,精确一次处理语义)。
二、幂等
所谓的幂等,简单地说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复写入消息,而使用Kafka的幂等性功能之后就可以避免这种情况。
为了实现生产者的幂等性,Kafka为此引入了
- producer id
- 序列号(sequencenumber)这两个概念(在早期的kafka版本中,我们在业务层也实现了类似的header信息来实现业务层幂等。当然这部分kafka原生实现的源码也值得一看,对比看看有何优劣!)
客户端需要开启enable.idempotence
为true。
为什么需要幂等?
因为事务开启后必然会有很多的失败重试
三、事务
幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。事务可以保证对多个分区写入操作的原子性。操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能。
1、解决什么问题?
- 多分区原子写入
consume-transform-produce
原子操作- 有状态应用多个会话之间的连续性如何理解: 如应用重启后可以接着幂等生产和事务恢复
2、producer和consumer
2.1 producer
- producer和transactionalId建议一一对应,因为一个TransactionalId只能有一个活跃的producer
- 应用实例die的情况下,新启动的实例会接管事务,继续未完成的事务(aborted或committed)
2.2 consumer
consumer 事务语义相对弱一些
- topic是compacted,老数据可能被新版本的数据覆盖
- 事务消息跨多个日志segment,可能其中部分segment过期被删除
- consumer消费可以指定offset消费。10条数据原子一个事务原子写入,但你指定从第9条数据开始消费,则1~8条数据不会被消费到。
- consumer不一定会同时消费所有一个事务生产的topic和分区
3、具体实现
3.1 关键概念
为了支持事务,kafka新增加了如下模块:
- 新协调器:Transaction Coordinator,分配PID、管理事务
- 新内部topic:
__transaction_state
持久化存储事务的日志 - 新消息类型:
ControlBatch
兼容老的ProduceBatch
,写到用户的topic,告诉用于之前fetch的数据是否已经commit。详见jira。 之前线上有个案例case,旧版本客户端消费该数据导致故障。 - 新id:transactionalId,唯一识别producer。旧producer挂了的情况下,同id的新producer继续未完成的事务。
- 新的epoch:producer epoch。通过版本号确定合法的事务producer(一个TransactionalId只能有一个活跃的producer)。
- 新的request类型、新的request版本号
- 新的消息格式
3.2 案例与DataFlow
注意4.3和4.4是Consume-Transform-Produce场景独有的步骤,涉及到消费和生产的联动,这里我不多介绍。
步骤:
- 1、查找TransactionCoordinator
- 2、获取并保存PID
- 3、Producer开启事务
- 4、Consume-Transform-Produce(以该场景举例)
- 4.1 AddPartitionsToTxnRequest:producer告诉事务协调器哪些tp会被写入,事务协调器会将这些信息存储,并启动一个事务的定时器。
- 4.2 ProduceRequest。 与常规的Produce相比多了
PID\epoch\sequence number
字段 - 4.3 AddOffsetCommitsToTxnRequest:批量消费和生产时使用到。 sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) - 4.4 TxnOffsetCommitRequest 与4.3配合 - 5、提交或取消一个事务
- 5.1 EndTxnRequest:
endTransaction
\abortTransaction
abortTransaction
后 下游消费者读取后会丢弃掉对应的数据 收到这个请求后, 1)协调器将往事务日志里面写入PREPARE_COMMIT
和PREPARE_ABORT
消息。 2)发送COMMIT
或ABORT
marker到用户的数据里面(即5.2)。 3)事务日志写入COMMITTED
和ABORTED
消息。 - 5.2 WriteTxnMarkerRequest事务协调器发给各个tp的leader的broker,broker收到后会往对应的topic里面插入ControlBatch
(即marker)。告知该PID之前发送的消息的状态是怎么样的。注意:消费者需要缓存该PID的消息直到收到ControlBatch
信息,才直到应该如何处理这些消息(业务处理or丢弃)。 - 5.3 发送最终提交或取消请求
ControlBatch
(即marker)写到tp里后,事务协调器会往事务日志里面写入COMMITTED
和ABORTED
消息,事务完成。
四、事务关键参数
1、broker端
- transactional.id.timeout.ms:604800000 (7 days) 事务id在没有更新情况下存活的时间。
- max.transaction.timeout.ms:900000 (15 min) 最大事务的超时时间。
2、producer端
- enable.idempotence: 幂等默认会要求: acks=all retries > 1 max.inflight.requests.per.connection=1.
- transaction.timeout.ms:事务超时时间,60000
- transactional.id:事务id
3、consumer端
- isolation.level:默认read_uncommitted
- read_uncommitted: 同时消费到committed和uncommitted的消息
- read_committed:只能消费到非事务消息以及
五、性能测试
不难分析,kafka的事务是相对轻量的,对性能影响相对可控。
kafka-producer-perf-test.sh
的 --transactional-id
用于测试并发事务的性能。
潜在性能下降点:单producer写入性能,特别是开启幂等后的性能下降。
相关文章
- Kafka在美团数据平台的实践
- 两个优秀的分布式消息流平台:Kafka与Pulsar
- Kafka 删除topic_kafka自动创建topic
- kafka批量删除topic_kafka查看topic数据量
- Kafka如何删除topic中的部分数据_kafka修改topic副本数
- Kafka策略模式
- Kafka Streams之WordCount
- 【Kafka】Kafka-Server-start.sh 启动脚本分析(Ver 2.7.2)
- kafka单条消息过大导致线上OOM,运维连夜跑路了!
- kafka 事务介绍
- 面试系列-kafka事务控制
- Kafka、RabbitMQ、Pulsar、RocketMQ基本原理和选型
- SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
- Kafka和Flink双剑合璧,Confluent收购Immerok引起业内广泛讨论
- 1.5万字长文:从 C# 入门 Kafka
- kafka中生产者是如何把消息投递到哪个分区的?消费者又是怎么选择分区的?
- Kafka入门篇学习笔记整理
- 使用OGG传统模式将Oracle同步到kafka(全量+增量)
- Kafka学习之(一)了解一下Kafka及关键概念和处理机制详解程序员
- 从Linux启动Kafka:一步一步指南(linux启动kafka)
- Oracle 数据流轻松集成 Kafka 服务:提高数据传输效率(oracle到kafka)
- Linux下搭建Kafka Stream架构的实践(linux kafka)
- 使用Kafka连接Oracle数据库(kafka到oracle)
- Oracle与Kafka的联合应用突破传统数据处理极限(oracle与kafka)
- 比较Redis vs Kafka(redis还是kafka)