《Apache RocketMQ用户指南》之有序的消息示例
2023-06-13 09:13:33 时间
RocketMQ使用FIFO队列提供有序消息.
以下示例演示发送/接收全局和分区有序消息。
发送消息示例代码public class OrderedProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. MQProducer producer = new DefaultMQProducer( example_group_name //Launch the instance. producer.start(); String[] tags = new String[] { TagA , TagB , TagC , TagD , TagE for (int i = 0; i 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. Message msg = new Message( TopicTestjjj , tags[i % tags.length], KEY + i, ( Hello RocketMQ + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List MessageQueue mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }, orderId); System.out.printf( %s%n , sendResult); //server shutdown producer.shutdown();订阅消息简单示例代码
public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( example_group_name consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe( TopicTest , TagA || TagC || TagD consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List MessageExt msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.printf(Thread.currentThread().getName() + Receive New Messages: + msgs + %n this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; return ConsumeOrderlyStatus.SUCCESS; consumer.start(); System.out.printf( Consumer Started.%n
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/93869.html
相关文章
- Apache中 RewriteRule 规则参数介绍
- Apache IoTDB 相关论文入选国际数据库顶级会议 ICDE 2022
- nginx php apache php 对比,Apache和nginx的比较「建议收藏」
- linux apache安装与配置_Apache配置
- Apache配置虚拟主机_apache启动但是访问不到
- 如何使用ApacheTomcatScanner扫描Apache Tomcat服务器漏洞
- 详解 Apache Pulsar 消息生命周期
- 详解 Apache Pulsar 消息生命周期
- 打造消息中台,华为终端云基于 Apache Pulsar 的演进实践
- windows apache部署SSL证书让网站支持https的配置方法
- apache用Linux服务器架设QQ五笔输入法服务:基于Apache技术(qq五笔linux)
- 服务Linux下重启Apache服务的步骤(linux重启apache)
- 结合开发,推动进步: Apache与MySQL的协作(apache和mysql)
- 搭配Apache MySQL:超强联手服务器助力(apache和mysql)
- Linux 下 Apache 服务器安装指南(linux安装apache)
- 的结合Apache和MySQL的完美结合(apache与mysql)
- apache深度探索Linux下的Apache服务器(linuxgt)
- 硬核观察 | 吴晟当选首位华人 Apache 软件基金会董事
- Apache服务器实现301重定向规则
- Apache与MySQL联动改变Web服务(apache跟mysql)
- 《Apache RocketMQ用户指南》之定时消息示例
- linux下安装apache与php;Apache+PHP+MySQL配置攻略
- Linux下Apache+Php4+Mysql的安装
- apache的多站点虚拟主机配置方法
- windows下实现定时重启Apache与MySQL方法
- apache在win2003下的安全设置方法
- 如何查看Apache的连接数和当前连接数
- 使用脚本实现故障时自动重启Apache
- Linux+php+apache+oracle环境搭建之CentOS下安装Apache