zl程序教程

您现在的位置是:首页 >  Python

当前栏目

RocketMQ源码详解:事务消息、批量消息、延迟消息

2023-03-20 14:52:49 时间

◆ 概述

在上文中,我们讨论了消费者对于消息拉取的实现,对于

这个黑盒的心脏部分,我们顺着消息的发送流程已经将其剖析了大半部分。本章我们不妨乘胜追击,接着讨论各种不同的消息的原理与实现。

◆ 事务消息

◆ 概念

RocketMQ 中的事务消息功能,实际上是 分布式事务中的本地事务表 的实现,只不过,在这里用消息中间件来代替了数据库,同时也帮我们做好了回查的操作。

在这点上,RocketMQ 和 Kafka 是截然不同的,kafka 的事务是用来实现 Exacltly Once 语义,且该语义主要用来流计算中,即在 "从 Topic 中读 -> 计算 -> 存到 Topic" 保证不被重复计算。

◆ 事务流程

  • 客户端发送 half 消息

吐槽一下为什么要叫半消息(half message),叫 prepare 消息不是更直观吗

  1. Broker 将 half 消息持久化
  2. 客户端根据事务执行结果,发送 Commit / Rollback 消息
  3. Broker 收到 Commit 时,将事务消息对消费者可见。收到 Rollback 时,将消息丢弃

◆ 补偿

  • Broker 过久未收到事务执行结果,询问客户端执行结果
  • 客户端收到结果查询请求,执行回查方法,发送 Commit / Rollback 方法
  • Broker 根据事务执行结果做出对应处理

◆ 源码流程

◆ 第一步

在设置好了事务监听器后(执行事务 与 事务回查),就可以发送事务消息

在将事务消息交给发送方法后,客户端首先会为消息添加事务消息的标识

MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");

然后将该事务消息会像普通的同步消息一样发送(且是同步发送)

sendResult = this.send(msg);

具体发送流程见:RocketMQ源码详解 | Producer篇 · 其一:Start,然后 Send 一条消息

◆ 第二步

在 Broker 端接收到消息以后,会走与普通消息相同的底层通道(因为这个消息本身就只是个加上了 事务flag 的普通消息),然后由 TransactionalMessageService 来对这个消息进行额外处理。

首先会对该消息放入 real topic 属性和 real queue 属性,然后将消息 Topic 替换为用于处理所有事务消息的特殊的 Topic,当然该 Topic 对消费者是不可见的。

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {  MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());  MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,                              String.valueOf(msgInner.getQueueId()));  // 设置标记为未收到结果  msgInner.setSysFlag(    MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));  // 替换到特殊的 Topic (RMQ_SYS_TRANS_HALF_TOPIC)  msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());  msgInner.setQueueId(0);  msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));  return msgInner;}

完成后,会送到 MessageStore 像普通消息一样处理

普通消息的具体流程见 RocketMQ源码详解 | Broker篇 · 其二:文件系统

◆ 第三步

回到 Producer 端,在事务消息发送完成后,该方法会使用专门的线程池执行事务

// 2.执行本地事务,更新事务获取状态localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);

然后对本地的事务执行状态进行处理,也就是将该执行状态上报

this.endTransaction(msg, sendResult, localTransactionState, localException);

这里会发送一条 oneway 命令给 Broker 端,且使用的是 RequestCode.END_TRANSACTION 请求码

// 事务结果报告(可能是 commit 或 rollback)public static final int END_TRANSACTION = 37;

完成处理后,该方法会将事务的发送结果和本地事务的执行结构都返回给上层 API

◆ 第四步

在 Broker 端,这里会由 EndTransactionProcessor 处理器来处理该请求码

然后,根据事务的执行结果来做不同的处理

if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {  // 事务执行成功,尝试完成事务   // 获取 half 消息  result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);  if (result.getResponseCode() == ResponseCode.SUCCESS) {    if (res.getCode() == ResponseCode.SUCCESS) {      // 将 half 消息取出,构造真实消息,然后投入实际上的 Topic      /* pass */            RemotingCommand sendResult = sendFinalMessage(msgInner);            if (sendResult.getCode() == ResponseCode.SUCCESS) {        /*         * 找到半消息,进行删除         * 删除并不是物理上的删除,因为物理上的删除的代价十分的高昂,而是写入一条具有相同事务id的消息到 op Topic         */        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());      }      return sendResult;    }    return res;  }}

如果需要回滚,则对相应的半消息进行删除,且和上面一样,并不是物理上的删除,而是发送具有相同事务 id 的消息到 OP Topic,来标记这个事务已经完成了(Commit/Rollback), OP Topic 也是一个特殊的 Topic,同样对消费者不可见。

if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {  // 事务执行失败,进行 half 消息的回滚   // 首先找到 half 消息  result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);  if (result.getResponseCode() == ResponseCode.SUCCESS) {    RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);    if (res.getCode() == ResponseCode.SUCCESS) {      // 进行删除      this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());    }    return res;  }}

当这些都做完后,一次事务就完成了。

◆ 补偿

当然啦,以上是顺利的情况,我们当然不能指望事务每一次都能执行成功、网络分区和宕机事件永远不会发生。

在一段时间后,如果客户端没有对事务的状态进行上报(或者上报的状态不是 Commit 或 Rollback,而是 Unknown), Broker 端当然就要进行事务状态的回查。

在 BrokerController 启动的时候,会开启事务状态检测服务,该服务会通过循环调用 TransactionalMessageServiceImpl.check() 方法,不断的扫描未结束的事务,同时对超过指定时间还不知道状态的事务进行回查操作。

check() 方法是事务回查的核心,由于很长,我们先来看第一部分(删减了没人在意的 Log)

// 首先找到存储所有 half 消息的 TopicString topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);// 对其中每一个 queue 进行检查for (MessageQueue messageQueue : msgQueues) {  long startTime = System.currentTimeMillis();   // 获得对应的 op 消息所在的 queue  MessageQueue opQueue = getOpQueue(messageQueue);  // 获取未处理的 half 消息的起始偏移量  long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);  // 获取 op 消息的 queue 的起始偏移量  long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);   // 用来记录已经被处理了的 op 消息的偏移量  List<Long> doneOpOffset = new ArrayList<>();  // 用来记录已经完成了的 half 消息的偏移量  // key: halfOffset, value: opOffset  HashMap<Long, Long> removeMap = new HashMap<>();   PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);

在 fillOpRemoveMap 方法中,主要是将 op 消息取出,来标记可以被移除的 half 消息(op 消息的存在代表对应事务的结束)

/** * 读取op消息,解析op消息,填充removeMap * * @param removeMap 要删除的半消息,key: halfOffset,value: opOffset * @param opQueue Op message queue. * @param pullOffsetOfOp op message queue 的起始偏移量 * @param miniOffset half message queue 的当前最小偏移量 * @param doneOpOffset 存储已处理的 op 消息 * @return 获取到的 Op 消息 */private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap,                                   MessageQueue opQueue, long pullOffsetOfOp, long miniOffset, List<Long> doneOpOffset) {  // 首先通过 queue 获取 op 消息,最大数量为 32 条  PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);    /* pass: pullResult 消息的意外状态的处理 */   List<MessageExt> opMsg = pullResult.getMsgFoundList();  for (MessageExt opMessageExt : opMsg) {    // op 消息的 body 存储的是对应的 half 消息的偏移量, 现在将其取出    Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));    // 感觉这里的 Tag 并没有什么意义,无论是 Commit 还是 Rollback 都会加入这个 Tag    if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {      // 在 已处理偏移量 之前的话则可直接放入 已处理偏移量集合      if (queueOffset < miniOffset) {        doneOpOffset.add(opMessageExt.getQueueOffset());      } else {        // 否则放入需要移除的 half 的消息的集合        removeMap.put(queueOffset, opMessageExt.getQueueOffset());      }    }  }  return pullResult;}

然后进入到 check 方法的第二部分

while (true) {  if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) break;    // 推进最小已处理偏移量  if (removeMap.containsKey(i)) /* 如果该 half 消息存在对应的 op 消息,说明已经被处理了(commit/rollback) */ {    // 取出放入到已处理偏移量队列    Long removedOpOffset = removeMap.remove(i);    doneOpOffset.add(removedOpOffset);   } else /* 否则说明当前 half 消息悬而未决  */ {    // 取出对应的半消息    GetResult getResult = getHalfMsg(messageQueue, i);		    /* pass: 半消息不存在时的意外处理 */     /*     * 检测是否要丢弃或跳过     *   丢弃条件: 当前事务已经超过了最大回查次数(15次)     *   跳过条件: 已经超过了过期文件最大保留时间(72小时)     */    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {      // 处理并推进偏移量      // 具体的处理方法是: 投入 TRANS_CHECK_MAX_TIME_TOPIC 这个 Topic,等待手动处理      listener.resolveDiscardMsg(msgExt);            // 进入到下一个 half 消息      newOffset = i + 1;      i++;      continue;    }    if (msgExt.getStoreTimestamp() >= startTime) {      break;    }

上面的方法很好理解,只是对于已经被标记结束的事务的处理、和未结束事务的补足

接下来是第三部分,这里将继续对未结束事务的补足,与进行可能的回查操作

  // half 消息具有最小的检查时间(免疫时间), 检测时间以内可以跳过回查, 重新投入 half 消息的 Topic  long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();  long checkImmunityTime = transactionTimeout;  String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);  if (null != checkImmunityTimeStr) {    checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);    if (valueOfCurrentMinusBorn < checkImmunityTime) {      if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {        newOffset = i + 1;        i++;        continue;      }    }  } else {    if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {      break;    }  }    /*   * 对于当前事务的回查操作,需要满足三个条件之一   *  1.当前 op 消息的集合为空,且已经超过了最小检查时间(免疫时间)   *  2.最大偏移量的 op 消息的生成时间 已经超过了 最小检查时间   *  3.关闭最小检查时间   */  List<MessageExt> opMsg = pullResult.getMsgFoundList();  boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)    || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))    || (valueOfCurrentMinusBorn <= -1);   if (isNeedCheck) {    // 先将当前 half 消息放回    if (!putBackHalfMsgQueue(msgExt, i)) {      continue;    }    // 然后向 Product 发送检测消息    listener.resolveHalfMsg(msgExt);  } else {    // 否则更新 op 消息集合,以确保能够断言该 half 消息的状态    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);    continue;  }} newOffset = i + 1; i++;}

上面这段代码主要围绕 "是否进行回查" 展开,且涉及到 "免疫时间"。

在一个事务消息被发送后,对应事务的执行当然需要一定的执行时间,如果我们不设置这个时间立刻进行回查,那么很有可能时候事务还没执行完,对于大多数情况下还没执行完的事务进行回查,毫无疑问带来的收益很低。所以我们需要设定一个时间,在这个时间内的事务先暂时不回查,这个时间就叫做"免疫时间"。

然后再来看下需要进行回查的三种情况:

  1. 当 op 消息的集合为空,说明当前还没有收到让当前事务结束的通知,且超过了"免疫时间",故回查
  2. 当前 op 消息最大偏移量的生成时间超过了"免疫时间",说明该事务的提交消息可能丢失了,故回查
  3. 不启用 "免疫时间"

其中发送的回查消息的请求码为 RequestCode.CHECK_TRANSACTION_STATE ,发送的也是 oneway 消息

最后的第四部分,同时更新 half 和 op 消息在 Queue 中的偏移量

// 对所有的 half 消息计算完成后,更新偏移量if (newOffset != halfOffset) {  transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);}// 根据已经被标记为完成的 op 消息更新偏移量long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);if (newOpOffset != opOffset) {  // 如果不等,说明并不是所有的 op 消息都被标记为完成了  // 所以我们只将偏移量更新到第一个未完成的 op 消息的位置,其后面的 op 消息会在下次重复处理  transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);}

然后在 Producer 这边,将由 ClientRemotingProcessor.checkTransactionState() 来处理回查操作

// 获取事务 IDString transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {    messageExt.setTransactionId(transactionId);}final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);if (group != null) {    // 从 MQClientFactory 找到注册的对应 Producer    MQProducerInner producer = this.mqClientFactory.selectProducer(group);    if (producer != null) {        final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());        // 让 Producer 检查在对应 IP 上的事务状态        producer.checkTransactionState(addr, messageExt, requestHeader);    } else {        log.debug("checkTransactionState, pick producer by group[{}] failed", group);    }} else {    log.warn("checkTransactionState, pick producer group failed");}

再进入 producer.checkTransactionState() 看看 Producer 是怎样检查事务状态的

TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();// 取出当前 Producer 的事务监听器TransactionListener transactionListener = getCheckListener();if (transactionCheckListener != null || transactionListener != null) {  LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;  Throwable exception = null;  try {    if (transactionCheckListener != null) {      // 调用其的事务回查方法      localTransactionState = transactionCheckListener.checkLocalTransactionState(message);    } else if (transactionListener != null) {      log.debug("Used new check API in transaction message");      localTransactionState = transactionListener.checkLocalTransaction(message);    }  } catch (Throwable e) {    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);    exception = e;  }   // 再将事务执行结果其发回给 Broker  this.processTransactionState(    localTransactionState,    group,    exception);} else {  log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);}

最后发回的方法做的事情和在一开始发送事务状态的方法,所做的事情是一样的。Broker 做的处理也是一样的。

这样,补偿流程就执行完了。

◆ 批量消息

◆ 概念

在消息队列中,批量消息也是一个重要的部分,将消息压缩在一起发送不仅可以减少带宽的消耗,还能节省头部占用的空间。

有点失望的是,RocketMQ 对于批量消息的实现有点"粗糙"了

◆ 源码流程

首先,在调用 send() 的 batch 版本后,会先对批量消息进行校验

批量消息不允许延时、不允许发送到重试 Topic,且要求发送到的 Topic 必须是同一个 Topic

List<Message> messageList = new ArrayList<Message>(messages.size());Message first = null;for (Message message : messages) {  if (message.getDelayTimeLevel() > 0) {    throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");  }  if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {    throw new UnsupportedOperationException("Retry Group is not supported for batching");  }  if (first == null) {    first = message;  } else {    if (!first.getTopic().equals(message.getTopic())) {      throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");    }    if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {      throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");    }  }  messageList.add(message);}MessageBatch messageBatch = new MessageBatch(messageList);

在校验完成,且都放到一个 List 之后,接下来的步骤和普通的消息发送都差不多,只是在编码上理所当然的存在着不同

public static byte[] encodeMessages(List<Message> messages) {  //TO DO refactor, accumulate in one buffer, avoid copies  List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());  int allSize = 0;  for (Message message : messages) {    // 编码每一个消息    byte[] tmp = encodeMessage(message);    encodedMessages.add(tmp);    allSize += tmp.length;  }   // 放到最后的大集合中  byte[] allBytes = new byte[allSize];  int pos = 0;  for (byte[] bytes : encodedMessages) {    System.arraycopy(bytes, 0, allBytes, pos, bytes.length);    pos += bytes.length;  }  return allBytes;}

然后使用 RequestCode.SEND_BATCH_MESSAGE 这个状态码发送出去。

在 Broker 端,其投入的过程大体上和普通消息类似,但是其最后的持久化到硬盘时,这块批量消息被拆分为了普通的单条消息。

即 RocketMQ 使用批量消息只减少了发送时的宽带传输,对于存储与交给消费者的部分并没有获得优化

// 拆分批量消息为每一个普通消息while (messagesByteBuff.hasRemaining()) {  // 1 TOTALSIZE  final int msgPos = messagesByteBuff.position();  final int msgLen = messagesByteBuff.getInt();  final int bodyLen = msgLen - 40; //only for log, just estimate it    /*  pass: 当作普通消息存储   */    queueOffset++;  msgNum++;  messagesByteBuff.position(msgPos + msgLen);}

◆ 延时消息

◆ 概念

在业务中,有时候有一些延时提交任务的需求,这时候就可以使用延时消息,即在投递一部分时间后才对消费者可见。

不过,在 RocketMQ 中,延迟级别并不支持自定义,而是具有固定的延迟级别。

不过商业版的 阿里云MQ 可以支持秒精度的自定义延迟时间,果然是为了阉割社区版来赚钱吗

◆ 源码流程

RocketMQ 对于延时消息的处理主要在于 Broker 端,所以我们只需要看在 Broker 对延时级别的处理。

首先,在 CommitLog 的 put 中,会对延迟级别进行判断,如果存在,会在这进行进行 Topic 的替换,将其存储到对应的延迟级别的 Queue

if (msg.getDelayTimeLevel() > 0) {  if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());  }   topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;  queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());   // Backup real topic, queueId  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());  MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));  msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));   msg.setTopic(topic);  msg.setQueueId(queueId);}

然后会被在 DefaultMessageStore 中初始化的 ScheduleMessageService 处理

首先,该服务在启动时会进行初始化

public void start() {  // 保证只被执行一次  if (started.compareAndSet(false, true)) {    // 加载本地快照    super.load();    this.timer = new Timer("ScheduleMessageTimerThread", true);    for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {      // 取出每一个级别      Integer level = entry.getKey();      // 当前延迟级别对应的延迟时间      Long timeDelay = entry.getValue();      // 该延迟级别之前消费到的自己的队列的偏移量      Long offset = this.offsetTable.get(level);      if (null == offset) {        offset = 0L;      }       // 每一个延迟级别设置一个定时任务      if (timeDelay != null) {        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);      }    }     // 定时持久化各个延迟级别的偏移量    this.timer.scheduleAtFixedRate(new TimerTask() {       @Override      public void run() {        try {          if (started.get()) ScheduleMessageService.this.persist();        } catch (Throwable e) {          log.error("scheduleAtFixedRate flush exception", e);        }      }    }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());  }}

每一个延迟级别的 Queue 都有对应的定时任务,且都会执行以下方法

public void executeOnTimeup() {  // 找到自己延迟级别的消费队列  ConsumeQueue cq =    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,                                                                     delayLevel2QueueId(delayLevel));  long failScheduleOffset = offset;  if (cq != null) {    // 根据消费偏移量将指定的 MappedFile 文件加载进来    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);    if (bufferCQ != null) {      try {        long nextOffset = offset;        int i = 0;        // 遍历每一个消息的索引        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {          long offsetPy = bufferCQ.getByteBuffer().getLong();          int sizePy = bufferCQ.getByteBuffer().getInt();          long tagsCode = bufferCQ.getByteBuffer().getLong();           /* pass  */           long now = System.currentTimeMillis();          long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);           nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);           long countdown = deliverTimestamp - now;          if (countdown <= 0) /* 目标时间小于当起时间,可以执行 */ {            // 根据偏移量取出消息            MessageExt msgExt =              ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(              offsetPy, sizePy);             if (msgExt != null) {              try {                // 将延迟消息恢复成原本消息的样子                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);                /* pass */                                // 投入真实的 Topic                PutMessageResult putMessageResult =                  ScheduleMessageService.this.writeMessageStore                  .putMessage(msgInner);                 /* pass: 更新度量信息  */              } catch (Exception e) {                /* pass */              }            }          } else /* 否则,这个消息需要被消费的时间到了再通知我 */ {            ScheduleMessageService.this.timer.schedule(              new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),              countdown);            // 更新消费偏移量            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);            return;          }        } // end of for         // 走到这里,说明暂时没有需要消费的延时消息        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);        // 小睡一会        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(          this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);        return;      } finally {        bufferCQ.release();      }    } // end of if (bufferCQ != null)    /* pass */  } // end of if (cq != null)  ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,                                                                                failScheduleOffset), DELAY_FOR_A_WHILE);}

可以看出,延迟消息的实现还是十分简单的,由于先投入的延时消息必先快于后投入的消息的到期,所以只需要不断的拉取各个延迟级别对应的队列 的头部的延迟消息即可。这也是只支持固定级别的延迟消息带来的好处。

来源:

https://www.cnblogs.com/enoc/p/rocketmq-so-no-nana.html

“IT大咖说”欢迎广大技术人员投稿,投稿邮箱:aliang@itdks.com