zl程序教程

您现在的位置是:首页 >  其他

当前栏目

(二)RocketMQ订阅与发布

2023-04-18 13:12:11 时间

RocketMQ保证消息可靠性

至少一次

At least Once:指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack,如果没有消费一定不会ack消息。

回溯消息

Consumer已经消费成功的消息,由于业务需求需要重新消费。RocketMQ可根据时间维度来回退消费进度。

事务消息

本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。

定时消息

定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 暂存消息的Topic:SCHEDULE_TOPIC_XXXX

消息重试

Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。

Consumer在消费消息时出现运行时异常(下游应用服务不可用,例如db连接不可用,外系统网络不可达等、业务异常、反序列化失败等)导致消费失败的问题,RocketMQ可以暂时跳过该消息,再消费其他消息,过特定时间后再次重试。

消息重投

生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。

死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的死信队列。在控制台可以对死信进行再次消费。

人为保证RocketMQ消息可靠性

生产者发送消息

一个应用尽量使用一个Topic,不同类型的消息使用不同的tag标识。

每个消息在业务层面的唯一标识码设置到唯一Keys字段,方便定位消息丢失问题。

消息发送成功或失败都需要打印日志,只要不抛异常则说明发送成功,解析sendResult不同的成功状态。

  • SEND_OK 发送成功
  • FLUSH_DISK_TIMEOUT 刷盘超时
  • FLUSH_SLAVE_TIMEOUT 发送成功,服务器同步到Slave时超时
  • SLAVE_NOT_AVAILABLE 发送成功,Slave不可用

消息发送失败的异常处理

同步模式发送两次均失败后轮转到下一个Broker,10S后使用异步发送,超时异常不再发送

选择oneway方式发送

消费者

消费过程幂等

RocketMQ无法避免消息重复(Exactly-Once),所以业务对消费重复非常敏感,务必要在业务层进行去重处理。可以使用关系型数据库进行去重,首先保证消息的唯一键,(如msgId),在消费前判断关系型数据库中是否已存在,存在则跳过,不存在插入并消费。

如果同时接收大量消息时,可以先将数据保存在关系型数据库中,到达一定量后批量插入关系型数据库。

消费速度慢

  1. (提高消费并行度)消息消费属于IO密集型(操作数据库、RPC调用),这种场景取决于后端处理速度,同时优化消费过程。方案如下:
    • 同一个ConsumerGroup下,通过增加Consumer实例数量来提高并行度。
    • 通过修改参数 consumeThreadMin、consumeThreadMax实现消费者并行线程。
  2. (批量方式消费)
  3. (跳过非重要消息) 发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。