消息发送和接收演示
2023-02-26 12:27:26 时间
导入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency>
发送消息
消息发送步骤:
(福利推荐:阿里云、腾讯云、华为云服务器最新限时优惠活动,云服务器1核2G仅88元/年、2核4G仅698元/3年,点击这里立即抢购>>>)
创建消息消费者, 指定消费者所属的组名
指定Nameserver地址
指定消费者订阅的主题和标签
设置回调函数,编写处理消息的方法
启动消息消费者
代码示例:
package com.wxit.test; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; /** * @Author wj **/ //发送消息 public class RocketMQSendMessageTest { public static void main(String[] args) throws Exception { //1.创建消息生产者,并且设置生产组名 DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); //2. 指定Nameserver地址 producer.setNamesrvAddr("192.168.91.4:9876"); //3.启动生产者 producer.start(); //4.创建消息对象,指定主题、标签和消息体 Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes()); //5.发送消息 SendResult sendResult = producer.send(message, 10000); System.out.println(sendResult); //6.关闭生产者 producer.shutdown(); } }
接收消息
消息接收步骤:
1. 创建消息消费者, 指定消费者所属的组名
2. 指定Nameserver地址
3. 指定消费者订阅的主题和标签
4. 设置回调函数,编写处理消息的方法
5. 启动消息消费者
代码示例:
package com.wxit.test; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * @Author wj **/ //接收消息 public class RocketMQReceiveMessageTest { //接收消息 public static void main(String[] args) throws Exception { //1 创建消费者,并且为其指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group"); //2 为消费者设置NameServer的地址 consumer.setNamesrvAddr("192.168.91.4:9876"); //3 指定消费者订阅的主题和标签 consumer.subscribe("myTopic", "*"); //4 设置一个回调函数,并在函数中编写接收到消息之后的处理方法 consumer.registerMessageListener(new MessageListenerConcurrently() { //处理获取到的消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //消费逻辑 System.out.println("Message===>" + list); //返回消费成功状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5 启动消费者 consumer.start(); System.out.println("启动消费者成功了"); } }
案例
接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:
订单微服务发送消息
1 在 shop-order 中添加rocketmq的依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
2 添加配置
#rocketmq rocketmq: name-server: 192.168.91.4:9876 #rocketMQ服务的地址 producer: group: shop-order # 生产者组
3 编写测试代码
@RestController @Slf4j public class OrderController { @Autowired private RestTemplate restTemplate; @Autowired private OrderService orderService; @Autowired private ProductService productService; @Autowired private DiscoveryClient discoveryClient; @Autowired private RocketMQTemplate rocketMQTemplate; //下单 --ribbon自定义负载均衡 //fegin @RequestMapping("/order/prod/{pid}") public Order order(@PathVariable("pid") Integer pid){ log.info("接收到{}号商品的下单请求,接下来调用商品的微服务查询此商品信息",pid); /** * 调用商品微服务,查询商品信息 * 问题: * 1.代码可读性不好 * 2.编程风格不统一 */ Product product = productService.findByPid(pid); if (product.getPid() == -100){ Order order = new Order(); order.setOid(-100L); order.setPname("下单失败"); return order; } log.info("查询到{}号商品的信息",pid); //下单,创建订单 Order order = new Order(); order.setUid(1); order.setUsername("测试用户"); order.setPid(pid); order.setPname(product.getPname()); order.setPprice(product.getPprice()); order.setNumber(1); orderService.createOrder(order); log.info("创建订单成功,订单信息为{}",order); //向mq中投递一个下单成功的信息 rocketMQTemplate.convertAndSend("order-topic",order); return order; }
用户微服务订阅消息
1 修改 shop-user 模块配置,导入依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
2 修改主类
package com.wxit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; /** * @Author wj **/ @SpringBootApplication @EnableDiscoveryClient public class UserApplication { public static void main(String[] args) { SpringApplication.run(UserApplication.class); } }
3 修改配置文件
cloud: nacos: discovery: server-addr: 127.0.0.1:8848#rocrocketmq: name-server: 192.168.91.4:9876
4 编写消息接收服务
package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj [email protected]@[email protected](consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> { @Override public void onMessage(Order message) { log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", message); }}
5 启动服务,执行下单操作,观看后台输出
你还在原价购买阿里云、腾讯云、华为云、天翼云产品?那就亏大啦!现在申请成为四大品牌云厂商VIP用户,可以3折优惠价购买云服务器等云产品,并且可享四大云服务商产品终身VIP优惠价,还等什么?赶紧点击下面对应链接免费申请VIP客户吧:
相关文章
- 二手车行业临近洗牌时间
- 渠道加速,北汽极狐静待爆发
- 人工智能计算中心多点开花,为城市装上“最强大脑”
- 二手车行业等待Shopify
- 解码百度大脑“一叶红船见百年”AR互动:见证勇立潮头的“中国AI”
- “爆款制造机”盒马,刷新网红美食的孵化路径
- 领投有来医生,百度健康下了盘什么棋?
- 三年落地20座城市,平安智慧生活的硬核扩张
- DAY4-白雪
- SpringCloudAlibaba入门(2023版)
- R语言多元(多变量)GARCH :GO-GARCH、BEKK、DCC-GARCH和CCC-GARCH模型和可视化|附代码数据
- 2023-01-12:一个n*n的二维数组中,只有0和1两种值, 当你决定在某个位置操作一次, 那么该位置的行和列整体都会变成1,不管之前是什么状态。 返回让所
- 骑电动车不戴头盔识别抓拍系统
- Leetcode元初第一题, 1. two sum
- leetcode 链表初探 21. merge two sorted lists
- 如何自动邀请和主动邀请网站上的访客对话(附代码)
- LogicFlow更多配置选项
- LogicFlow自定义边(Edge)
- LogicFlow自定义业务节点
- 由浅入深,详解ViewModel的那些事