zl程序教程

您现在的位置是:首页 >  其他

当前栏目

高性能消息队列中间件MQ_part2

2023-03-07 09:15:36 时间

接上一篇part1的内容

RabbitMQ通配符模式_编写消费者

接下来我们编写通配符模式的消费者:

// 站内信消费者
public class Customer_Station {
    public static void main(String[] args)
throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory
= new ConnectionFactory();
      
connectionFactory.setHost("192.168.0.162");
        connectionFactory.setPort(5672);
      
connectionFactory.setUsername("itbaizhan");
     
connectionFactory.setPassword("itbaizhan");
10      
connectionFactory.setVirtualHost("/");// 默
认虚拟机
11
12        //2.创建连接
13        Connection conn =
connectionFactory.newConnection();
14    
15        //3.建立信道
16        Channel channel =
conn.createChannel();
17    
18        // 4.监听队列
19      
channel.basicConsume("SEND_STATION3", true,
new DefaultConsumer(channel) {
20            @Override
21            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
22                String message = new
String(body, "utf-8");
23                System.out.println("发送站内
信:"+message);
24           }
25       });
26   }
27 }
// 邮件消费者
30 public class Customer_Mail {
31    public static void main(String[] args)
throws IOException, TimeoutException {
32        // 1.创建连接工厂
33        ConnectionFactory connectionFactory
= new ConnectionFactory();
34      
connectionFactory.setHost("192.168.0.162");
35        connectionFactory.setPort(5672);
36      
connectionFactory.setUsername("itbaizhan");
37      
connectionFactory.setPassword("itbaizhan");
38      
connectionFactory.setVirtualHost("/");// 默
认虚拟机
39
40        //2.创建连接
41        Connection conn =
connectionFactory.newConnection();
42    
43        //3.建立信道
44        Channel channel =
conn.createChannel();
45    
46        // 4.监听队列
47        channel.basicConsume("SEND_MAIL3",true, new DefaultConsumer(channel) {
48            @Override
            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
50                String message = newString(body, "utf-8");
51                System.out.println("发送邮件:"+message);
52           }
53       });
54   }
55
56 }
57
58 // 短信消费者
59 public class Customer_Message {
60    public static void main(String[] args)throws IOException, TimeoutException {
61        // 1.创建连接工厂
62        ConnectionFactory connectionFactory= new ConnectionFactory();
63      connectionFactory.setHost("192.168.0.162");
64        connectionFactory.setPort(5672);
65      connectionFactory.setUsername("itbaizhan");
66      connectionFactory.setPassword("itbaizhan");
67      connectionFactory.setVirtualHost("/");// 默认虚拟机
        //2.创建连接
        Connection conn =connectionFactory.newConnection();
    
        //3.建立信道
        Channel channel =conn.createChannel();
    
        // 4.监听队列
      
channel.basicConsume("SEND_MESSAGE3", true,
new DefaultConsumer(channel) {
            @Override
            public void
handleDelivery(String consumerTag, Envelopeenvelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
                String message = newString(body, "utf-8");
                System.out.println("发送短信:"+message);
           }
       });
   }
}

SpringBoot整合RabbitMQ_项目搭建

之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- RabbitMQ起步依赖 -->
<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starteramqp</artifactId>
</dependency>

2.编写配置文件

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
#日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

SpringBoot整合RabbitMQ_创建对列和交换机

SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:

@Configuration
2 public class RabbitConfig {
3    private final String EXCHANGE_NAME ="boot_topic_exchange";
4    private final String QUEUE_NAME ="boot_queue";
5
6    // 创建交换机
7    @Bean("bootExchange")
8    public Exchange getExchange() {
9        return ExchangeBuilder
10               .topicExchange(EXCHANGE_NAME) // 交换机类型
11               .durable(true) // 是否持久化
12               .build();
   }
14
15    // 创建队列
16    @Bean("bootQueue")
17    public Queue getMessageQueue() {
18        return new Queue(QUEUE_NAME); // 队列名
19   }
20
21    // 交换机绑定队列
22    @Bean
23    public Binding bindMessageQueue(@Qualifier("bootExchange")Exchange exchange, @Qualifier("bootQueue")Queue queue) {
24        return BindingBuilder
25               .bind(queue)
26               .to(exchange)
27               .with("#.message.#")
28               .noargs();
29   }
30 }

SpringBoot整合RabbitMQ_编写生产者

SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息

@SpringBootTest
public class TestProducer {
    // 注入RabbitTemplate工具类
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage(){
        /**
         * 发送消息
         * 参数1:交换机
         *          * 参数2:路由key
         * 参数3:要发送的消息
         */
      
rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
   }
}

运行生产者代码,即可看到消息发送到了RabbitMQ中


SpringBoot整合RabbitMQ_编写消费者

我们编写另一个SpringBoot项目作为RabbitMQ的消费者

1.创建SpringBoot项目,引入RabbitMQ起步依赖

<!-- rabbitmq起步依赖 -->
<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starteramqp</artifactId>
</dependency>

2.编写配置文件

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
#日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

3.编写消费者,监听队列

@Component
public class Consumer {
    // 监听队列
    @RabbitListener(queues = "boot_queue")
    public void listen_message(String
message){
        System.out.println("发送短
信:"+message);
   }
}

4.启动项目,可以看到消费者会消费队列中的消息


消息的可靠性投递_概念

RabbitMQ消息投递的路径为:生产者 —> 交换机 —> 队列 —> 消费者 在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

  • 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
  • 退回模式(return)可以监听消息是否从交换机成功传递到队列。
  • 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  
#日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

在生产者的配置类创建交换机和队列

@Configuration
public class RabbitConfig {
    private final String
EXCHANGE_NAME="my_topic_exchange";
    private final String
QUEUE_NAME="my_queue";
    // 1.创建交换机
    @Bean("bootExchange")
    public Exchange getExchange(){
        return ExchangeBuilder
10               .topicExchange(EXCHANGE_NAME) // 交换机类型
11               .durable(true) // 是否持久化
12           .build();
13   }
14
15    // 2.创建队列
16    @Bean("bootQueue")
17    public Queue getMessageQueue(){
18        return QueueBuilder
19               .durable(QUEUE_NAME) // 队列持久化
20               .build();
21   }
22
23    // 3.将队列绑定到交换机
24    @Bean
25    public Binding bindMessageQueue(@Qualifier("bootExchange")Exchange exchange, @Qualifier("bootQueue")Queue queue){
26        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
27   }
28 }

消息的可靠性投递_确认模式

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下

1.生产者配置文件开启确认模式

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
    # 开启确认模式
   publisher-confirm-type: correlated

2.生产者定义确认模式的回调方法

@SpringBootTest
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testConfirm(){
8        // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
9      rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
10            /**
11             * 被调用的回调方法
12             * @param correlationData 相关配置信息
13             * @param ack 交换机是否成功收到了消息
14             * @param cause 失败原因
15             */
16            @Override
17            public void confirm(CorrelationData correlationData,boolean ack, String cause) {
18                if (ack){
19                  System.out.println("confirm接受成功!");
20               }else{
21                  
System.out.println("confirm接受失败,原因为:"+cause);
22                    // 做一些处理。
23               }
24           }
25       });
26      
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
   }
}

消息的可靠性投递_退回模式

退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:

1 生产者配置文件开启退回模式

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
    # 开启确认模式
   publisher-confirm-type: correlated
    # 开启回退模式
   publisher-returns: true

2.生产者定义退回模式的回调方法

@SpringBootTest
public class ProducerTest {
    @Autowired
        private RabbitTemplate rabbitTemplate;
5
6    @Test
7    public void testReturn(){
8        // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
9      rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
10            /**
11             * @param returned 失败后将失败信息封装到参数中
12             */
13            @Override
14            public void returnedMessage(ReturnedMessage returned)
{
15                System.out.println("消息对象:"+returned.getMessage());
16                System.out.println("错误码:"+returned.getReplyCode());
17                System.out.println("错误信息:"+returned.getReplyText());
18                System.out.println("交换机:"+returned.getExchange());
19                System.out.println("路由键:"+returned.getRoutingKey());
20                // 处理消息...
21           }
22       });
      rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
24   }
25 }

消息的可靠性投递_Ack

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

  • 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
  • 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”

1 消费者配置开启手动签收

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
    # 开启手动签收
   listener:
     simple:
       acknowledge-mode: manual

2.消费者处理消息时定义手动签收和拒绝签收的情况

@Component
public class AckConsumer {
    @RabbitListener(queues = "my_queue")
    public void listenMessage(Message
message, Channel channel) throws IOException, InterruptedException {
        // 消息投递序号,消息每次投递该值都会+1
        long deliveryTag =message.getMessageProperties().getDelivery
Tag();
        try {
            int i = 1/0; //模拟处理消息出现bug
            System.out.println("成功接受到消息:"+message);
            // 签收消息
            /**
             * 参数1:消息投递序号
             * 参数2:是否一次可以签收多条消息
             *              */
15          channel.basicAck(deliveryTag,true);
16       }catch (Exception e){
17            System.out.println("消息消费失败!");
18            Thread.sleep(2000);
19            // 拒签消息
20            /**
21             * 参数1:消息投递序号
22             * 参数2:是否一次可以拒签多条消息
23             * 参数3:拒签后消息是否重回队列
24             */
25          channel.basicNack(deliveryTag,true,true);
26       }
27   }
28 }

RabbitMQ高级特性_消费端限流

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下: 1 生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
   }
}

2.消费端配置限流机制

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
       prefetch: 5

3.消费者监听队列

@Component
2 public class QosConsumer{
3    @RabbitListener(queues = "my_queue")
4    public void listenMessage(Message
message, Channel channel) throws IOException, InterruptedException {
5        // 1.获取消息
6        System.out.println(new String(message.getBody()));
7        // 2.模拟业务处理
8        Thread.sleep(3000);
9        // 3.签收消息
10      channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
11   }
12 }

RabbitMQ高级特性_利用限流实现不公平分发

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

使用方法如下: 1 生产者批量发送消息

@Test
public void testSendBatch() {
    // 发送十条消息
    for (int i = 0; i < 10; i++) {
      
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
   }
}

2.消费端配置不公平分发

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
   listener:
     simple:
        # 限流机制必须开启手动签收
       acknowledge-mode: manual
        # 消费端最多拉取1条消息消费,这样谁处理
的快谁拉取下一条消息,实现了不公平分发
       prefetch: 1

3.编写两个消费者

@Component
public class UnfairConsumer {
    // 消费者1
    @RabbitListener(queues = "my_queue")
    public void listenMessage1(Message message, Channel channel) throws Exception
{
        //1.获取消息
        System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
        //2. 处理业务逻辑
        Thread.sleep(500); // 消费者1处理快
        //3. 手动签收          
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
12   }
13    
14    // 消费者2
15    @RabbitListener(queues = "my_queue")
16    public void listenMessage2(Message message, Channel channel) throws Exception{
17        //1.获取消息
18        System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
19        //2. 处理业务逻辑
20        Thread.sleep(3000);// 消费者2处理慢
21        //3. 手动签收
22      channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
23   }
24 }

RabbitMQ高级特性_消息存活时间

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间

1 在创建队列时设置其存活时间:

@Configuration
public class RabbitConfig2 {
    private final String
EXCHANGE_NAME="my_topic_exchange2";
    private final String
QUEUE_NAME="my_queue2";
    // 1.创建交换机
    @Bean("bootExchange2")
    public Exchange getExchange2(){
        return ExchangeBuilder
               
.topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
    // 2.创建队列
    @Bean("bootQueue2")
    public Queue getMessageQueue2(){
        return QueueBuilder
               .durable(QUEUE_NAME)
               .ttl(10000) //队列的每条消息存活10s
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding
bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,
@Qualifier("bootQueue2") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2.生产者批量生产消息,测试存活时间

@Test
2 public void testSendBatch2() throws InterruptedException {
3    // 发送十条消息
4    for (int i = 0; i < 10; i++) {
5      rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "sendmessage..."+i);
6   }
7 }

设置单条消息存活时间

@Test
public void testSendMessage() {
    //设置消息属性
    MessageProperties messageProperties =
new MessageProperties();
    //设置存活时间
  
messageProperties.setExpiration("10000");
    // 创建消息对象
    Message message = new Message("send
message...".getBytes(StandardCharsets.UTF_8)
, messageProperties);
    // 发送消息
  
rabbitTemplate.convertAndSend("my_topic_exc
hange", "my_routing", message);
}

注意: 1 如果设置了单条消息的存活时间,也设置了队列的存活时 间,以时间短的为准。 2 消息过期后,并不会马上移除消息,只有消息消费到队列顶 端时,才会移除该消息。

@Test
public void testSendMessage2() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // 1.创建消息属性
            MessageProperties
messageProperties = new MessageProperties();
            // 2.设置存活时间
          
messageProperties.setExpiration("10000");
            // 3.创建消息对象
            Message message = new Message(("send message..." +i).getBytes(), messageProperties);
            // 4.发送消息
          
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
       } else {
          
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..." + i);
       }
   }
}

在以上案例中,i=5的消息才有过期时间,10s后消息并没有 马上被移除,但该消息已经不会被消费了,当它到达队列顶 端时会被移除。


RabbitMQ高级特性_优先级队列

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

优先级队列用法如下:

1.创建队列和交换机

@Configuration
public class RabbitConfig3 {
    private final String
EXCHANGE_NAME="priority_exchange";
    private final String
QUEUE_NAME="priority_queue";
    // 1.创建交换机
    @Bean(EXCHANGE_NAME)
    public Exchange priorityExchange(){
        return ExchangeBuilder
               
.topicExchange(EXCHANGE_NAME)
               .durable(true).
                build();
   }
       // 2.创建队列
    @Bean(QUEUE_NAME)
    public Queue priorityQueue(){
        return QueueBuilder
               .durable(QUEUE_NAME)
                //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
               .maxPriority(10)
               .build();
   }
    // 3.将队列绑定到交换机
    @Bean
    public Binding
bindPriority(@Qualifier(EXCHANGE_NAME)
Exchange exchange, @Qualifier(QUEUE_NAME)
Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

2.编写生产者

@Test
public void testPriority() {
    for (int i = 0; i < 10; i++) {
        if (i == 5) {
            // i为5时消息的优先级较高
                       MessageProperties
messageProperties = new MessageProperties();
          
messageProperties.setPriority(9);
            Message message = new Message(("send message..." +
i).getBytes(StandardCharsets.UTF_8),
messageProperties);
          
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
       } else {
          rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..."+ i);
       }
   }
}

3.编写消费者

@Component
2 public class PriorityConsumer {
3 @RabbitListener(queues ="priority_queue")
4    public void listenMessage(Message message, Channel channel) throws Exception
{
5        //获取消息
6        System.out.println(new String(message.getBody()));
7        //手动签收
8      channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
9   }
10 }

RabbitMQ死信队列_概念

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息成为死信的情况:

  1. 队列消息长度到达限制。
  2. 消费者拒签消息,并且不把消息重新放入原队列。
  3. 消息到达存活时间未被消费。

RabbitMQ死信队列_代码实现

@Configuration
public class RabbitConfig4 {
    private final String DEAD_EXCHANGE =
"dead_exchange";
    private final String DEAD_QUEUE =
"dead_queue";
    private final String NORMAL_EXCHANGE =
"normal_exchange";
    private final String NORMAL_QUEUE =
"normal_queue";
    // 死信交换机
    @Bean(DEAD_EXCHANGE)
    public Exchange deadExchange(){
        return ExchangeBuilder
               
.topicExchange(DEAD_EXCHANGE)
               .durable(true)
               .build();
   }
    // 死信队列
    @Bean(DEAD_QUEUE)
    public Queue deadQueue(){
        return QueueBuilder
               .durable(DEAD_QUEUE)
               .build();
   }
       // 死信交换机绑定死信队列
    @Bean
    public Binding
bindDeadQueue(@Qualifier(DEAD_EXCHANGE)
Exchange
exchange,@Qualifier(DEAD_QUEUE)Queue queue){
        return BindingBuilder
               .bind(queue)
               .to(exchange)
               .with("dead_routing")
               .noargs();
   }
    // 普通交换机
    @Bean(NORMAL_EXCHANGE)
    public Exchange normalExchange(){
        return ExchangeBuilder
               
.topicExchange(NORMAL_EXCHANGE)
               .durable(true)
               .build();
   }
    // 普通队列
    @Bean(NORMAL_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
               .durable(NORMAL_QUEUE)
               
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
               
.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
53               .ttl(10000) // 消息存活10s
54               .maxLength(10) // 队列最大长度
为10
55               .build();
56   }
57
58    // 普通交换机绑定普通队列
59    @Bean
60    public Binding
bindNormalQueue(@Qualifier(NORMAL_EXCHANGE)
Exchange
exchange,@Qualifier(NORMAL_QUEUE)Queue
queue){
61        return BindingBuilder
62               .bind(queue)
63               .to(exchange)
64               .with("my_routing")
65               .noargs();
66   }
67 }

测试死信队列

1.生产者发送消息

@Test
public void testDlx(){
    // 存活时间过期后变成死信
    //       
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    // 超过队列长度后变成死信
    //       for (int i = 0; i < 20; i++)
{
    //           
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
    //       }
    // 消息拒签但不返回原队列后变成死信
  
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}

2.消费者拒收消息

@Component
public class DlxConsumer {
    @RabbitListener(queues ="normal_queue")
    public void listenMessage(Message
message, Channel channel) throws IOException {
        // 拒签消息
      
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
   }
}

RabbitMQ延迟队列_概念

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。 例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单

但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。


RabbitMQ延迟队列_死信队列实现

接下来我们使用死信队列实现延迟队列

1 创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。

<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starteramqp</artifactId>
</dependency>
<dependency>
  
<groupId>org.springframework.boot</groupI
d>
    <artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2.编写配置文件

spring:
 rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  
# 日志格式
logging:
 pattern:
   console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'

3.创建队列和交换机

@Configuration
public class RabbitConfig {
    // 订单交换机和队列
    private final String ORDER_EXCHANGE =
"order_exchange";
    private final String ORDER_QUEUE =
"order_queue";
    // 过期订单交换机和队列
    private final String EXPIRE_EXCHANGE =
"expire_exchange";
    private final String EXPIRE_QUEUE =
"expire_queue";
    // 过期订单交换机
    @Bean(EXPIRE_EXCHANGE)
    public Exchange deadExchange(){
            return ExchangeBuilder
14               
.topicExchange(EXPIRE_EXCHANGE)
15               .durable(true)
16               .build();
17   }
18    // 过期订单队列
19    @Bean(EXPIRE_QUEUE)
20    public Queue deadQueue(){
21        return QueueBuilder
22               .durable(EXPIRE_QUEUE)
23               .build();
24   }
25    // 将过期订单队列绑定到交换机
26    @Bean
27    public Binding
bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE)
Exchange exchange,@Qualifier(EXPIRE_QUEUE)
Queue queue){
28        return BindingBuilder
29               .bind(queue)
30               .to(exchange)
31               .with("expire_routing")
32               .noargs();
33   }
34
35    // 订单交换机
36    @Bean(ORDER_EXCHANGE)
37    public Exchange normalExchange(){
38        return ExchangeBuilder
39               
.topicExchange(ORDER_EXCHANGE)
              .durable(true)
               .build();
   }
    // 订单队列
    @Bean(ORDER_QUEUE)
    public Queue normalQueue(){
        return QueueBuilder
               .durable(ORDER_QUEUE)
               .ttl(10000) // 存活时间为10s,模拟30min
               
.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
               
.deadLetterRoutingKey("expire_routing") //死信交换机的路由关键字
               .build();
   }
    // 将订单队列绑定到交换机
    @Bean
    public Binding
bindNormalQueue(@Qualifier(ORDER_EXCHANGE)
Exchange exchange,@Qualifier(ORDER_QUEUE)
Queue queue){
        return BindingBuilder
               .bind(queue)
               .to(exchange)
               .with("order_routing")
               .noargs();
   }
}

4.编写下单的控制器方法,下单后向订单交换机发送消息

@RestController
public class OrderController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //下单
    @GetMapping("/place/{orderId}")
    public String placeOrder(@PathVariable
String orderId){
        System.out.println("处理订单数据...");
        // 将订单id发送到订单队列
      
rabbitTemplate.convertAndSend("order_exch
ange", "order_routing", orderId);
        return "下单成功,修改库存";
   }
}

5.编写监听死信队列的消费者

// 过期订单消费者
@Component
public class ExpireOrderConsumer {
    // 监听队列
    @RabbitListener(queues ="expire_queue")
    public void listenMessage(String
orderId){
        System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
   }
}

RabbitMQ延迟队列_插件实现

在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。

安装延迟队列插件

1 使用rz将插件上传至虚拟机 2 安装插件

# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-
3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable
rabbitmq_delayed_message_exchange

3.重启RabbitMQ服务

#停止rabbitmq
rabbitmqctl stop
#启动rabbitmq
rabbitmq-server restart -detached

此时登录管控台可以看到交换机类型多了延迟消息