zl程序教程

您现在的位置是:首页 >  工具

当前栏目

RocketMQ学习(五):Pull和Push

学习 rocketmq PUSH pull
2023-09-14 09:02:12 时间

源代码版本是3.2.6。在rocketmq里,consumer被分为2类:MQPullConsumer和MQPushConsumer,其实本质都是拉模式(pull),即consumer轮询从broker拉取消息。

区别是:

push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

文字描述可能不是很清楚,前面的文章都是push方式的,所以这里只上pull方式的,贴代码:


properties

project.build.sourceEncoding UTF-8 /project.build.sourceEncoding

logback.version 1.0.13 /logback.version

rocketmq.version 3.2.6 /rocketmq.version

/properties

dependencies

dependency

groupId ch.qos.logback /groupId

artifactId logback-classic /artifactId

version 1.0.13 /version

/dependency

dependency

groupId ch.qos.logback /groupId

artifactId logback-core /artifactId

version 1.0.13 /version

/dependency

dependency

groupId com.alibaba.rocketmq /groupId

artifactId rocketmq-client /artifactId

version ${rocketmq.version} /version

/dependency

dependency

groupId junit /groupId

artifactId junit /artifactId

version 4.10 /version

scope test /scope

/dependency

/dependencies


package com.zoo.quickstart;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

/**

* Producer,发送消息

*

*/

public class Producer {

public static void main(String[] args) throws MQClientException, InterruptedException {

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

producer.setNamesrvAddr("192.168.0.104:9876");

producer.start();

for (int i = 0; i 5; i++) {

try {

Message msg = new Message("TopicTest",// topic

"TagA",// tag

("Hello RocketMQ " + i).getBytes()// body

);

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

Thread.sleep(6000);

}

catch (Exception e) {

e.printStackTrace();

Thread.sleep(3000);

}

}

producer.shutdown();

}

}


import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;

import com.alibaba.rocketmq.client.consumer.PullResult;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageQueue;

/**

* PullConsumer,订阅消息

*/

public class PullConsumer {

private static final Map MessageQueue, Long offseTable = new HashMap MessageQueue, Long

public static void main(String[] args) throws MQClientException {

DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");

consumer.setNamesrvAddr("192.168.0.104:9876");

consumer.start();

Set MessageQueue mqs = consumer.fetchSubscribeMessageQueues("TopicTest");

for (MessageQueue mq : mqs) {

System.out.println("Consume from the queue: " + mq);

SINGLE_MQ: while (true) {

try {

PullResult pullResult =

consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);

System.out.println(pullResult);

putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

switch (pullResult.getPullStatus()) {

case FOUND:

// TODO

break;

case NO_MATCHED_MSG:

break;

case NO_NEW_MSG:

break SINGLE_MQ;

case OFFSET_ILLEGAL:

break;

default:

break;

}

}

catch (Exception e) {

e.printStackTrace();

}

}

}

consumer.shutdown();

}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {

offseTable.put(mq, offset);

}

private static long getMessageQueueOffset(MessageQueue mq) {

Long offset = offseTable.get(mq);

if (offset != null)

return offset;

return 0;

}

}


package com.zoo.quickstart.pull;

import com.alibaba.rocketmq.client.consumer.MQPullConsumer;

import com.alibaba.rocketmq.client.consumer.MQPullConsumerScheduleService;

import com.alibaba.rocketmq.client.consumer.PullResult;

import com.alibaba.rocketmq.client.consumer.PullTaskCallback;

import com.alibaba.rocketmq.client.consumer.PullTaskContext;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageQueue;

import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class PullScheduleService {

public static void main(String[] args) throws MQClientException {

final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");

scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("192.168.0.104:9876");

scheduleService.setMessageModel(MessageModel.CLUSTERING);

scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback() {

@Override

public void doPullTask(MessageQueue mq, PullTaskContext context) {

MQPullConsumer consumer = context.getPullConsumer();

try {

// 获取从哪里拉取

long offset = consumer.fetchConsumeOffset(mq, false);

if (offset 0)

offset = 0;

PullResult pullResult = consumer.pull(mq, "*", offset, 32);

System.out.println(offset + "\t" + mq + "\t" + pullResult);

switch (pullResult.getPullStatus()) {

case FOUND:

break;

case NO_MATCHED_MSG:

break;

case NO_NEW_MSG:

case OFFSET_ILLEGAL:

break;

default:

break;

}

// 存储Offset,客户端每隔5s会定时刷新到Broker

consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());

// 设置再过100ms后重新拉取

context.setPullNextDelayTimeMillis(100);

}

catch (Exception e) {

e.printStackTrace();

}

}

});

scheduleService.start();

}

}


RocketMQ学习Broker流程、生产者和存储流程联系 放入消息之后,进行操作体现在asyncSendMessage中。将消息以异步方式存储到存储器中,处理器可以处理下一个请求,而不是在结果完成后等待结果,以异步方式通知客户端。此时可以看到asyncPutMessage的操作中会进入到CommitLog中,此时进行提交日志操作,此时会执行写入到ByteBuffer中,然后刷盘到硬盘中。同时执行统计操作,进行HA同步。
RocketMQ的NameServer执行流程学习梳理 首先NamesrvStartUp启动,首先经过main()方法,也是我们常见的main方法进入到main0()执行创建controller操作与启动controller操作这两个操作。而创建controller的操作则首先需要拿到namesrvConfig的配置信息和NettyServerConfig的配置信息,此时会 创建这两个对象,并填充配置信息然后放入到创建的controller对象中的构造函数中,并进行controller的启动操作,而启动操作首先会初始化一些信息和添加jvm钩子,也即会进行如下操作:加载键值对配置管理器、创建远程服务器remotingServer,创建远程线程池rem
rocketmq学习2 前面我们已经通过quickstrat可以看到nameServer的启动:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。 NamesrvController的属性信息、构造函数:
Rocketmq学习一 首先从github中拉取Rocketmq的代码,进行运行。 1.由于rocketmq需要依赖nameServer,类似于zookeeper。首先启动时,配置好NamesrvStartup的环境变量信息,也即rocketmq的ROCKEMQ_HOME与你的项目对应。接着就可以启动了。
读 RocketMQ 源码,学习并发编程三大神器 笔者是 RocketMQ 的忠实粉丝,在阅读源码的过程中,学习到了很多编程技巧。 这篇文章,笔者结合 RocketMQ 源码,分享并发编程三大神器的相关知识点。
自顶向下学习 RocketMQ(十):消息重投和消息重试 生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。