初学 RocketMQ 之消息堆积
01 什么是消息堆积?
字面意思:堆积,就是把事物堆积成堆。这里指的就是消息堆积在一起,一直没有被消费或消费的很慢。
02 消息存储在哪?
消息一般会存在 Broker 服务里面。这里拿我自己搭建的环境,我们来看看(因为我没做好文件映射关系,所以直接进去容器看),一般消息都会放在/ R O C K E T M Q H O M E {ROCKETMQ_HOME} ROCKETMQHOME/store/里面,实体消息放在 commitLog 文件,consumequeue 是存放消息索引的。这个涉及到消息索引和持久化部分就不具体说明。
为什么会堆积?
先说结论,首先消息的生命周期简单来说是 “生产 - 消费” 这样的过程。一般来说生产者不会是消息堆积的诱因(感觉不一定,可以试验一下)。产生消堆积的原因一般是消费速度赶不上生产速度所引起的,可能主要有以下两种类型的代码(希望有更多小伙伴补充一下真实的场景):
- CPU 计算代码
- 外部 I/O 操作代码
下面就来模拟一下场景和看看堆积的现象
首先我们要有 “生产者” 和 “消费者” 的角色,这里用的是 spring-boot-starter 快速搭建环境和模拟的。
生产者
@Componentpublic class ReTrySender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 普通消息发送
public void delayOrdinarySend() {
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
DefaultMQProducer producer = rocketMQTemplate.getProducer();
producer.setRetryTimesWhenSendFailed(3);
String uuid = UUID.randomUUID().toString().replace("-", "");
String msg = "order" + uuid;
String key = "test";
Message<String> buildMsg = MessageBuilder.withPayload(msg).setHeader("KEYS", key).build();
SendResult sendResult = rocketMQTemplate.syncSend(
MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, buildMsg);
System.out.println("生产消息:"+ msg + ":" + dateString);
}}
消费者
@Component@RocketMQMessageListener(topic = MqUntilSecond.tag_topic,
consumerGroup = MqUntilSecond.consumerGroup,
messageModel = MessageModel.CLUSTERING,
consumeThreadMax = MqUntilSecond.maxThread,
consumeMode = ConsumeMode.CONCURRENTLY)public class ConsumerTag1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(String message) {
int i = 1;
while (i < 10000){
try {
i ++;
Thread.sleep(1);
}catch (Exception e){
e.printStackTrace();
}
}
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
System.out.println("消费消息:" + message + ":" + dateString);
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeTimeout(10);
consumer.setMaxReconsumeTimes(2);
}}
消费者配置
public class MqUntilSecond {
public static final String tag_topic = "tag_topic1"; // 主题
public static final String tag = "tag1"; // 标签
public static final String consumerGroup = "syncGroup"; // 消费集群
public static final int maxThread = 1; // 最大消费线程数}
实验部分
在消费者代码部分,我用了循环和线程休眠的方式来模拟一些耗时循环或长链路的场景,我们拿 Jmeter 对接口发起 1000 个请求来看看现象
我们看看 Console 控制台部分,可以看到消费者表单里,syncGroup 的 Delay 字段为 999
我们来看看控制台,这里的消息一直被慢慢的消费,现象是越靠后的消息,生产到消费的时间间隔越大。试想,我拿个系统验证码,一直都收不到那我是不是直接就不用了
我们把这段循环代码去掉,重启服务,再看看现象。消息在一瞬间被处理完毕
03 总结
- 首先可以确定的是代码设计不规范会引起消息堆积;
- 但消息堆积不仅仅是因为代码原因,也有可能是业务本身就存在消费赶不上生产,这个时候需要寻求的是其它解决办法;
- 除了代码,还需要关心配置部分。比如:可以调整生产者和消费者的配置,通过梳理业务和调整配置方式来找到系统最优的性能点;
- 实际业务中,如果压测会涉及到消息队列中间件,需要对它进行监控。
在这里还是要推荐下我自己建的软件测试学习Q群:746506216,群里都是学测试的,如果你想学或者正在学习测试,欢迎你加入,大家都是测试党,不定期分享干货(只有软件测试相关的),包括我自己整理的一份2022最新的Python自动化测试进阶资料和零基础教学,欢迎进阶中和对测试感兴趣的小伙伴加入!
相关文章
- Redis实现消息队列的4种方案
- 【开发者portal在线开发插件系列一】profile和基本上下行消息
- 【RocketMQ异常】Caused by: com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, message-service-topic-testf
- JavaScript:原生JS实现Facebook实时消息抓捕
- 解析 RocketMQ 多样消费功能-消息过滤
- 解析 RocketMQ 业务消息--“顺序消息”
- 倒数 3 天|RocketMQ 能力全景图即将发布,定义下一代消息队列未来方向
- 阿里云消息队列 RocketMQ、Kafka 荣获金融级产品稳定性测评 “先进级” 认证
- 从消息到数据湖:看 Apache RocketMQ、Hudi、Kyuubi 最新进展
- 消息队列RocketMQ应对双十一流量洪峰的“六大武器”
- 消息积压---一般处理方法
- ActiveMQ持久化消息的三种方式
- spring boot:用rocketmq消息订阅实现删除购物车商品功能(spring boot 2.3.3)
- 新手学JavaScript(三)----超酷消息警告框插件(SweetAlert)
- 消息摘要算法-SHA算法实现
- 83、android的消息处理机制(图+源码分析)——Looper,Handler,Message
- spring boot单元测试之十二:用RabbitMQ mock库做消息生产/消费的mock(spring boot 2.4.4)
- spring boot:用rocketmq发送延时消息用来取消订单(spring boot 2.3.3)
- IIS 编译器错误消息: CS0016未能写入输出文件“c:WINDOWSMicrosoft.NETFrameworkv1.1.4322Temporary ASP.NET Filesroo
- 微信公众号发送视频消息和视频号,有什么区别?
- 如何将Twitter消息导入到SAP CRM和Cloud for Customer去
- 错误消息“Conversion factors are invalid”的准确抛出位置
- RocketMQ批量消息发送
- 慕课9、消息驱动的微服务-Spring Cloud Alibaba RocketMQ
- 音视频开发(四十二):Android消息机制ThreadLocal
- 一文带你认知定时消息发布RocketMQ
- android消息处理机制原理解析
- 【RabbitMQ笔记05】消息队列RabbitMQ七种模式之Routing路由键模式
- 【消息中间件】RocketMQ底层如何实现生产者发送消息与失败重试