zl程序教程

您现在的位置是:首页 >  其他

当前栏目

消息发送和接收演示

2023-02-26 12:27:26 时间

导入依赖

         <dependency>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-spring-boot-starter</artifactId>             <version>2.0.2</version>         </dependency>

发送消息

消息发送步骤:

(福利推荐:阿里云、腾讯云、华为云服务器最新限时优惠活动,云服务器1核2G仅88元/年、2核4G仅698元/3年,点击这里立即抢购>>>

​ 创建消息消费者, 指定消费者所属的组名

​ 指定Nameserver地址

​ 指定消费者订阅的主题和标签

​ 设置回调函数,编写处理消息的方法

​ 启动消息消费者

代码示例:

package com.wxit.test;  import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;  /**  * @Author wj  **/ //发送消息 public class RocketMQSendMessageTest {     public static void main(String[] args) throws Exception {         //1.创建消息生产者,并且设置生产组名         DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");          //2. 指定Nameserver地址         producer.setNamesrvAddr("192.168.91.4:9876");          //3.启动生产者         producer.start();          //4.创建消息对象,指定主题、标签和消息体         Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());          //5.发送消息         SendResult sendResult = producer.send(message, 10000);         System.out.println(sendResult);          //6.关闭生产者         producer.shutdown();     } } 

接收消息

消息接收步骤:

​ 1. 创建消息消费者, 指定消费者所属的组名

​ 2. 指定Nameserver地址

​ 3. 指定消费者订阅的主题和标签

​ 4. 设置回调函数,编写处理消息的方法

​ 5. 启动消息消费者

代码示例:

package com.wxit.test;   import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt;  import java.util.List;  /**  * @Author wj  **/ //接收消息 public class RocketMQReceiveMessageTest {      //接收消息     public static void main(String[] args) throws Exception {          //1 创建消费者,并且为其指定消费者组名         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");          //2 为消费者设置NameServer的地址         consumer.setNamesrvAddr("192.168.91.4:9876");          //3 指定消费者订阅的主题和标签         consumer.subscribe("myTopic", "*");          //4 设置一个回调函数,并在函数中编写接收到消息之后的处理方法         consumer.registerMessageListener(new MessageListenerConcurrently() {             //处理获取到的消息             @Override             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {                 //消费逻辑                 System.out.println("Message===>" + list);                  //返回消费成功状态                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;             }         });          //5 启动消费者         consumer.start();         System.out.println("启动消费者成功了");     } } 

案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

订单微服务发送消息

1 在 shop-order 中添加rocketmq的依赖

        <dependency>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-spring-boot-starter</artifactId>             <version>2.0.2</version>         </dependency>         <dependency>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-client</artifactId>             <version>4.4.0</version>         </dependency>

2 添加配置

#rocketmq rocketmq:   name-server: 192.168.91.4:9876 #rocketMQ服务的地址   producer:     group: shop-order  # 生产者组

3 编写测试代码

@RestController @Slf4j public class OrderController {      @Autowired     private RestTemplate restTemplate;      @Autowired     private OrderService orderService;      @Autowired     private ProductService productService;      @Autowired     private DiscoveryClient discoveryClient;      @Autowired     private RocketMQTemplate rocketMQTemplate;      //下单 --ribbon自定义负载均衡     //fegin     @RequestMapping("/order/prod/{pid}")     public Order order(@PathVariable("pid") Integer pid){         log.info("接收到{}号商品的下单请求,接下来调用商品的微服务查询此商品信息",pid);          /**          * 调用商品微服务,查询商品信息          * 问题:          * 1.代码可读性不好          * 2.编程风格不统一          */         Product product = productService.findByPid(pid);          if (product.getPid() == -100){             Order order = new Order();             order.setOid(-100L);             order.setPname("下单失败");             return order;         }          log.info("查询到{}号商品的信息",pid);          //下单,创建订单         Order order = new Order();         order.setUid(1);         order.setUsername("测试用户");          order.setPid(pid);         order.setPname(product.getPname());         order.setPprice(product.getPprice());         order.setNumber(1);          orderService.createOrder(order);          log.info("创建订单成功,订单信息为{}",order);          //向mq中投递一个下单成功的信息         rocketMQTemplate.convertAndSend("order-topic",order);          return order;     }

用户微服务订阅消息

1 修改 shop-user 模块配置,导入依赖

        <dependency>             <groupId>com.alibaba.cloud</groupId>             <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>         </dependency>         <dependency>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-spring-boot-starter</artifactId>             <version>2.0.2</version>         </dependency>         <dependency>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-client</artifactId>             <version>4.4.0</version>         </dependency>

2 修改主类

package com.wxit;  import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient;  /**  * @Author wj  **/ @SpringBootApplication @EnableDiscoveryClient public class UserApplication {      public static void main(String[] args) {         SpringApplication.run(UserApplication.class);     } } 

3 修改配置文件

  cloud:    nacos:      discovery:        server-addr: 127.0.0.1:8848#rocrocketmq:  name-server: 192.168.91.4:9876

4 编写消息接收服务

package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj [email protected]@[email protected](consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> {    @Override    public void onMessage(Order message) {        log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", message);    }}

5 启动服务,执行下单操作,观看后台输出

消息发送和接收演示


本站部分内容转载自网络,版权属于原作者所有,如有异议请联系QQ153890879修改或删除,谢谢!
转载请注明原文链接:消息发送和接收演示

你还在原价购买阿里云、腾讯云、华为云、天翼云产品?那就亏大啦!现在申请成为四大品牌云厂商VIP用户,可以3折优惠价购买云服务器等云产品,并且可享四大云服务商产品终身VIP优惠价,还等什么?赶紧点击下面对应链接免费申请VIP客户吧:

1、点击这里立即申请成为腾讯云VIP客户

2、点击这里立即注册成为天翼云VIP客户

3、点击这里立即申请成为华为云VIP客户

4、点击这里立享阿里云产品终身VIP优惠价

喜欢 (0)
[[email protected]]
分享 (0)