zl程序教程

您现在的位置是:首页 >  工具

当前栏目

RabbitMQ学习笔记(三)——RabbitMQ 常用高级特性

笔记学习RabbitMQ 常用 特性 高级
2023-06-13 09:11:07 时间

RabbitMQ 常用高级特性

  1. 发送端确认机制
  2. 消息返回机制
  3. 消费端确认机制
  4. 消费端限流机制
  5. 消息过期时间
  6. 死信队列

如何保证消息的可靠性

  1. 发送方
    • 需要使用RabbitMQ发送端确认机制,确认消息成功发送到RabbitMQ并被处理
    • 需要使用RabbitMQ消息返回机制,若没发现目标队列,中间件会通知发送方
  2. 消费方
    • 需要使用RabbitMQ消费端确认机制,确认消息没有发生处理异常
    • 需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定
  3. RabbitMQ自身
    • 大量堆积的消息会给RabbitMQ产生很大的压力,需要使用RabbitMQ消息过期时间,防止消息大量积压
    • 过期后会直接被丢弃,无法对系统运行异常发出警报,需要使用RabbitMQ死信队列,收集过期消息,以供分析

1. 发送端确认机制

什么是发送端确认机制

  • 消息发送后,若中间件收到消息,会给发送端一个应答
  • 生产者接收应答,用来确认这条消息是否正常发送到中间件

三种确认机制

  1. 单条同步确认 配置channel,开启确认模式: channel.confirmSelect() 每发送一条消息,调用channel.waitForConfirms()方法等待确认
  2. 多条同步确认 配置channel,开启确认模式:channel.confirmSelect() 发送多条消息后,调用channel.waitForConfirms()方法,等待确认
  3. 异步确认 配置channel,开启确认模式: channel.confirmSelect() 在channel上添加监听:addConfirmListener,发送消息后,会回调此方法,通知是否发送成功 异步确认有可能是单条,也有可能是多条,取决于MQ

实例在order微服务中发送确认

        // 创建订单之后给restaurant发消息
        try (Connection connection = connectionFactory.newConnection();
             Channel channel = connection.createChannel()) {
            // 配置channel,开启确认模式
            channel.confirmSelect();

            //单条同步确认机制
            /*if (channel.waitForConfirms()) {
                log.info("RabbitMQ confirm success");
            } else {

                log.info("RabbitMQ confirm failed");
            }*/

            // 异步同步确认机制
            ConfirmListener confirmListener = new ConfirmListener() {
                @Override
                public void handleAck(long l, boolean b) throws IOException {
                    log.info("Ack deliveryTag:{},mutiple:{}", l, b);
                    // 消息发送成功
                }

                @Override
                public void handleNack(long l, boolean b) throws IOException {
                    log.info("Nack deliveryTag:{},mutiple:{}", l, b);
                    // 消息发送失败
                }
            };
            channel.addConfirmListener(confirmListener);

            String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);

            //(exchange,routingKey,消息特殊参数,消息体本身(字节))
            channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
            log.info("message sent");

            // 发送多条消息
            /*for (int i = 0; i < 10; i++) {
                channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
                log.info("message sent");
            }
            Thread.sleep(10000);*/

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

2. 消息返回机制

消息返回机制的原理

  • 消息发送后,中间件会对消息进行路由
  • 若没有发现目标队列,中间件会通知发送方
  • Return Listener 会被调用

消息返回的开启方法

  • 在RabbitMQ基础配置中有一个关键配置项:Mandatory
  • Mandatory若为false,RabbitMQ将直接丢弃无法路由的消息
  • Mandatory若为true,RabbitMQ才会处理无法路由的消息

示例在restaurant微服务中无法被路由

 DeliverCallback deliverCallback = (consumerTag, message) -> {
        String messageBody = new String(message.getBody());
        log.info("deliverCallback:messageBody:{}", messageBody);
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(host);
        try {
            // 消息发表序列化
            OrderMessageDTO orderMessageDTO = objectMapper.readValue(messageBody,
                    OrderMessageDTO.class);
            // 查根据id数据库
            ProductPO productPO = productDao.selsctProduct(orderMessageDTO.getProductId());
            log.info("onMessage:productPO:{}", productPO);
            RestaurantPO restaurantPO = restaurantDao.selsctRestaurant(productPO.getRestaurantId());
            log.info("onMessage:restaurantPO:{}", restaurantPO);
            // 校验是否可以下订单
            if (ProductStatusEnum.AVALIABLE == productPO.getStatus() &&
                    RestaurantStatusEnum.OPEN == restaurantPO.getStatus()) {
                orderMessageDTO.setConfirmed(true);
                orderMessageDTO.setPrice(productPO.getPrice());
            } else {
                orderMessageDTO.setConfirmed(false);
            }
            log.info("sendMessage:restaurantOrderMessageDTO:{}", orderMessageDTO);
            // 校验完订单回消息给订单微服务
            try (Connection connection = connectionFactory.newConnection();
                 Channel channel = connection.createChannel()) {

                /*channel.addReturnListener(new ReturnListener() {
                 *//**
                 * 1. channel.basicPublish第三个参数Mandatory为true后
                 * 2. channel添加ReturnListener,
                 * 3. 当消息没有被路由后,会调用handleReturn方法
                 *//*
                    @Override
                    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                        log.info("Message Return:  replyCode: {},replyText: {},exchange: {},routingKey: {},properties: {},body : {}", i, s, s1, s2, basicProperties, new String(bytes));
                        // 除了打印log可以加别的业务操作
                    }
                });*/

                channel.addReturnListener(new ReturnCallback() {
                    @Override
                    public void handle(Return aReturn) {
                        /**
                         * Return aReturn中的字段和上面方法相同
                         */
                        log.info("Message Return:  returnMessage{}", aReturn);
                        // 除了打印log可以加别的业务操作
                    }
                });

                String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
                channel.basicPublish("exchange.order.restaurant", "key.order", true, null, messageToSend.getBytes());

                // 如果channel关闭则不能接收返回,睡眠1s
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (JsonProcessingException | TimeoutException e) {
            e.printStackTrace();
        }
    };

3. 消费端确认机制

消费端ACK类型

自动ACK:消费端收到消息后,会自动签收消息 手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息

手动ACK类型

单条手动ACK: multiple=false 多条手动ACK: multiple=true (推荐使用单条ACK)

重回队列

若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理 一般不建议开启重回队列,因为第一次处理异常的消息,再次处理,基本上也是异常

实现步骤:

  1. 设置不自动签收
  2. 声明手动签收/拒收(拒收重回队列不推荐,会造成死循环)
  3. 设置单挑或者多条签收
 /*channel.basicConsume("queue.restaurant",
        true,
        deliverCallback,
        consumerTag -> {
        });*/
// 处理消息手动ack
channel.basicConsume("queue.restaurant",
        false,
        deliverCallback,
        consumerTag -> {
        });

// 使用全局的channel进行消费者确认
// 手动签收消息(单条签收)
// channel.basicAck(message.getEnvelope().getDeliveryTag(), false);

// 多条消息手动签收(5条消息全部签收一次)
if (message.getEnvelope().getDeliveryTag() % 5 == 0) {
    channel.basicAck(message.getEnvelope().getDeliveryTag(), true);
}

// 手动拒收消息
// channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true);

4. 消费端限流机制

消费端限流原因

业务高峰期,有个微服务崩溃了,崩溃期间队列挤压了大量消息,微服务上线后,收到大量并发消息。 将同样多的消息推给能力不同的副本,会导致部分副本异常。

RabbitMQ - QoS

  • 针对以上问题,RabbitMQ 开发了QoS (服务质量保证)功能
  • QoS功能保证了在一定数目的消息未被确认前,不消费新的消息
  • QoS功能的前提是不使用自动确认

QoS原理

  • QoS原理是当消费端有一定数量的消息未被ACK确认时,RabbitMQ不给消费端推送新的消息
  • RabbitMQ使用QoS机制实现了消费端限流

消费端限流机制参数设置

  • prefetchCount:针对一个消费端最多推送多少未确认消息
  • global: true:针对整个消费端限流false:针对当前channel
  • prefetchSize : 0 (单个消息大小限制, 一般为0)
  • prefetchSize 与global两项,RabbitMQ暂时未实现

实战:

  1. 在消费端手动ack之前设置qos
// 开启qos消费端限流(一个消费端最多推送多少未确认消息,剩下的状态是ready状态,可以进行多消费端进行接收)
channel.basicQos(2);
 
// 处理消息手动ack
channel.basicConsume("queue.restaurant",
      false,
      deliverCallback,
      consumerTag -> {
      });
while (true) {
  Thread.sleep(100000);
}
 

  1. 发送方连续发送50条消息
for (int i = 0; i < 50; i++) {
channel.basicPublish("exchange.order.restaurant", "key.restaurant", null, messageToSend.getBytes());
log.info("message sent");
}
 

未开启qos限流前(消息全部推送,造成消费端消息挤压,无法一次性接收,并且全处于unacked状态,其他消费端也无法抢占资源)

开启qos限流后(消息全部推送,无法一次性接收,并且全处于ready状态,其他消费端可以抢占资源形成'负载均衡'的效果)

5. 消息过期机制

RabbitMQ的过期时间(TTL)

  • RabbitMQ的过期时间称为TTL (Time to Live),生存时间
  • RabbitMQ的过期时间分为消息TTL和队列TTL
  • 消息TTL设置了单条消息的过期时间
  • 队列TTL设置了队列中所有消息的过期时间

如何找到适合自己的TTL?

  • TTL的设置主要考虑技术架构与业务
  • TTL应该明显长于服务的平均重启时间
  • 建议TTL长于业务高峰期时间

实战

  1. 设置单条消息的过期时间
// 设置单条消息的过期时间(时间到期后消息会被清除)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish("exchange.order.restaurant", "key.restaurant", properties, messageToSend.getBytes());

 

  1. 设置队列内消息的过期时间(消费者方设置)
// 给整个队列设置过期消息时长,如果该队列里面在设置时间内没有消费完的消息会自动清除
Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 150000);
// 声明队列
channel.queueDeclare(
    "queue.restaurant",
    true,
    false,
    false,
    args);
 

6. 死信队列

什么是死信队列

  • 死信队列:队列被配置了DLX属性(Dead-Letter- Exchange)
  • 当一个消息变成死信(dead message)后,能重新被发布到另一个Exchange,这个Exchange也是一个普通交换机
  • 死信被死信交换机路由后,一般进入一个固定队列

怎样变成死信

  • 消息被拒绝(reject/nack) 并且requeue=false
  • 消息过期(TTL到期)
  • 队列达到最大长度

死信队列设置方法

  1. 设置转发、接收死信的交换机和队列: ◆Exchange: dlx.exchange ◆Queue: dlx.queue ◆RoutingKey: #
  2. 在需要设置死信的队列加入参数: ◆x-dead-letter-exchange = dlx.exchange

代码实现

// 声明接收私信消息的死信交换机和死信队列
channel.exchangeDeclare(
        "exchange.dlx",
        BuiltinExchangeType.TOPIC,
        true,
        false,
        null);

channel.queueDeclare(
        "queue.dlx",
        true,
        false,
        false,
        null
);
// 绑定死信交换机和队列
channel.queueBind("queue.dlx", "exchange.dlx", "#");

  // 声明交换机
channel.exchangeDeclare(
        "exchange.order.restaurant",
        BuiltinExchangeType.DIRECT,
        true,
        false,
        null);

// 给整个队列设置过期消息时长,如果该队列里面在设置时间内没有消费完的消息会自动清除或者丢到死信队列
Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
// 设置队列最长长度(超过队列长度将消息丢到死信队列)
args.put("x-max-length", 5);
args.put("x-dead-letter-exchange", "exchange.dlx");

// 声明交队列
channel.queueDeclare(
        "queue.restaurant",
        true,
        false,
        false,
        args);
// 声明绑定关系
channel.queueBind(
        "queue.restaurant",
        "exchange.order.restaurant",
        "key.restaurant");

或者在消费端手动签收消息时,拒收消息时(channel.basicNack),会将消息丢到死信队列

当前项目的不足之处分析

  1. 手动建立连接:目前项目中,需要手动建立连接,增加了代码量和bug概率
  2. 手动监听消息:目前项目中,需要手动启动监听线程,不方便
  3. 显式指定Calback方法:目前项目中,需要显式指定Callback方法,代码可读性差
  4. 显式声明队列和交换机:目前项目中,需要显式声明队列和交换机,增加了代码量和Bug概率

实际开发中经验及小结

经验

  1. 善用RabbitMQ高级特性 ◆对于RabbitMQ的高级特性,要善加利用 ◆接收端确认、死信队列是非常常用的特性
  2. 慎用RabbitMQ高级特性 ◆不要无限追求高级,用.上所有RabbitMQ的高级特性 ◆重回队列、发送端确认是不常用的特性,谨慎使用
  3. 善用RabbitMQ管控台 ◆管控台是RabbitMQ调试的利器 ◆RabbitMQ高级特性多数都涉及交换机、队列的属性配置,可以在管控台确认配置是否生效 ◆RabbitMQ高级特性很多都可以在管控台进行试验

小结

  • 为了确保消息发送,使用了发送端确认机制
  • 为了确保消息正确路由,使用了消息返回机制
  • 为了保证消息正常梳理,使用了消费端确认机制
  • 为了保证消费端稳定,使用消费端限流机制,
  • 为了中间件问题,使用过期时间机制
  • 为了处理异常消息,使用死信机制