02_RabbitMQ【work模式】
2023-09-14 09:14:08 时间
RabbitMQ Tutorials — RabbitMQhttps://rabbitmq.com/getstarted.html
1. work模式
同一个消息只能被一个客户端所获取
2. 生产者
发送100条消息到队列,并且每次发送消息后sleep i*10 毫秒;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道s
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "" + i;
channel.basicPublish("", QUEUE_NAME, null, new Product(i, "哈哈").toString().getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 3);
}
channel.close();
connection.close();
}
}
public class Product implements Serializable {
private Integer id;
private String productName;
private Integer status;
private Double price;
private String productDesc;
private String caption;
public Product() {
}
public Product(Integer id, String productName) {
this.id = id;
this.productName = productName;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getProductName() {
return productName;
}
public void setProductName(String productName) {
this.productName = productName;
}
public Integer getStatus() {
return status;
}
public void setStatus(Integer status) {
this.status = status;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getProductDesc() {
return productDesc;
}
public void setProductDesc(String productDesc) {
this.productDesc = productDesc;
}
public String getCaption() {
return caption;
}
public void setCaption(String caption) {
this.caption = caption;
}
@Override
public String toString() {
return "Product{" +
"id=" + id +
"---productName=" + productName +
'}';
}
}
3. 消费者1
从队列中获取消息,每次获取到消息后,休眠10毫秒,这个消费者消费消息比较快。
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
// channel.basicConsume(QUEUE_NAME, false, consumer);
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
//休眠
Thread.sleep(10);
// 返回确认状态
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
4. 消费者2
从队列中获取消息,每次获取到消息后,休眠1000毫秒,这个消费者消费消息比较慢。
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
// channel.basicConsume(QUEUE_NAME, false, consumer);
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
//休眠
Thread.sleep(50);
// 返回确认状态
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
5. 测试结果
结果:消费者1和消费者2获取到的消息是一样多,各50个。 这样是否合理? -- 不合理的。
6. work模式中的能者多劳
实际这种情况更加的合理。
7. 自动和手动反馈消息消费状态
自动:
从服务端获取到消息后,就认为该消息已经成功消息,无论客户端是否出异常。
手动:
从服务端获取消息后,服务端要标记为该不可用状态,等待客户端的反馈,如果客户端一直没有反馈,该消息一直被标记为不可用,如果接收到客户端的反馈,服务端就将该消息删除。
如何选择手动和自动? -- 根据不同的需求做出不同选择。
相关文章
- Rabbitmq 消费者的推模式与拉模式(go语言版本)
- 设计模式的C语言应用-建造者模式-第七章
- Spring Cloud 应用 Proxyless Mesh 模式探索与实践
- 模式的秘密-策略模式
- spring boot:rabbitmq用topic模式发送接收消息(spring boot 2.4.4)
- python+rabbitMQ实现生产者和消费者模式
- 【java设计模式】之 单例(Singleton)模式
- RAC 开启归档模式及设置归档路径
- Java设计模式之享元flyweight模式代码示例
- hadoop mapreduce求解有序TopN(高效模式)
- Atitit 单片机与嵌入式系统原理与概念 目录 1. 寄存器、数据库,堆栈2 1.1. 寻址模式2 1.2. 指令2 1.3. Watchdog 中断2 2. 软件是如何影响硬件设计的2
- 状态模式
- 大话设计模式C++版——装饰模式
- Git Reset 三种模式
- RabbitMQ入门-Routing直连模式
- RabbitMQ入门-消息订阅模式
- 转 RabbitMQ 入门教程(PHP版) 使用rabbitmq-delayed-message-exchange插件实现延迟功能
- RabbitMQ简单实现,exchange四种模式,持久化
- 【RabbitMQ笔记07】消息队列RabbitMQ七种模式之Publisher Confirms发布确认模式
- 【RabbitMQ笔记05】消息队列RabbitMQ七种模式之Routing路由键模式
- 【RabbitMQ笔记02】消息队列RabbitMQ七种模式之最简单的模式
- 03_RabbitMQ【订阅模式】
- 05_RabbitMQ【通配符匹配模式】
- 07_Springboot 整合 RabbitMQ【5个消息队列模式】