zl程序教程

您现在的位置是:首页 >  云平台

当前栏目

RocketMQ学习(八):事务消息

2023-09-14 09:02:12 时间

源代码版本是3.2.6,还是直接跑源代码。rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交。

二阶段提交过程看图:

事务逻辑

第一阶段是:步骤1,2,3。
第二阶段是:步骤4,5。

具体说明:

只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交。

其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚。

事务消息原理实现过程:

一阶段:
Producer向Broker发送1条类型为TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,然后返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,通过该变量可以直接定位到消息本身),由于该类型的消息在保存的时候,commitLogOffset没有被保存到consumerQueue中,此时客户端通过consumerQueue取不到commitLogOffset,所以该类型的消息无法被取到,导致不会被消费。

一阶段的过程中,Broker保存了1条消息。

二阶段:
Producer端的TransactionExecuterImpl执行本地操作,返回本地事务的状态,然后发送一条类型为TransactionCommitType或者TransactionRollbackType的消息到Broker确认提交或者回滚,Broker通过Request中的commitLogOffset,获取到上面状态为TransactionPreparedType的消息(简称消息A),然后重新构造一条与消息A内容相同的消息B,设置状态为TransactionCommitType或者TransactionRollbackType,然后保存。其中TransactionCommitType类型的,会放commitLogOffset到consumerQueue中,TransactionRollbackType类型的,消息体设置为空,不会放commitLogOffset到consumerQueue中。

二阶段的过程中,Broker也保存了1条消息。

总结:事务消息过程中,broker一共保存2条消息。

贴代码:


properties

project.build.sourceEncoding UTF-8 /project.build.sourceEncoding

logback.version 1.0.13 /logback.version

rocketmq.version 3.2.6 /rocketmq.version

/properties

dependencies

dependency

groupId ch.qos.logback /groupId

artifactId logback-classic /artifactId

version 1.0.13 /version

/dependency

dependency

groupId ch.qos.logback /groupId

artifactId logback-core /artifactId

version 1.0.13 /version

/dependency

dependency

groupId com.alibaba.rocketmq /groupId

artifactId rocketmq-client /artifactId

version ${rocketmq.version} /version

/dependency

dependency

groupId junit /groupId

artifactId junit /artifactId

version 4.10 /version

scope test /scope

/dependency

/dependencies


package com.zoo.quickstart.transaction;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;

import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

import com.alibaba.rocketmq.common.message.MessageExt;

/**

* 未决事务,服务器回查客户端,broker端发起请求代码没有被调用,所以此处代码可能没用。

*/

public class TransactionCheckListenerImpl implements TransactionCheckListener {

private AtomicInteger transactionIndex = new AtomicInteger(0);

@Override

public LocalTransactionState checkLocalTransactionState(MessageExt msg) {

System.out.println("server checking TrMsg " + msg.toString());

int value = transactionIndex.getAndIncrement();

if ((value % 6) == 0) {

throw new RuntimeException("Could not find db");

}

else if ((value % 5) == 0) {

return LocalTransactionState.ROLLBACK_MESSAGE;

}

else if ((value % 4) == 0) {

return LocalTransactionState.COMMIT_MESSAGE;

}

return LocalTransactionState.UNKNOW;

}

}


package com.zoo.quickstart.transaction;

import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;

import com.alibaba.rocketmq.client.producer.LocalTransactionState;

import com.alibaba.rocketmq.common.message.Message;

/**

* 执行本地事务

*/

public class TransactionExecuterImpl implements LocalTransactionExecuter {

private AtomicInteger transactionIndex = new AtomicInteger(1);

@Override

public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {

int value = transactionIndex.getAndIncrement();

if (value == 0) {

throw new RuntimeException("Could not find db");

}

else if ((value % 5) == 0) {

return LocalTransactionState.ROLLBACK_MESSAGE;

}

else if ((value % 4) == 0) {

return LocalTransactionState.COMMIT_MESSAGE;

}

return LocalTransactionState.UNKNOW;

}

}


package com.zoo.quickstart.transaction;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.client.producer.TransactionCheckListener;

import com.alibaba.rocketmq.client.producer.TransactionMQProducer;

import com.alibaba.rocketmq.common.message.Message;

/**

* 发送事务消息例子

*

*/

public class TransactionProducer {

public static void main(String[] args) throws MQClientException, InterruptedException {

TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();

TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");

// 事务回查最小并发数

producer.setCheckThreadPoolMinSize(2);

// 事务回查最大并发数

producer.setCheckThreadPoolMaxSize(2);

// 队列数

producer.setCheckRequestHoldMax(2000);

producer.setTransactionCheckListener(transactionCheckListener);

producer.setNamesrvAddr("192.168.0.104:9876");

producer.start();

String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };

TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();

for (int i = 0; i 1; i++) {

try {

Message msg =

new Message("TopicTest", tags[i % tags.length], "KEY" + i,

("Hello RocketMQ " + i).getBytes());

SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);

System.out.println(sendResult);

Thread.sleep(10);

}

catch (MQClientException e) {

e.printStackTrace();

}

}

for (int i = 0; i 100000; i++) {

Thread.sleep(1000);

}

producer.shutdown();

}

}


SpringBoot整合RocketMQ发送事务消息 RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
解析 RocketMQ 业务消息——“事务消息” 本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。
自顶向下学习 RocketMQ(八):事务消息原理分析 事务消息发送步骤如下: 生产者将半事务消息发送至消息队列 RocketMQ 版服务端。 消息队列 RocketMQ 版服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息为半事务消息。 生产者开始执行本地事务逻辑。 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback)
自顶向下学习 RocketMQ(七):事务消息 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。