zl程序教程

您现在的位置是:首页 >  后端

当前栏目

RocketMQ-实际开发中遇到的几个问题

开发 遇到 几个 实际 rocketmq 问题
2023-09-11 14:16:28 时间

消息幂等性

  • 什么是幂等性

一个操作任意执行多次与执行一次的结果相同,这个操作就是幂等

      生产者发送消息之后,为了确保消费者消费成功 我们通常会采用手动签收方式确认消费,MQ就是使用了消息超时、重传、确认机制来保证消息必达。

场景:
 1. 订单服务(生产者),点击结算订单之后需要付款,这时就会发送一条“结算”的消息到mq的broker中。
 2. 此时支付服务(消费者)监听到这条消息之后就会处理结算扣款的逻辑,然后手动签收订单告诉mq我已经消费完成了。
 3. 如果在结算的过程中程序出现了异常,我们就返回mq一个消费失败的状态,此时mq就会重新发送这条消息;或者是由于网络波动支付服务一直没有响应消息的消费状态,mq也照样会重新发送这条消息。
 4. 那么这种情况下,支付服务(消费者)就会重复收到这条消息,如果不做任何判断就有可能会重复消费出现多次扣款的情况。
解决方案:
  在发送消息的时候,我们可以生成一个唯一ID标识每一条消息,将消息处理成功和去重日志通过事物的形式写入去重表或缓存中。每次消费之前都先查一遍,如果存在就说明消费过了直接返回消费成功。

      RocketMQ在消费消息时,对应生产者重试发送的消息,RocketMQ做了消息幂等, 就是内部生成了一个inner-msg-id,作为消息去重和幂等的依据。 inner-msg-id是全局唯一,与业务无关,接收消息和发送消息双方都不知道

   对于下游系统,在MQ处理订单成功返回ack时,MQ-server会给下游系统的MQ-client发送消息,下游系统MQ-Client处理完业务会给MQ-Server发送Ack,在MQ-server接收到Ack后会进一步处理,但是如果这个ACk丢失,上游系统会重新发送,这是下游系统就会收到重复的消息,这是就需要用到业务ID,如订单支付成功后会生成一个订单支付成功的编号,下游系统接收到订单支付成功后,生成订单时也会生成一个支付成功后的编号对应的订单ID,如果再次收到支付成功的编号时会先检查是否已经存在该订单编号。

如何保证消息的顺序性

思考下,为什么我们要保证消息的顺序性呢,有什么好处呢?

看下下面这一组操作:

  1. 用户的积分默认是0分,而新注册用户设置为默认的10分。
  2. 用户有奖励行为,积分+2分。
  3. 用户有不正当行为,积分-3分。

这样一组操作,正常用户积分要变成9分。但是如果顺序乱了,这个结果就全部对不了。这时,就需要对这一组操作,保证消息都是有序的。

MQ的顺序问题分为全局有序和局部有序。

  • 全局有序:整个MQ系统的所有消息严格按照队列先入先出顺序进行消费。
  • 局部有序:只保证一部分关键消息的消费顺序。

顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。

顺序消息包含两种类型:

  • 分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
  • 全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

       ​ 首先 我们需要分析下这个问题,在通常的业务场景中,全局有序和局部有序哪个更重要?其实在大部分的MQ业务场景,我们只需要能够保证局部有序就可以了。例如我们用QQ聊天,只需要保证一个聊天窗口里的消息有序就可以了。而对于电商订单场景,也只要保证一个订单的所有消息是有序的就可以了。至于全局消息的顺序,并不会太关心。而通常意义下,全局有序都可以压缩成局部有序的问题。例如以前我们常用的聊天室,就是个典型的需要保证消息全局有序的场景。但是这种场景,通常可以压缩成只有一个聊天窗口的QQ来理解。即整个系统只有一个聊天通道,这样就可以用QQ那种保证一个聊天窗口消息有序的方式来保证整个系统的全局消息有序。

      ​ 然后 落地到RocketMQ。通常情况下,发送者发送消息时,会通过MessageQueue轮询的方式保证消息尽量均匀的分布到所有的MessageQueue上,而消费者也就同样需要从多个MessageQueue上消费消息。而MessageQueue是RocketMQ存储消息的最小单元,他们之间的消息都是互相隔离的,在这种情况下,是无法保证消息全局有序的。

    ​ 而对于局部有序的要求,只需要将有序的一组消息都存入同一个MessageQueue里,这样MessageQueue的FIFO设计天生就可以保证这一组消息的有序。RocketMQ中,可以在发送者发送消息时指定一个MessageSelector对象,让这个对象来决定消息发入哪一个MessageQueue。这样就可以保证一组有序的消息能够发到同一个MessageQueue里。

    ​ 另外,通常所谓的保证Topic全局消息有序的方式,就是将Topic配置成只有一个MessageQueue队列(默认是4个)。这样天生就能保证消息全局有序了。这个说法其实就是我们将聊天室场景压缩成只有一个聊天窗口的QQ一样的理解方式。而这种方式对整个Topic的消息吞吐影响是非常大的,如果这样用,基本上就没有用MQ的必要了。

  • 生产者保证消息有序:

Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。

public interface MessageQueueSelector {
    org.apache.rocketmq.common.message.MessageQueue select(java.util.List<org.apache.rocketmq.common.message.MessageQueue> list, org.apache.rocketmq.common.message.Message message, java.lang.Object o);
}
  • List mqs:消息要发送的Topic下所有的分区

  • Message msg:消息对象

  • 额外的参数:用户可以传递自己的参数

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                        Integer id = (Integer) arg;  
                        int index = id % mqs.size();  
                        return mqs.get(index);  
                    }  
                }, 0);  
  • 消费者消费消息

RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList自然和存储顺序一致,用户需要再拿到这批消息后自己保证消费的顺序。

对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中需要保证调用MessageListener时消息的顺序性。


public interface MessageListenerOrderly extends org.apache.rocketmq.client.consumer.listener.MessageListener {
    org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus consumeMessage(java.util.List<org.apache.rocketmq.common.message.MessageExt> list, org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext consumeOrderlyContext);
}
  consumer.registerMessageListener( new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                MessageExt msg = msgs.get(0);
                int times = msg.getReconsumeTimes();
                try {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                    //模拟异常
                    String str = new String(msgs.get(0).getBody());
                    if(str.contains("订单")) {
                        int i=1/0;
                    }
                    
                    //做业务逻辑操作
                    System.out.println("消费成功");
                    return ConsumeOrderlyStatus.SUCCESS;
    
                } catch (Exception e) {
                    System.out.println("重试次数"+times);
                    //如果重试2次不成功,则记录,人工介入
                    if(times >= 2){
                        System.out.println("重试次数大于2,记录数据库,发短信通知开发人员或者运营人员");
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                    e.printStackTrace();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
        consumer.start();

处理积压消息

         当出现消息积压时,首先需要去排查导致消息积压的原因,在正常情况下,使用MQ都会要尽量保证他的消息生产速度和消费速度整体上是平衡的,但是如果部分消费者系统出现故障,就会造成大量的消息积累。这类问题通常在实际工作中会出现得比较隐蔽。例如某一天一个数据库突然挂了,大家大概率就会集中处理数据库的问题。等好不容易把数据库恢复过来了,这时基于这个数据库服务的消费者程序就会积累大量的消息。或者网络波动等情况,也会导致消息大量的积累。这在一些大型的互联网项目中,消息积压的速度是相当恐怖的。所以消息积压是个需要时时关注的问题。

        ​ 对于消息积压,如果是RocketMQ或者kafka还好,他们的消息积压不会对性能造成很大的影响。而如果是RabbitMQ的话,那就惨了,大量的消息积压可以瞬间造成性能直线下滑。​ 对于RocketMQ来说,有个最简单的方式来确定消息是否有积压。那就是使用web控制台,就能直接看到消息的积压情况。

另外,也可以通过mqadmin指令在后台检查各个Topic的消息延迟情况。还有RocketMQ也会在他的 ${storePathRootDir}/config 目录下落地一系列的json文件,也可以用来跟踪消息积压情况。

导致消息积压的几种情况:

  • Product生产者过多
  • Consumer消费者过少
  • 其他原因导致

如果消费者速度正常,只是生产者生产消息速度太快,可以通过上线更多的consumer临时解决消息堆积问题。

    如果Topic下的MessageQueue配置得是足够多的,那每个Consumer实际上会分配多个MessageQueue来进行消费。这个时候,就可以简单的通过增加Consumer的服务节点数量来加快消息的消费,等积压消息消费完了,再恢复成正常情况。最极限的情况是把Consumer的节点个数设置成跟MessageQueue的个数相同。但是如果此时再继续增加Consumer的服务节点就没有用了。

      而如果Topic下的MessageQueue配置得不够多的话,那就不能用上面这种增加Consumer节点个数的方法了。这时怎么办呢? 这时如果要快速处理积压的消息,可以创建一个新的Topic,配置足够多的MessageQueue。然后把所有消费者节点的目标Topic转向新的Topic,并紧急上线一组新的消费者,只负责消费旧Topic中的消息,并转储到新的Topic中,这个速度是可以很快的。然后在新的Topic上,就可以通过增加消费者个数来提高消费速度了。之后再根据情况恢复成正常情况。

       在官网中,还分析了一个特殊的情况。就是如果RocketMQ原本是采用的普通方式搭建主从架构,而现在想要中途改为使用Dledger高可用集群,这时候如果不想历史消息丢失,就需要先将消息进行对齐,也就是要消费者把所有的消息都消费完,再来切换主从架构。因为Dledger集群会接管RocketMQ原有的CommitLog日志,所以切换主从架构时,如果有消息没有消费完,这些消息是存在旧的CommitLog中的,就无法再进行消费了。这个场景下也是需要尽快的处理掉积压的消息。

  1. 理解下面几个问题:

堆积时间过长消息超时了?
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
堆积的消息会不会进死信队列?
不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup)

如何保证消息不被重复消费

为什么会出现消息重复?消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。

Broker处理消息阶段

 Broker收到消息后,持久化到磁盘:在配置文件中配置flushDiskType刷盘方式(同步刷盘、异步刷盘)为同步刷盘,当刷盘成功后再给Procuder成功响应,再搭配双主双从,实现双主同步避免丢失,但是会降低系统的吞吐量。同时也有潜在问题:由于数据已经刷入磁盘,Broker宕机后重启会继续处理,带来消息重复的问题。

Producer发送消息阶段

 常用保障手段:ack机制。Broker收到消息后给Procuder一个确认响应(ACK),如果没有收到响应,Procuder会直到收到Broker的确认响应后才会停止重试消息发送(潜在问题:消息重复)。还有一点需要注意的是,消息队列大都提供了自动ACK,需要手动ACK的时候需要关闭默认设置。还有在代码中做好异常处理,尤其是异步发送的回调中检查发送结果。

在实际生产业务场景下,可以通过业务ID保证不会消费到重复消息,就需要将接收到的消息业务ID与已经消费的业务ID比对,如果是已经消费过的就不会再次消费。