RocketMQ学习(八):事务消息
源代码版本是3.2.6,还是直接跑源代码。rocketmq事务消息是发生在Producer和Broker之间,是二阶段提交。
二阶段提交过程看图:
第一阶段是:步骤1,2,3。
第二阶段是:步骤4,5。
具体说明:
只有在消息发送成功,并且本地操作执行成功时,才发送提交事务消息,做事务提交。
其他的情况,例如消息发送失败,直接发送回滚消息,进行回滚,或者发送消息成功,但是执行本地操作失败,也是发送回滚消息,进行回滚。
事务消息原理实现过程:
一阶段:
Producer向Broker发送1条类型为TransactionPreparedType的消息,Broker接收消息保存在CommitLog中,然后返回消息的queueOffset和MessageId到Producer,MessageId包含有commitLogOffset(即消息在CommitLog中的偏移量,通过该变量可以直接定位到消息本身),由于该类型的消息在保存的时候,commitLogOffset没有被保存到consumerQueue中,此时客户端通过consumerQueue取不到commitLogOffset,所以该类型的消息无法被取到,导致不会被消费。
一阶段的过程中,Broker保存了1条消息。
二阶段:
Producer端的TransactionExecuterImpl执行本地操作,返回本地事务的状态,然后发送一条类型为TransactionCommitType或者TransactionRollbackType的消息到Broker确认提交或者回滚,Broker通过Request中的commitLogOffset,获取到上面状态为TransactionPreparedType的消息(简称消息A),然后重新构造一条与消息A内容相同的消息B,设置状态为TransactionCommitType或者TransactionRollbackType,然后保存。其中TransactionCommitType类型的,会放commitLogOffset到consumerQueue中,TransactionRollbackType类型的,消息体设置为空,不会放commitLogOffset到consumerQueue中。
二阶段的过程中,Broker也保存了1条消息。
总结:事务消息过程中,broker一共保存2条消息。
贴代码:
properties
project.build.sourceEncoding UTF-8 /project.build.sourceEncoding
logback.version 1.0.13 /logback.version
rocketmq.version 3.2.6 /rocketmq.version
/properties
dependencies
dependency
groupId ch.qos.logback /groupId
artifactId logback-classic /artifactId
version 1.0.13 /version
/dependency
dependency
groupId ch.qos.logback /groupId
artifactId logback-core /artifactId
version 1.0.13 /version
/dependency
dependency
groupId com.alibaba.rocketmq /groupId
artifactId rocketmq-client /artifactId
version ${rocketmq.version} /version
/dependency
dependency
groupId junit /groupId
artifactId junit /artifactId
version 4.10 /version
scope test /scope
/dependency
/dependencies
package com.zoo.quickstart.transaction;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* 未决事务,服务器回查客户端,broker端发起请求代码没有被调用,所以此处代码可能没用。
*/
public class TransactionCheckListenerImpl implements TransactionCheckListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.println("server checking TrMsg " + msg.toString());
int value = transactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
}
else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
package com.zoo.quickstart.transaction;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.common.message.Message;
/**
* 执行本地事务
*/
public class TransactionExecuterImpl implements LocalTransactionExecuter {
private AtomicInteger transactionIndex = new AtomicInteger(1);
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
int value = transactionIndex.getAndIncrement();
if (value == 0) {
throw new RuntimeException("Could not find db");
}
else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
package com.zoo.quickstart.transaction;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;
/**
* 发送事务消息例子
*
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
// 事务回查最小并发数
producer.setCheckThreadPoolMinSize(2);
// 事务回查最大并发数
producer.setCheckThreadPoolMaxSize(2);
// 队列数
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
producer.setNamesrvAddr("192.168.0.104:9876");
producer.start();
String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i 1; i++) {
try {
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes());
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.println(sendResult);
Thread.sleep(10);
}
catch (MQClientException e) {
e.printStackTrace();
}
}
for (int i = 0; i 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
SpringBoot整合RocketMQ发送事务消息 RocketMQ提供了类似X/Open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA是一种分布式事务解决方案,一种分布式事务处理模式
解析 RocketMQ 业务消息——“事务消息” 本篇文章通过拆解 RocketMQ 事务消息的使用场景、基本原理、实现细节和实战使用,帮助大家更好的理解和使用 RocketMQ 的事务消息。
自顶向下学习 RocketMQ(八):事务消息原理分析 事务消息发送步骤如下: 生产者将半事务消息发送至消息队列 RocketMQ 版服务端。 消息队列 RocketMQ 版服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息为半事务消息。 生产者开始执行本地事务逻辑。 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback)
自顶向下学习 RocketMQ(七):事务消息 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
相关文章
- 13-RabbitMQ高级特性-Confirm确认消息
- Qt中各种消息框的使用
- 【RocketMQ】发送事务消息
- 消息队列RocketMQ版分布式事务消息
- vue.js客服系统实时聊天项目开发(十一)处理发送消息enter事件以及实现ctrl+enter换行
- Redis 应用实践-消息队列
- SAP ABAP 通过 https 消费外部 API 遇到错误消息 SSSLERR_SSL_CONNECT
- redis实现消息队列教程详解数据库
- 使用消息队列实现分布式事务-公认较为理想的分布式事务解决方案详解架构师
- Spring Cloud(九):分布式配置中心和消息总线详解编程语言
- Python3 itchat实现微信定时发送群消息详解编程语言
- 有消息显示类似Win8的Charms bar功能即将来到Win10
- 消息称国家市场监督管理总局要求腾讯音乐娱乐集团放弃独家版权
- 重磅消息:F5收购Nginx!
- MySQL XA消息演示实现分布式事务的一种方式(mysql xa 演示)
- Redis消息队列如何做到加强备份的完整性(redis消息队列 备份)
- aspxgridviewCustomButtonCallback不支持弹出消息提示解决方法