16-RabbitMQ高级特性-消费端的消息ACK与重回队列
2023-06-13 09:13:50 时间
消费端的消息ACK与重回队列
消费端的手工ACK和NACK
- ACK分为自动和手动
- 消费端进行消费的时候, 如果由于业务异常我们可以进行日志的记录, 然后进行补偿
- 如果由于服务器宕机等严重问题, 那我们就需要手工进行ACK保障消费端消费成功
消费端的重回队列
- 消费端重回队列是为了对没有处理成功的消息, 把消息重新会递给Broker
- 一般我们在实际应用中, 都会关闭重回队列, 也就是设置为FALSE
- 为什么不使用重回队列的功能呢, 因为消息重回队列会加入到队列的尾部, 也会造成一条甚至大量消息一直重复投递在队列中死循环
- 说道这里, 其实我是真实碰到过的, 当时正是双11, 我们的失败策略就是用的重回队列, 导致有大量的消息一直因为业务的异常, 重回队列, 导致了4000万的订单MQ消息, 一直压力下不去, 差点被领导骂死~, 后面还做了重大事故回顾会议, 哎
消息重回队列代码实现
消费者
package com.dance.redis.mq.rabbit.rqueue;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
public class Receiver {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
String queueName = "test001";
RabbitMQHelper.queueDeclare(channel,queueName,true);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
try {
System.out.println("receive message:" + new String(body) + ", RoutingKey: " + envelope.getRoutingKey());
/**
* 手工ACK
* 消费失败, 但是消息不重回队列
* channel.basicNack(envelope.getDeliveryTag(), false, false);
* 消费失败, 将消息重新丢回消息队列尾部
* channel.basicNack(envelope.getDeliveryTag(), false, true);
* 消费成功
* channel.basicAck(envelope.getDeliveryTag(), false);
*/
if((Integer)properties.getHeaders().get("flag") == 0) {
//throw new RuntimeException("异常");
// 设置为false表示关闭重回队列
channel.basicNack(envelope.getDeliveryTag(), false, false);
// 设置为true表示开启重回队列 将这条消息重回放入队列
// channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// 参数:队列名称、是否自动ACK、Consumer
channel.basicConsume(queueName, false, consumer);
// 等待回调函数执行完毕之后,关闭资源。
TimeUnit.SECONDS.sleep(50);
channel.close();
RabbitMQHelper.closeConnection();
}
}
生产者
package com.dance.redis.mq.rabbit.rqueue;
import com.dance.redis.mq.rabbit.RabbitMQHelper;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;
public class Sender {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMQHelper.getChannel();
//4 声明
String queueName = "test001";
RabbitMQHelper.queueDeclare(channel, queueName, true);
for (int i = 0; i < 5; i++) {
String msg = "Hello World RabbitMQ " + i;
Map<String, Object> headers = new HashMap<>();
headers.put("flag", i);
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers).build();
channel.basicPublish("", queueName, props, msg.getBytes());
}
}
}
测试
开启重回队列测试
启动消费者
启动生产者
查看消费者
可以看到flag=0的消息, 再一直被重回队列, 当然, 我们可以通过程序去控制这个是不是要重回队列
关闭重回队列测试
启动消费者
启动生产者
查看消费者
可以看到哪怕, 我们手工NACK之后, 消息也没有重回队列
相关文章
- 最详解消息队列以及RabbbitMQ之HelloWorld
- RabbitMQ模拟消息队列群发邮件
- Docker安装RabbitMQ消息队列
- 字节跳动3-3大牛力荐!RabbitMQ实战指南:消息队列面试必刷手册
- 《玩游戏,学技术》第二篇,用消息队列实现所有游戏功能
- 消息队列RabbitMQ核心:简单(Hello World)模式、队列(Work Queues)模式、发布订阅模式
- 面试系列-kafka消息相关机制
- 消息队列:第四章:延迟检查队列
- 【Android 异步操作】手写 Handler ( 消息队列 MessageQueue | 消息保存到链表 | 从链表中获取消息 )
- RabbitMQ消息队列入门及解决常见问题
- SQLServer 错误 21892 无法在与虚拟网络名称“%s”相关联的可用性组主副本上查询 sys.availability_replicas 以获取成员副本的服务器名称:错误 = %d,错误消息 = %s。 故障 处理 修复 支持远程
- redis用list做消息队列的实现示例
- redis中队列消息实现应用解耦的方法
- 使用消息队列实现分布式事务-公认较为理想的分布式事务解决方案详解架构师
- spring事务与消息队列详解编程语言
- java spring boot消息队列 RabbitMQ详解编程语言
- 消息称iPhone 14高配版将采用钛合金 鸿海独家供应
- 读取Linux消息队列阻塞读取的研究(linux消息队列阻塞)
- 队列利用Redis实现延迟消息队列(redis延迟)
- Mysql消息队列实现跨系统异步通讯(消息队列 mysql)
- Linux如何配置消息队列(MQ)?(linux配置mq)
- Redis消息队列开发实战篇(消息队列实战redis)
- 实现高性能数据消息订阅学习如何使用Redis(如何使用redis订阅)
- Redis实现高并发消息订阅(redis高并发订阅)
- 基于Redis实现的消息队列配置实例(redis配置mq实例)
- 基于Redis的消息订阅过滤(redis过滤订阅)
- Redis解决消息中的队列问题(redis消息怎么做队列)