zl程序教程

您现在的位置是:首页 >  后端

当前栏目

java发送kafka事务消息

2023-09-27 14:28:03 时间

前言

事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需要在程序中添加上事务注解即可

kafka客户端事务,直接使用客户端提供的相关的API即可,和jdbc事务的使用很类似,主要包含下面5个API

// 1 初始化事务
void initTransactions();


// 2 开启事务
void beginTransaction() throws ProducerFencedException;


// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws ProducerFencedException;


// 4 提交事务
void commitTransaction() throws ProducerFencedException;


// 5 放弃事务(类似于回滚事务的操作)

void abortTransaction() throws ProducerFencedException;

下面结合实际的代码以及效果演示进行说明

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.p