zl程序教程

您现在的位置是:首页 >  Javascript

当前栏目

RocketMQ5.0.0消息消费<三> _ 消息消费

2023-04-18 14:24:36 时间

目录

一、消息消费

1. 消费UML图

2. 提交消息

3. 消费消息

二、消息确认(ACK)

1. 消费端处理消费结果

2. Broker端处理消费ACK请求

三、消费进度管理

1. 广播模式消费进度存储

2. 集群模式消费进度存储

四、参考资料


一、消息消费

1. 消费UML图

        PUSH模式消息拉取机制参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取​》,PullMessageService负责对消息队列进行消息拉取,从Broker端拉取消息后将消息存入ProcessQueue消息处理队列中,后调用ConsumeMessageService#submitConsumeRequest方法将消息提交到线程池。使用消费线程池确保了消息拉取与消息消费的解耦。RocketMQ使用ConsumeMessageService来实现消息消费的处理逻辑。

        RocketMQ支持顺序消费与并发消费,本章节介绍并发消费消息的流程。

        下图所示是消费消息UML图,org.apache.rocketmq.client.impl.consumer.ConsumeMessageService维护一个消费线程池。

        org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService是PUSH模式的并发消息消费实现类,其关键属性如下。 

// 消息推模式实现类
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
// 默认推模式消费者
private final DefaultMQPushConsumer defaultMQPushConsumer;
// 并发消息监听
private final MessageListenerConcurrently messageListener;
// 消费线程池的任务队列
private final BlockingQueue<Runnable> consumeRequestQueue;
// 消费线程池
private final ThreadPoolExecutor consumeExecutor;
// 消费组
private final String consumerGroup;

// 消费延迟调度器
private final ScheduledExecutorService scheduledExecutorService;
// 定时删除过期消息线程池
private final ScheduledExecutorService cleanExpireMsgExecutors;

2. 提交消息

        参考《RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取​》,成功拉取消息后,org.apache.rocketmq.client.consumer.PullCallback回调onSuccess(),把消息提交(异步提交)到ConsumeMessageService的线程池中,供消费者消费,则本次拉取消息完成。

        org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest是提交拉取的消息到消费线程池中的核心方法,如下代码所示。注意事项:

  • consumeMessageBatchMaxSize:获取并发消费时一次消费消息条数,默认1条。
  • 判断拉取的消息条数(默认最大32条)与 consumeBatchSize大小比较

        a. msgs.size() <= consumeBatchSize时:组装消费请求ConsumeRequest并提交消费任务到消费线程池中;

        b. msgs.size() > consumeBatchSize时:分页提交到消费线程池中。

  • submitConsumeRequestLater():消费线程池饱和拒绝时,则延迟5s再次提交消费请求
/**
 * 提交消息消费,供消费者消费
 * 并发消息消费入口:{@link DefaultMQPushConsumerImpl#pullMessage}中的{@link org.apache.rocketmq.client.consumer.PullCallback}
 * step1:获取并发消费时一次消费消息条数,默认1条(DefaultMQPushConsumer.consumeMessageBatchMaxSize)
 * step2:msgs.size()一次拉取消息的条数,最大32条 <= consumeBatchSize时,
 *          a. 组装消费请求并提交消费任务到消费线程池中
 *          b. 出现饱和抛出异常时,延迟5s提交{@link ConsumeMessageConcurrentlyService#submitConsumeRequestLater(ConsumeRequest)}
 * step3:msgs.size()一次拉取消息的条数,最大32条 > consumeBatchSize时,分页提交到消费线程池中
 * @param msgs 一次拉取待消费消息,最大默认32条{@link DefaultMQPushConsumer#pullBatchSize}
 * @param processQueue 待消息消费处理队列
 * @param messageQueue 消息所属消费队列
 * @param dispatchToConsume 是否转发到消费线程池,并发消费则忽略
 */
@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // 并发消费时一次消费消息条数,默认1条(DefaultMQPushConsumer.consumeMessageBatchMaxSize)
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    // msgs.size()一次拉取消息的条数,最大32条
    if (msgs.size() <= consumeBatchSize) {
        // 组装消费请求
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // 消费任务直接提交到消费线程池,具体消费逻辑ConsumeMessageConcurrentlyService.ConsumeRequest.run
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // 延迟5s提交到消费线程池
            this.submitConsumeRequestLater(consumeRequest);
        }
    }
    // msgs.size() > consumeBatchSize时,对拉取消息进行分页,每页有consumeBatchSize条消息
    else {
        for (int total = 0; total < msgs.size(); ) {
            // 每页有consumeBatchSize条消息
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

3. 消费消息

        org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest是提交线程池中的消费任务,该类是个线程。其关键属性如下。

// 提交消费请求的消息
private final List<MessageExt> msgs;
// PullRequest.messageQueue属性:待拉取消费队列(负载均衡后的分配的消息队列)
private final ProcessQueue processQueue;
// PullRequest.processQueue属性:消息处理队列(存储已拉取的消息)
private final MessageQueue messageQueue;

        ConsumeMessageConcurrentlyService.ConsumeRequest#run方法执行消息消费逻辑,代码如下。注意事项:

  • processQueue.isDropped():判断拉取消息处理队列是否被丢弃(消费队列重新负载 _ 删除处理),避免消息重复消费;ACK时,也要进行是否丢弃判断
  • MessageListenerConcurrently#consumeMessage:具体的消息消费业务逻辑,返回该批消息的消费结果。
  • ConsumeMessageConcurrentlyService#processConsumeResult():消费者消费消息后,进行消息ACK确认,下小节介绍
  • ConsumeConcurrentlyStatus status:该批消息的消费结果(成功CONSUME_SUCCESS、失败RECONSUME_LATER)
/**
 * 并发消息消费的具体逻辑
 * 入口:{@link ConsumeMessageConcurrentlyService#submitConsumeRequest}
 * step1:判断当前消费者的ProcessQueue是否被丢弃,true时重新负载均衡后所属消费队列被消费组内其他消费则占用
 * step2:恢复重试消息主题名(消息重试机制决定)
 * step3:执行消费前的钩子函数
 * step4:具体的消息消费业务逻辑,返回该批消息的消费结果
 *        {@link MessageListenerConcurrently#consumeMessage(List, ConsumeConcurrentlyContext)}
 * step5:判断是否消费超时
 * step6:执行消费后钩子函数
 * step7:再次判断当前消费者的ProcessQueue是否被丢弃(消费过程中是否被修改),防止重复消费
 *        没有丢弃,则处理消费结果,ACK机制{@link ConsumeMessageConcurrentlyService#processConsumeResult(ConsumeConcurrentlyStatus, ConsumeConcurrentlyContext, ConsumeRequest)}
 */
@Override
public void run() {
    // 判断当前消费者的ProcessQueue是否被丢弃,true时重新负载均衡后所属消费队列被消费组内其他消费则占用
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    // 并发消息消费监听器
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;

    // 恢复重试消息主题名(消息重试机制决定)
    defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    // 执行消费前的钩子函数
    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        // 执行消费前钩子函数
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                // 设置消费开始时间
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // 具体的消息消费业务逻辑,返回该批消息的消费结果
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue), e);
        hasException = true;
    }
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    }
    // 消费是否超时
    else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    // 执行消费后钩子函数
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    // 再次判断当前消费者的ProcessQueue是否被丢弃(消费过程中是否被修改),防止重复消费
    if (!processQueue.isDropped()) {
        // 处理消费结果,ACK机制
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

二、消息确认(ACK)

        根据上节的介绍,PUSH模式拉取消息后,拉取请求提交到消费线程池,消费者消费消息。org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus是消费者消费该批消息的结果,只有在消费结果是失败(RECONSUME_LATER)时,这批消息逐条发送消费ACK确认。 

        Broker处理消费ACK请求时,消息主题修改为重试主题(%RETRY% + 消费组名称),而原始主题存储到消息扩展属性。同时,判断消费次数是否超出最大重试消费(默认16次),若是则进入DLQ队列。新消息再次存储到Commitlog文件,而不是直接修改消息,原因是RocketMQ顺序写入Commitlog文件,随机读,提高消息的吞吐量。消费者重新消费时,恢复为原始主题。

1. 消费端处理消费结果

        org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult是处理消费结果的核心逻辑方法,如下代码。注意事项:

  • 根据消费结果计算ackIndex:

                该批消息消费成功CONSUME_SUCCESS:ackIndex为msgs.size() - 1

                该批消息消费失败RECONSUME_LATER:ackIndex为-1

  • 根据消费模式处理失败RECONSUME_LATER:

                广播:只打印日志。

                集群:该批消息逐条向Broker同步发送ACK确认,Broker返回失败时设置消费次数(+1)并再次封装成ConsumeRequest,且延迟5s重新消费

  • 消费结果成功时:更新消费进度(移除该批消息后,获取剩下的msgTreeMap消息中最小的偏移量,避免重复消费),下小节介绍消费进度管理。
/**
 * 处理消费结果
 * step1:计算ackIndex,为消费ACK准备:
 *            成功CONSUME_SUCCESS:ackIndex为msgs.size() - 1
 *            失败RECONSUME_LATER:ackIndex为-1
 * step2:消费模式,处理业务方返回RECONSUME_LATER的消息:
 *            广播BROADCASTING:RECONSUME_LATER的消息执行,只打印日志
 *            集群CLUSTERING:本批消息只要有一个返回RECONSUME_LATER,则本批消息都需要ACK,ACK返回失败,设置消费次数
 * step3:更新消费进度:
 *            移除该批消息后,获取剩下的msgTreeMap消息中最小的偏移量,避免重复消费
 */
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    // 计算ackIndex,为消费ACK准备
    switch (status) {
        // 成功,则ackIndex为msgs.size() - 1
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        // 失败,则ackIndex为-1
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    // 根据消费模式,处理业务方返回RECONSUME_LATER的消息
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            // RECONSUME_LATER的消息执行,只打印日志
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            // 本批消息只要有一个返回RECONSUME_LATER,则本批消息都需要ACK
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 每条消息,向Broker发送ACK
                boolean result = this.sendMessageBack(msg, context);
                // ACK返回失败的消息,设置消费次数
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            // ACK返回失败的消息再次封装成ConsumeRequest,且延迟5s重新消费
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    // 移除消息后,获取剩下的msgTreeMap消息,最小的偏移量,避免重复消费
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    // 更新本地消费队列的消费进度
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

        org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack是同步发送消息ACK确认核心方法,如下代码。请求码RequestCode.CONSUMER_SEND_MSG_BACK,注意事项:

  • delayLevel(延迟级别):取ConsumeConcurrentlyContext#delayLevelWhenNextConsume属性(消费延迟重试策略),值有:

                -1:不重试,直接进入死信队列DLQ
                0(默认):Broker控制重试次数
                >0:消费端控制重试次数

/**
 * 同步向Broker发送消息消费ACK请求
 * step1:构建消费确认ACK的发送请求头{@link ConsumerSendMsgBackRequestHeader};
 * step2:同步发送ACK
 * 注意:delayLevel(延迟级别):取{@link ConsumeConcurrentlyContext#delayLevelWhenNextConsume}属性(消费延迟重试策略),值有:
 *                      -1:不重试,直接进入死信队列DLQ;0(默认):Broker控制重试次数;>0:消费端控制重试次数
 * Broker入口:{@link org.apache.rocketmq.broker.processor.SendMessageProcessor#processRequest}
 * @param addr Broker地址
 * @param msg
 * @param consumerGroup
 * @param delayLevel 延迟级别
 * @param timeoutMillis 超时时间
 * @param maxConsumeRetryTimes 最大消费重试次数
 */
public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    // 构建消费确认ACK的发送请求头
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    // 发送请求码CONSUMER_SEND_MSG_BACK
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

    requestHeader.setGroup(consumerGroup);
    requestHeader.setOriginTopic(msg.getTopic());
    requestHeader.setOffset(msg.getCommitLogOffset());
    requestHeader.setDelayLevel(delayLevel);
    requestHeader.setOriginMsgId(msg.getMsgId());
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}

2. Broker端处理消费ACK请求

        SendMessageProcessor#processRequest是Broker端接收消费ACK请求方法,请求码RequestCode.CONSUMER_SEND_MSG_BACK。

        org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#consumerSendMsgBack是消费ACK的核心方法,代码如下。注意事项:

  • 主题修改为重试主题:重试主题(%RETRY% + 消费组名称),并且从重试队列中随机选择一个队列;消息原始主题存储到消息扩展属性
  • 判断延迟级别delayLevel:见上小节《消费端处理消费结果》,默认0是Broker控制重试次数。
  • 判定消费重试次数是否超出最大重试次数:

        a. 消费次数 >= 最大重试消费次数 或 delayLevel < 0时:进入DLQ队列(只有写权限),不在进行消费,需要人工干预

        b. 消费次数 < 最大重试消费次数时:若是delayLevel = 0 (默认)则重置delayLevel,后消息进行延迟消费(设置延迟级别)。

  • 创建新消息对象:创建新消息并提交到Commitlog文件内存,而不是修改消息(重试次数)。原因是:RocketMQ顺序写入Commitlog文件,随机读,提高消息的吞吐量
/**
 * Broker处理消息消费ACK确认
 * 消费者ACK入口:{@link MQClientAPIImpl#consumerSendMessageBack}
 * step1:获取重试主题(%RETRY% + 消费组名称),并且从重试队列中随机选择一个队列
 * step2:构建重试主题配置信息
 *        {@link TopicConfigManager#createTopicInSendMessageBackMethod}
 * step3:判断是否有写权限
 * step4:根据offset从Commitlog获取消息
 * step5:消息的原始主题存入到属性中
 * step6:获取最大重试消费次数,默认16次
 *        {@link SubscriptionGroupConfig.retryMaxTimes}
 * step7:消费次数 >= 最大重试消费次数 或 delayLevel < 0时,进入DLQ队列(只有写权限),不在进行消费,需要人工干预
 *       消费次数 < 最大重试消费次数,若是delayLevel = 0 (默认)则重置delayLevel,后消息进行延迟消费(设置延迟级别)
 * step8:创建新的消息对象,并重新提交到Commitlog文件内存
 *        topic为重试主题、新的msgId,其他属性与原消息一致;新的消息对象存储到Commitlog文件中;原来消息的topic、msgId存入新消息属性。
 */
protected RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);

    // The send back requests sent to SlaveBroker will be forwarded to the master broker beside
    final BrokerController masterBroker = this.brokerController.peekMasterBroker();
    if (null == masterBroker) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("no master available along with " + brokerController.getBrokerConfig().getBrokerIP1());
        return response;
    }

    // The broker that received the request.
    // It may be a master broker or a slave broker
    final BrokerController currentBroker = this.brokerController;

    SubscriptionGroupConfig subscriptionGroupConfig =
        masterBroker.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
    if (null == subscriptionGroupConfig) {
        response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
        response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
            + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
        return response;
    }

    BrokerConfig masterBrokerConfig = masterBroker.getBrokerConfig();
    if (!PermName.isWriteable(masterBrokerConfig.getBrokerPermission())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark("the broker[" + masterBrokerConfig.getBrokerIP1() + "] sending message is forbidden");
        return response;
    }

    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    // 获取重试主题:%RETRY% + 消费组名称
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    // 从重试队列中随机选择一个队列
    int queueIdInt = this.random.nextInt(subscriptionGroupConfig.getRetryQueueNums());

    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }

    // Create retry topic to master broker 构建重试主题配置信息
    TopicConfig topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    if (null == topicConfig) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("topic[" + newTopic + "] not exist");
        return response;
    }

    // 是否有写权限
    if (!PermName.isWriteable(topicConfig.getPerm())) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
        return response;
    }

    // 根据offset从Commitlog获取消息
    // Look message from the origin message store
    MessageExt msgExt = currentBroker.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return response;
    }

    //for logic queue
    if (requestHeader.getOriginTopic() != null
        && !msgExt.getTopic().equals(requestHeader.getOriginTopic())) {
        //here just do some fence in case of some unexpected offset is income
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed to check the topic name" + requestHeader.getOffset());
        return response;
    }

    // 消息的原始主题存入到属性中
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);

    // 延迟级别
    int delayLevel = requestHeader.getDelayLevel();

    // 获取最大重试消费次数,默认16次
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        Integer times = requestHeader.getMaxReconsumeTimes();
        if (times != null) {
            maxReconsumeTimes = times;
        }
    }

    // 消费次数 >= 最大重试消费次数时,进入DLQ队列(只有写权限),不在进行消费,需要人工干预
    boolean isDLQ = false;
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {

        // 进入DQL队列,重置newTopic为:%DLQ% + 消费组
        isDLQ = true;
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);

        // Create DLQ topic to master broker
        topicConfig = masterBroker.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE | PermName.PERM_READ, 0);

        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return response;
        }
        msgExt.setDelayTimeLevel(0);
    }
    // 消费次数 < 最大重试消费次数,则消息进行延迟消费
    else {
        // 根据消费次数,设置延迟级别
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }

        msgExt.setDelayTimeLevel(delayLevel);
    }

    /*
        创建新的消息对象
        a. topic为重试主题、新的msgId,其他属性与原消息一致;
        b. 新的消息对象存储到Commitlog文件中;
        c. 原来消息的topic、msgId存入新消息属性。
     */
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); // 消费次数+1

    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));

    boolean succeeded = false;

    // 存储到Commitlog中
    // Put retry topic to master message store
    PutMessageResult putMessageResult = masterBroker.getMessageStore().putMessage(msgInner);
    if (putMessageResult != null) {
        String commercialOwner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);

        switch (putMessageResult.getPutMessageStatus()) {
            case PUT_OK:
                String backTopic = msgExt.getTopic();
                String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                if (correctTopic != null) {
                    backTopic = correctTopic;
                }
                if (TopicValidator.RMQ_SYS_SCHEDULE_TOPIC.equals(msgInner.getTopic())) {
                    masterBroker.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
                    masterBroker.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
                    masterBroker.getBrokerStatsManager().incQueuePutNums(msgInner.getTopic(), msgInner.getQueueId());
                    masterBroker.getBrokerStatsManager().incQueuePutSize(msgInner.getTopic(), msgInner.getQueueId(), putMessageResult.getAppendMessageResult().getWroteBytes());
                }
                masterBroker.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);

                if (isDLQ) {
                    masterBroker.getBrokerStatsManager().incDLQStatValue(
                        BrokerStatsManager.SNDBCK2DLQ_TIMES,
                        commercialOwner,
                        requestHeader.getGroup(),
                        requestHeader.getOriginTopic(),
                        BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ.name(),
                        1);

                    String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    DLQ_LOG.info("send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, storeTimestamp={}",
                        newTopic,
                        commercialOwner,
                        requestHeader.getOriginTopic(),
                        requestHeader.getGroup(),
                        uniqKey,
                        putMessageResult.getAppendMessageResult().getStoreTimestamp());
                }

                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);

                succeeded = true;
                break;
            default:
                break;
        }

        if (!succeeded) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(putMessageResult.getPutMessageStatus().name());
        }
    } else {
        if (isDLQ) {
            String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
            String uniqKey = msgInner.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            DLQ_LOG.info("failed to send msg to DLQ {}, owner={}, originalTopic={}, consumerId={}, msgUniqKey={}, result={}",
                newTopic,
                owner,
                requestHeader.getOriginTopic(),
                requestHeader.getGroup(),
                uniqKey,
                "null");
        }

        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("putMessageResult is null");
    }

    if (this.hasConsumeMessageHook() && !UtilAll.isBlank(requestHeader.getOriginMsgId())) {
        String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
        ConsumeMessageContext context = new ConsumeMessageContext();
        context.setNamespace(namespace);
        context.setTopic(requestHeader.getOriginTopic());
        context.setConsumerGroup(requestHeader.getGroup());
        context.setCommercialRcvStats(BrokerStatsManager.StatsType.SEND_BACK);
        context.setCommercialRcvTimes(1);
        context.setCommercialOwner(request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER));

        context.setAccountAuthType(request.getExtFields().get(BrokerStatsManager.ACCOUNT_AUTH_TYPE));
        context.setAccountOwnerParent(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_PARENT));
        context.setAccountOwnerSelf(request.getExtFields().get(BrokerStatsManager.ACCOUNT_OWNER_SELF));
        context.setRcvStat(isDLQ ? BrokerStatsManager.StatsType.SEND_BACK_TO_DLQ : BrokerStatsManager.StatsType.SEND_BACK);
        context.setSuccess(succeeded);
        context.setRcvMsgNum(1);
        //Set msg body size 0 when sent back by consumer.
        context.setRcvMsgSize(0);
        context.setCommercialRcvMsgNum(succeeded ? 1 : 0);

        try {
            this.executeConsumeMessageHookAfter(context);
        } catch (AbortProcessException e) {
            response.setCode(e.getResponseCode());
            response.setRemark(e.getErrorMessage());
        }
    }

    return response;
}

        重试新消息的延迟时间达到后,则允许再次拉取消息,进而重新消费。拉取消息提交到消费线程消费时,重试主题恢复到原始主题重新消费。ConsumeRequest#run()消费消息任务,详细见上节,如下代码所示。

@Override
public void run() {
    ......

    // 恢复重试消息主题名(消息重试机制决定)
    defaultMQPushConsumerImpl.tryResetPopRetryTopic(msgs, consumerGroup);
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    ......
}

三、消费进度管理

        上面章节介绍,消费一批消息成功后,则ProceeQueue移除该批消息,获取剩下的msgTreeMap消息中最小的偏移量,避免重复消费,同时更新消费偏移量。那么消费进度存储在哪里呢?

  • 广播模式:同一个消费组的所有消费者都需要消费主题下的所有消息,也就是同组内的消费者的消息消费行为是对立的,互相不影响,所以消费进度存储在本地消费者
  • 集群模式:同一个消费组的所有消费者共享主题下的所有消息,同一个消息消费队列在同一时间只会被消费组内的一个消费者消费,并且随着消费队列的动态变化重新负载,所以消费进度存储在Broker

        org.apache.rocketmq.client.consumer.store.OffsetStore是消费进度接口,其UML图及接口方法如下。

        消费者启动时,会初始化消费进度。如:根据集群模式创建OffsetStore接口的具体实现类;执行load()加载消费进度到内存中(offsetTable维护);执行5s定时周期任务,持久化消费进度

1. 广播模式消费进度存储

        org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore是广播模式消费进度存储的实现类,其关键属性如下。

// 本地存储目录,默认用户主目录/.rocketmq_offsets,通过-Drocketmq.client.localOffsetStoreDir配置
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
    "rocketmq.client.localOffsetStoreDir",
    System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
// 存储文件,路径:LOCAL_OFFSET_STORE_DIR/.rocketmq_offsets/{mQClientFactory.getClientId()}/groupName/offsets.json
private final String storePath;
// 内存中消费队列的消费进度
private ConcurrentMap<MessageQueue/* 消费队列 */, AtomicLong/* 消费进度 */> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();

        消费者启动时,会初始化消费进度。若是广播模式,则从文件中加载消费进度,LocalFileOffsetStore#load加载消费进度,如下代码所示。 

/**
 * 广播模式下加载消费进度文件(消费者启动时,会初始化消费进度)
 * 消费者启动入口:{@link DefaultMQPushConsumerImpl#start()}
 * step1:读取本地磁盘消费队列的消费进度到内存
 *        {@link LocalFileOffsetStore#readLocalOffset()}
 * step2:遍历,消费进度存储到内存中消费队列的消费进度{@link LocalFileOffsetStore#offsetTable}
 */
@Override
public void load() throws MQClientException {
    // 读取本地磁盘消费队列的消费进度到内存
    OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
    if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
        offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());

        // 打印日志
        for (Entry<MessageQueue, AtomicLong> mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
            AtomicLong offset = mqEntry.getValue();
            log.info("load consumer's offset, {} {} {}",
                    this.groupName,
                    mqEntry.getKey(),
                    offset.get());
        }
    }
}

        消费者定时5s周期执行持久化消费进度到磁盘,LocalFileOffsetStore#persistAll,代码如下。

/**
 * 持久化所有消费进度到磁盘
 * 什么时候持久化呢?  MQ的客户端实例启动定时任务,每5s持久化一次
 * 定时任务{@link MQClientInstance#startScheduledTask()}5s执行persistAllConsumerOffset()方法
 */
@Override
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    // 构建对象,把指定的MessageQueue添加到offsetTable中,进行持久化
    OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        if (mqs.contains(entry.getKey())) {
            AtomicLong offset = entry.getValue();
            offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
        }
    }

    // 转为json字符串
    String jsonString = offsetSerializeWrapper.toJson(true);
    if (jsonString != null) {
        try {
            MixAll.string2File(jsonString, this.storePath);
        } catch (IOException e) {
            log.error("persistAll consumer offset Exception, " + this.storePath, e);
        }
    }
}

2. 集群模式消费进度存储

        org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore是集群模式消费进度存储的实现类,其关键属性如下。

// 内存中消费队列的消费进度
private ConcurrentMap<MessageQueue/* 消费队列 */, AtomicLong/* 消费进度 */> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();

        消费者启动时,会初始化消费进度。执行load()方法为空,即:集群模式下无需加载操作。成功消费该批消息后,执行OffsetStore#updateOffset方法更新本地消费队列的消费进度(内存消费进度)。而消费者定时5s周期执行持久化消费进度到磁盘,LocalFileOffsetStore#persistAll(发送到Broker更新内存消费进度)。同时,Broker端默认10s周期执行持久化一次消费进度,如下所示是集群模式下消费进度管理图 。

        org.apache.rocketmq.broker.offset.ConsumerOffsetManager是Broker的消费进度更新管理器,其关键属性如下。存储文件名为${RocketMQ_HOME}/store/config/consumerOffset.json。 

// 消费进度列表
protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer/* 消费队列ID */, Long/* 消费进度 */>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

        RemoteBrokerOffsetStore#persist持久化指定消费队列的消费进度,其核心方法是updateConsumeOffsetToBroker(),其代码如下。

/**
 * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
 */
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    // 获取消费队列的Broker
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, false);
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(this.mQClientFactory.getBrokerNameFromMessageQueue(mq), MixAll.MASTER_ID, false);
    }

    if (findBrokerResult != null) {
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);
        requestHeader.setBname(mq.getBrokerName());

        // 单向更新,无更新响应结果
        if (isOneway) {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
        // 不是单向,则返回是否更新成功
        else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

        org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#readOffset是读取Broker端的消费进度的核心方法,代码如下。

@Override
public long readOffset(final MessageQueue mq, final ReadOffsetType type) {
    if (mq != null) {
        switch (type) {
            // 先从内存中读取,再从磁盘读取
            case MEMORY_FIRST_THEN_STORE:
            // 从内存读取
            case READ_FROM_MEMORY: {
                AtomicLong offset = this.offsetTable.get(mq);
                if (offset != null) {
                    return offset.get();
                } else if (ReadOffsetType.READ_FROM_MEMORY == type) {
                    return -1;
                }
            }
            // 从磁盘读取
            case READ_FROM_STORE: {
                try {
                    // 从broker获取指定消费队列的消费偏移量
                    long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
                    AtomicLong offset = new AtomicLong(brokerOffset);
                    this.updateOffset(mq, offset.get(), false);
                    return brokerOffset;
                }
                // No offset in broker
                catch (OffsetNotFoundException e) {
                    return -1;
                }
                //Other exceptions
                catch (Exception e) {
                    log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
                    return -2;
                }
            }
            default:
                break;
        }
    }

    return -3;
}

四、参考资料

消费进度管理 | RocketMQ        

RocketMQ消费进度管理_fFee-ops的博客-CSDN博客_rokcetmq集群 消费进度

https://www.cnblogs.com/shanml/p/16989785.html

RocketMQ5.0.0消息消费<一> _ PUSH模式的消息拉取_爱我所爱0505的博客-CSDN博客

RocketMQ5.0.0消息消费<二> _ 消息队列负载均衡机制_爱我所爱0505的博客-CSDN博客