zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

rocketmq延迟队列原理_rocketmq延迟队列原理

队列队列原理 延迟 rocketmq
2023-06-13 09:13:03 时间

大家好,又见面了,我是你们的朋友全栈君。

在java的延迟队列中,无法支持集群的延迟。 Redis可以做到对应的延迟功能,但是自己封装毕竟局限于业务。而且封装也需要耗费一定时间。 今天我们就讲一个现有的延迟队列,不仅支持分布式服务,而且解耦业务代码,而且支持不同延迟时间的造好的轮子吧。 ~ 那就是 RocketMQ 延时队列。

RocketMQ将延时队列的延时延时时间分为18个级别

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 分别对应下面的延迟时间,在使用时,直接传递 level即可。 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 当然这个时间可以自己修改,如果不维护 则按照默认的

在发送MQ消息的时候只需要设置

Message.setDelayTimeLevel(delayLevel);

MQ发送的代码:

public class DelayMQProducerTest { 

public static void main(String[] args) throws MQClientException, InterruptedException { 

DefaultMQProducer producer = new DefaultMQProducer("delay_test_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
try { 

for (int i = 0; i < 3; i++) { 

Message msg = new Message("Topic_Delay_Test",// topic
"Tag_Delay",// tag
(new Date() + "Topic_Delay_Test" + i).getBytes()// body
);
msg.setDelayTimeLevel(2); // 设置延迟级别为2 也就是 5s 
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) { 

e.printStackTrace();
}
producer.shutdown();
}
}

接下来就跟进到代码里看是RocketMQ是如何是做到延迟发送消息的。

本人使用的是rocketMQ 4.2 下载地址

进入Message可以看到两个方法:

// 获取延迟等级
public int getDelayTimeLevel() { 

String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
if (t != null) { 

return Integer.parseInt(t);
}
return 0;
}
// 设置延迟等级
public void setDelayTimeLevel(int level) { 

this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

既然设置方法可以看到,那通过获取Level追看哪里使用,然后研究对应的实现。

在这里将topic和queueId替换为延迟队列的队列(SCHEDULE_TOPIC_XXXX),这样就保证消息不会立即被发送出去。 而是经过SCHEDULE_TOPIC_XXXX的特殊处理后,然后在发送到Consumer。

那在这里被替换后,是怎么保证延迟发送呢?

继续往下

由于对源码的不熟悉,也不了解,其实费了一些功夫,发现ScheduleMessageService.java 有start方法

 public void start() { 

for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { 

Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) { 

offset = 0L;
}
if (timeDelay != null) { 

this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() { 

@Override
public void run() { 

try { 

ScheduleMessageService.this.persist();
} catch (Throwable e) { 

log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}

从start方法中可以看到,这个时候就启动了定时器,开始从队列里获取数据了。 那么start方法是怎么被调用的呢?

在DefaultMessageStore中启动的。

接下来我们还是把注意力放在 ScheduleMessageService.start方法的执行过程吧。 通过源码追踪,就看到了这个方法

   public void executeOnTimeup() { 

ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
long failScheduleOffset = offset;
if (cq != null) { 

SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) { 

try { 

long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { 

long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
if (cq.isExtAddr(tagsCode)) { 

if (cq.getExt(tagsCode, cqExtUnit)) { 

tagsCode = cqExtUnit.getTagsCode();
} else { 

//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}
long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
if (countdown <= 0) { 

MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
if (msgExt != null) { 

try { 

MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { 

continue;
} else { 

// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) { 

/* * XXX: warn and notify me */
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else { 

ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally { 

bufferCQ.release();
}
} // end of if (bufferCQ != null)
else { 

long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) { 

failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}

但是注意处理的逻辑就在这里了。 如果到了延迟时间,就发送消息 否则就继续进行延迟返送。

总结,RocketMQ的延迟消息,使用起来方便,而且解耦代码,但是配置的延迟时间不够灵活。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/234086.html原文链接:https://javaforall.cn