zl程序教程

您现在的位置是:首页 >  工具

当前栏目

《Apache RocketMQ用户指南》之有序的消息示例

Apache消息 指南 用户 示例 有序 rocketmq
2023-06-13 09:13:33 时间

RocketMQ 使用 FIFO 队列提供有序消息。

以下示例演示发送/接收全局和分区有序消息。

发送消息示例代码

public class OrderedProducer {

 public static void main(String[] args) throws Exception {

 //Instantiate with a producer group name.

 MQProducer producer = new DefaultMQProducer( example_group_name 

 //Launch the instance.

 producer.start();

 String[] tags = new String[] { TagA , TagB , TagC , TagD , TagE 

 for (int i = 0; i 100; i++) {

 int orderId = i % 10;

 //Create a message instance, specifying topic, tag and message body.

 Message msg = new Message( TopicTestjjj , tags[i % tags.length], KEY + i,

 ( Hello RocketMQ + i).getBytes(RemotingHelper.DEFAULT_CHARSET));

 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

 @Override

 public MessageQueue select(List MessageQueue mqs, Message msg, Object arg) {

 Integer id = (Integer) arg;

 int index = id % mqs.size();

 return mqs.get(index);

 }, orderId);

 System.out.printf( %s%n , sendResult);

 //server shutdown

 producer.shutdown();



订阅消息简单示例代码


public class OrderedConsumer {

 public static void main(String[] args) throws Exception {

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( example_group_name 

 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

 consumer.subscribe( TopicTest , TagA || TagC || TagD 

 consumer.registerMessageListener(new MessageListenerOrderly() {

 AtomicLong consumeTimes = new AtomicLong(0);

 @Override

 public ConsumeOrderlyStatus consumeMessage(List MessageExt msgs,

 ConsumeOrderlyContext context) {

 context.setAutoCommit(false);

 System.out.printf(Thread.currentThread().getName() + Receive New Messages: + msgs + %n 

 this.consumeTimes.incrementAndGet();

 if ((this.consumeTimes.get() % 2) == 0) {

 return ConsumeOrderlyStatus.SUCCESS;

 } else if ((this.consumeTimes.get() % 3) == 0) {

 return ConsumeOrderlyStatus.ROLLBACK;

 } else if ((this.consumeTimes.get() % 4) == 0) {

 return ConsumeOrderlyStatus.COMMIT;

 } else if ((this.consumeTimes.get() % 5) == 0) {

 context.setSuspendCurrentQueueTimeMillis(3000);

 return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;

 return ConsumeOrderlyStatus.SUCCESS;

 consumer.start();

 System.out.printf( Consumer Started.%n 




原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/93869.html