RocketMQ学习(一):简介和QuickStart
支持严格的消息顺序
支持Topic与Queue两种模式
亿级消息堆积能力
比较友好的分布式特性
同时支持Push与Pull方式消费消息
历经多次天猫双十一海量消息考验
RocketMQ是纯java编写,基于通信框架Netty。
代码地址:https://github.com/alibaba/RocketMQ,目前分支是3.2.2 develop。
下载完代码后,将各个模块导入eclipse,本地尝试启动看看。
1.启动nameServer,运行rocketmq-namesrv的NamesrvStartup,运行之前需设置环境变量ROCKETMQ_HOME为RocketMQ项目的根目录,这样有一个作用是,指向logback的配置文件路径,保证在nameServer启动时,logback的正常初始化。我本机设置的是:ROCKETMQ_HOME=C:\Users\Administrator\git\RocketMQ。
The Name Server boot success. 表示启动成功。
2.启动brokerServer,运行rocketmq-broker的BrokerStartup,同样,运行之前需设置环境变量ROCKETMQ_HOME,然后启动参数需要带上【-n “192.168.0.109:9876″】,我本机的ip是192.168.0.109。如果不带-n的参数,那么broker会去访问http://jmenv.tbsite.net:8080/rocketmq/nsaddr获取nameServer的地址,这个地址不是我们自己的nameServer。
The broker[LENOVO-PC, 192.168.0.109:10911] boot success. and name server is 192.168.0.109:9876表示成功。
3.这个非必选项,不运行也可以。还可以启动rocketmq-srvutil的FiltersrvStartup,这是Consumer使用Java代码,在服务器做消息过滤。启动方式和broker一样,具体的过滤原理以后再详细的说。
到此就可以运行demo了。
pom.xml依赖:
groupId ch.qos.logback /groupId
artifactId logback-classic /artifactId
version 1.0.13 /version
/dependency
dependency
groupId ch.qos.logback /groupId
artifactId logback-core /artifactId
version 1.0.13 /version
/dependency
dependency
groupId com.alibaba.rocketmq /groupId
artifactId rocketmq-client /artifactId
version 3.2.2 /version
/dependency
dependency
groupId junit /groupId
artifactId junit /artifactId
version 4.10 /version
scope test /scope
/dependency
/dependencies
name Nexus /name
url http://maven.oschina.net/content/groups/public/ /url
releases
enabled true /enabled
/releases
snapshots
enabled true /enabled
/snapshots
/repository
/repositories
package com.zoo.quickstart;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
/**
* Producer,发送消息
*
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.109:9876");
producer.start();
for (int i = 0; i 1000; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
Thread.sleep(3000);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(3000);
}
}
producer.shutdown();
}
}
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
/**
* Consumer,订阅消息
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.0.109:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 br
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List MessageExt msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
System.out.println(" Receive Message Size: " + msgs.size());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}
因为demo代码来自于rocketmq-example,所以没有上传Github。
ps:以前rocketmq在Github开源的时候没有学习,后来突然有一天发现Github上404了,心里后悔莫急,这次rocketmq重新开源出来,一定不能错过了。
RocketMQ学习Broker流程、生产者和存储流程联系 放入消息之后,进行操作体现在asyncSendMessage中。将消息以异步方式存储到存储器中,处理器可以处理下一个请求,而不是在结果完成后等待结果,以异步方式通知客户端。此时可以看到asyncPutMessage的操作中会进入到CommitLog中,此时进行提交日志操作,此时会执行写入到ByteBuffer中,然后刷盘到硬盘中。同时执行统计操作,进行HA同步。
RocketMQ的NameServer执行流程学习梳理 首先NamesrvStartUp启动,首先经过main()方法,也是我们常见的main方法进入到main0()执行创建controller操作与启动controller操作这两个操作。而创建controller的操作则首先需要拿到namesrvConfig的配置信息和NettyServerConfig的配置信息,此时会 创建这两个对象,并填充配置信息然后放入到创建的controller对象中的构造函数中,并进行controller的启动操作,而启动操作首先会初始化一些信息和添加jvm钩子,也即会进行如下操作:加载键值对配置管理器、创建远程服务器remotingServer,创建远程线程池rem
rocketmq学习2 前面我们已经通过quickstrat可以看到nameServer的启动:从启动类中,我们看到:首先创建NamesrvConfig、nettyServerConfig,设置监听端口,将8888改成9876。填充NamesrvConfig、NettyServerConfig、BrokerConfig,获取namesrvAddr,创建Controller,注册钩子函数,启动start。 NamesrvController的属性信息、构造函数:
Rocketmq学习一 首先从github中拉取Rocketmq的代码,进行运行。 1.由于rocketmq需要依赖nameServer,类似于zookeeper。首先启动时,配置好NamesrvStartup的环境变量信息,也即rocketmq的ROCKEMQ_HOME与你的项目对应。接着就可以启动了。
读 RocketMQ 源码,学习并发编程三大神器 笔者是 RocketMQ 的忠实粉丝,在阅读源码的过程中,学习到了很多编程技巧。 这篇文章,笔者结合 RocketMQ 源码,分享并发编程三大神器的相关知识点。
自顶向下学习 RocketMQ(十):消息重投和消息重试 生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway 没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在 RocketMQ 中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer 负载变化也会导致重复消息。
相关文章
- 分布式学习和联邦学习简介
- CDO学习1 CDO简介[通俗易懂]
- Cobaltstrike 学习笔记(一)简介与安装
- 主动学习(Active Learning)简介综述汇总以及主流技术方案
- Netty框架学习之(一):Netty框架简介
- 【Flutter】Flutter Gallery 官方示例简介 ( 学习示例 | 邮件应用 | 零售应用 | 理财应用 | 旅行应用 | 新闻应用 | 自适应布局应用 )
- SQL Server中Check约束的学习教程
- Struts2学习笔记一 简介及入门程序详解编程语言
- 训练课程广州尚观创造Linux学习新体验(广州尚观linux)
- Terraform 学习总结 — Terraform 简介
- JavaScriptDOM学习第五章表单简介
- php学习资料零碎东西
- php学习笔记之函数声明(二)
- 深入DropDownList用法的一些学习总结分析