zl程序教程

您现在的位置是:首页 >  其他

当前栏目

RabbitMq五种模式代码案例及使用详解

2023-04-18 16:29:35 时间

目录

RabbitMQ简介:

准备环节:

1. 简单模式:Hello_world

生产者代码

消费者代码

抽取工具类

2.工作模式:work_queues

生产者代码:发送10条消息

创建两个消费者(代码相同):

 3.订阅模式:pub/sub

生产者代码:

消费者一:接收消息保存至数据库

消费者二:接收消息打印至控制台

 4. 路由模式:Routing

生产者代码:

消费队列一(error)

消费者二(info,error,warning)

 5.通配符模式:Topics

生产者代码:

消费者一(队列:test_topic_queue1)

消费者二(队列:test_topic_queue2)

SpringBoot整合RabbitMq

生产者:

消费者:

消息丢失:


RabbitMQ简介:

RabbitMQ是一个实现了AMQP(Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务,用Erlang语言。是面向消息的中间件。

你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。


主要流程:生产者(Producer)与消费者(Consumer)和 RabbitMQ 服务(Broker)建立连接, 然后生产者发布消息(Message)同时需要携带交换机(Exchange) 名称以及路由规则(Routing Key),这样消息会到达指定的交换机,然后交换机根据路由规则匹配对应的 Binding,最终将消息发送到匹配的消息队列(Quene),最后 RabbitMQ 服务将队列中的消息投递给订阅了该队列的消费者(消费者也可以主动拉取消息)。

这里总结了rabbitmq的五种模式代码案例,全部手敲测试通过,简单记录

准备环节:

rabbitmq,创建用户,分配主机

 创建工程并引入依赖

 <dependencies>
<!-- rabbitmq 依赖-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>

1. 简单模式:Hello_world

 单个生产者生产消息,直接通过队列进行发送至单个消费者,消费者消费

生产者代码

package com.yutao.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发送消息
 *
 * @author yt
 * @create 2022/10/14 13:40
 */
public class Producer_HelloWorld{
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数
        factory.setHost("192.168.149.129");
        //端口
        factory.setPort(5672);
        //自己在网页上创建的虚拟机
        factory.setVirtualHost("/itcast");
        //账号密码
        factory.setUsername("yutao");
        factory.setPassword("yutao");

        //创建连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        /**
         * 创建队列Queue
         * 参数:
         * 1.queue:队列名称
         * 2.durable:是否持久化,当mq重启之后是否还在
         * 3.exclusive:是否独占,只能有一个消费者监听这个队列    当connection关闭时是否=删除队列
         * 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
         * 5.arguments:参数
         */
        //如果没有hello_world的队列会自动创建,有就不会创建
        channel.queueDeclare("hello_world",true,false,false,null);
        //发送消息
        /**
         * 1.exchange:交换机名称。简单模式用默认的“”
         * 2.routingket:路由名称
         * 3.props:配置信息
         * 4.body:字节数组   发送的数据
         */

        String body = "你好呀,我是于涛";
        channel.basicPublish("","hello_world",null,body.getBytes());
        channel.close();
        connection.close();
    }
}

消费者代码

package com.yutao.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 *
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_HelloWorld{
    public static void main(String[] args)  throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数
        factory.setHost("192.168.149.129");
        //端口
        factory.setPort(5672);
        //自己在网页上创建的虚拟机
        factory.setVirtualHost("/itcast");
        //账号密码
        factory.setUsername("yutao");
        factory.setPassword("yutao");

        //创建连接
        Connection connection = factory.newConnection();
        //创建channel
        Channel channel = connection.createChannel();
        /**
         * 创建队列Queue
         * 参数:
         * 1.queue:队列名称
         * 2.durable:是否持久化,当mq重启之后是否还在
         * 3.exclusive:是否独占,只能有一个消费者监听这个队列    当connection关闭时是否=删除队列
         * 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
         * 5.arguments:参数
         */
        //如果没有hello_world的队列会自动创建,有就不会创建
        channel.queueDeclare("hello_world",true,false,false,null);

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("Exchange = " + envelope.getExchange());
                System.out.println("RoutingKey = " + envelope.getRoutingKey());
                System.out.println("properties = " + properties);
                System.out.println("body = " + new String(body));
            }
        };
        channel.basicConsume("hello_world",true,consumer);


    }


}

抽取工具类

package com.yutao.producer.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author yt
 * @create 2022/10/14 14:30
 */
public class RabbitUtils {
    public static ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        //设置参数
        connectionFactory.setHost("192.168.149.129");
        //端口
        connectionFactory.setPort(5672);
        //自己在网页上创建的虚拟机
        connectionFactory.setVirtualHost("/itcast");
        //账号密码
        connectionFactory.setUsername("yutao");
        connectionFactory.setPassword("yutao");
    }

    public static Connection getConnection(){
        try {
           return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void closeConnectionAndchannel(Channel channel,Connection connection){
        try {
            if (channel!=null){
                channel.close();
            }
            if (connection!=null){
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

}

2.工作模式:work_queues

 多个消费者争抢一条消息消费,一条消息只能被一个消费者消费,相当于对简单模式的拓展

让多个消费者绑定到一个队列共同消费队列中的消息。默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息

生产者代码:发送10条消息

package com.yutao.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发送消息
 * 工作队列发送消息,一个发送者发送消息   n个消费者监听  只能有一个消费者消费
 *
 * @author yt
 * @create 2022/10/14 13:40
 */
public class Producer_WorkQueues {
    public static void main(String[] args) throws IOException, TimeoutException {
      
        //创建连接
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
        /**
         * 创建队列Queue
         * 参数:
         * 1.queue:队列名称
         * 2.durable:是否持久化,当mq重启之后是否还在
         * 3.exclusive:是否独占,只能有一个消费者监听这个队列    当connection关闭时是否=删除队列
         * 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
         * 5.arguments:参数
         */
        //如果没有hello_world的队列会自动创建,有就不会创建
        channel.queueDeclare("work_queues", true, false, false, null);
        //发送消息
        /**
         * 1.exchange:交换机名称。简单模式用默认的“”
         * 2.routingket:路由名称
         * 3.props:配置信息
         * 4.body:字节数组   发送的数据
         */

        for (int i = 0; i < 10; i++) {
            String body = i + "你好呀,我是于涛";
            channel.basicPublish("", "work_queues", null, body.getBytes());
        }
        channel.close();
        connection.close();
    }
}

创建两个消费者(代码相同):

package com.yutao.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 *
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_WorkQueues1 {
    public static void main(String[] args)  throws IOException, TimeoutException {
    //创建连接
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
        /**
         * 创建队列Queue
         * 参数:
         * 1.queue:队列名称
         * 2.durable:是否持久化,当mq重启之后是否还在
         * 3.exclusive:是否独占,只能有一个消费者监听这个队列    当connection关闭时是否=删除队列
         * 4.aotoDelete:是否自动删除,当没有接收端时是否自动删除 null
         * 5.arguments:参数
         */
        //如果没有hello_world的队列会自动创建,有就不会创建
        channel.queueDeclare("work_queues",true,false,false,null);

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
   /*             System.out.println("consumerTag = " + consumerTag);
                System.out.println("Exchange = " + envelope.getExchange());
                System.out.println("RoutingKey = " + envelope.getRoutingKey());
                System.out.println("properties = " + properties);*/
                System.out.println("body = " + new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);


    }


}

 消费结果

消费者1:

 消费者2

 3.订阅模式:pub/sub

生产者将消息发送给交换机,交换机将消息分散到队列中,然后消费者在对应的队列中消费

交换机(Exchange)有三种:

Fanout 广播:将消息交给所有绑定到交换机的队列

Diect 定向:把消息交给符合指定的key的队列

Topic 通配符:把消息交给符合routing pattern(路由模式的队列)

交换机只负责转发消息,不具备存储消息的能力,没有队列绑定交换机,或者没有路由规则的队列,消息将丢失!!!

生产者代码:

创建交换机
创建两个队列
将两个队列绑定至交换机上
发送消息到交换机上
关闭资源
package com.yutao.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发送消息
 * 生产者将消息发送给交换机,交换机将消息分散到队列中,然后消费者在对应的队列中消费
 *
 * 创建交换机
 * 创建两个队列
 * 将两个队列绑定至交换机上
 * 发送消息到交换机上
 * 关闭资源
 *
 * @author yt
 * @create 2022/10/14 13:40
 */
public class Producer_PubSub {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = RabbitUtils.getConnection();

        //创建channel 信道
        Channel channel = connection.createChannel();
        //创建交换机
        /**
         * String exchange,交换机名称
         * BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
         * boolean durable, 是否持久化
         * boolean autoDelete, 是否自动删除
         * internal 内部使用 一般是false
         * Map<String, Object> arguments 参数列表
         */
        String ExchangeName = "test_fanout";
        channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //绑定队列和交换机
        /**
         * String queue, 队列名称
         * String exchange, 交换机名称
         * String routingKey 路由key
         */
        channel.queueBind(queue1Name,ExchangeName,"");
        channel.queueBind(queue2Name,ExchangeName,"");
        //发送消息
        String body = "日志信息:张三调用了findAll方法  日志级别为:info";
        channel.basicPublish(ExchangeName,"",null,body.getBytes());
        //释放资源
        RabbitUtils.closeConnectionAndchannel(channel,connection);
    }
}

两个消费者分别监听两个队列即可

消费者一:接收消息保存至数据库

package com.yutao.consumer;

import com.rabbitmq.client.*;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * 接收队列一消息
 *
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_PubSub1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息保存至数据库");
            }
        };
        //接收队列一
        channel.basicConsume(queue1Name, true, consumer);


    }


}

 

消费者二:接收消息打印至控制台

package com.yutao.consumer;

import com.rabbitmq.client.*;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * 接收队列二消息
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_PubSub2 {
    public static void main(String[] args)  throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();

        String queue2Name = "test_fanout_queue2";

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息打印至控制台");
            }
        };
        //接收队列二
        channel.basicConsume(queue2Name,true,consumer);


    }


}

 4. 路由模式:Routing

 生产者将消息发送至交换机,不同级别的日志走不同的队列,区分不同的消息,消费者通过订阅不同的队列来处理自己的逻辑

生产者代码:

队列一绑定error级别的日志
队列二绑定info,error,warning级别的日志
队列一只能接收到error的消息
队列二能收到info,error,warning的消息
package com.yutao.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发送消息
 *
 *  队列一绑定error级别的日志
 *  队列二绑定info,error,warning级别的日志
 *
 *  队列一只能接收到error的消息
 *  队列二能收到info,error,warning的消息
 * @author yt
 * @create 2022/10/14 13:40
 */
public class Producer_Routing {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = RabbitUtils.getConnection();

        //创建channel 信道
        Channel channel = connection.createChannel();
        //创建交换机
        /**
         * String exchange,交换机名称
         * BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
         * boolean durable, 是否持久化
         * boolean autoDelete, 是否自动删除
         * internal 内部使用 一般是false
         * Map<String, Object> arguments 参数列表
         */
        String ExchangeName = "test_direct";
        channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
        //创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //绑定队列和交换机
        /**
         * String queue, 队列名称
         * String exchange, 交换机名称
         * String routingKey 路由key
         */
        //队列一绑定error级别的日志
        channel.queueBind(queue1Name,ExchangeName,"error");
        //队列二绑定info,error,warning级别的日志
        channel.queueBind(queue2Name,ExchangeName,"info");
        channel.queueBind(queue2Name,ExchangeName,"error");
        channel.queueBind(queue2Name,ExchangeName,"warning");
        //发送消息
        String body = "日志信息:张三调用了findAll方法  日志级别为:info";
        //参数二指定绑定的队列
        channel.basicPublish(ExchangeName,"info",null,body.getBytes());
        //释放资源
        RabbitUtils.closeConnectionAndchannel(channel,connection);
    }
}

消费队列一(error)

package com.yutao.consumer;

import com.rabbitmq.client.*;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * 接收队列一消息
 *
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_Routing1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息保存至数据库");
            }
        };
        //接收队列一
        channel.basicConsume(queue1Name, true, consumer);


    }


}

无法收到info的消息 ,因为绑定的是error

消费者二(info,error,warning)

package com.yutao.consumer;

import com.rabbitmq.client.*;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * 接收队列一消息
 *
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_Routing2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();

        String queue2Name = "test_direct_queue2";

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息打印至控制台");
            }
        };
        //接收队列一
        channel.basicConsume(queue2Name, true, consumer);


    }


}

可以收到

当生产者发送error级别的消息时

 

 队列一能收到

 队列二能收到

 Routing模式要求队列在绑定交换机时需要指定routing key,消费会转发到符合routing key的队列

 5.通配符模式:Topics

 生产者生产消息发送至交换机,交换机通过通配符将消息发送给能匹配上的队列,再由订阅队列的消费者进行消费

生产者代码:

package com.yutao.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 发送消息
 *根据通配符发送指定队列
 * @author yt
 * @create 2022/10/14 13:40
 */
public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = RabbitUtils.getConnection();

        //创建channel 信道
        Channel channel = connection.createChannel();
        //创建交换机
        /**
         * String exchange,交换机名称
         * BuiltinExchangeType type, 交换机类型 @See BuiltinExchangeType 定向 广播 通配符 参数匹配
         * boolean durable, 是否持久化
         * boolean autoDelete, 是否自动删除
         * internal 内部使用 一般是false
         * Map<String, Object> arguments 参数列表
         */
        String ExchangeName = "test_topic";
        channel.exchangeDeclare(ExchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        //创建队列
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //绑定队列和交换机
        /**
         * String queue, 队列名称
         * String exchange, 交换机名称
         * String routingKey 路由key
         */
        //队列一绑定所有以error为结尾的日志,和绑定order后面所有的日志
        channel.queueBind(queue1Name,ExchangeName,"#.error");
        channel.queueBind(queue1Name,ExchangeName,"order.*");
        //队列二绑定所有级别的日志
        channel.queueBind(queue2Name,ExchangeName,"*.*");
        //发送消息
        String body = "日志信息:张三调用了delete方法  日志级别为:order.info";
        //参数二指定绑定的队列
        channel.basicPublish(ExchangeName,"order.info",null,body.getBytes());
        //释放资源
        RabbitUtils.closeConnectionAndchannel(channel,connection);
    }
}

消费者一(队列:test_topic_queue1)

package com.yutao.consumer;

import com.rabbitmq.client.*;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * 接收队列一消息
 *
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();

        String queue1Name = "test_topic_queue1";

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息保存至数据库");
            }
        };
        //接收队列一
        channel.basicConsume(queue1Name, true, consumer);


    }


}

消费者二(队列:test_topic_queue2)

package com.yutao.consumer;

import com.rabbitmq.client.*;
import util.RabbitUtils;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 * 接收队列一消息
 *
 * @author yt
 * @create 2022/10/14 13:39
 */
public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        //创建channel
        Channel channel = connection.createChannel();

        String queue2Name = "test_topic_queue2";

        //接收消息
        /**
         * 1.queue:队列名称
         * 2.aotoACK:是否自动确认
         * 3.callback:回调对象
         */
        Consumer consumer = new DefaultConsumer(channel) {
            /**
             * 回调方法。当收到消息之后会自动执行该方法
             * @param consumerTag 标识
             * @param envelope 获取一些信息,交换机,路由Key
             * @param properties 配置信息
             * @param body 数据
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                System.out.println("将日志信息保存至数据库");
            }
        };
        //接收队列一
        channel.basicConsume(queue2Name, true, consumer);


    }


}

 通配符和定向相比,通配符更加灵活!

SpringBoot整合RabbitMq

生产者:

创建工程,导入依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.3.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

在application.yml中导入配置信息

#基本信息
spring:
  rabbitmq:
    #ip
    host: 192.168.149.129
    #端口
    port: 5672
    username: guest
    password: guest
    #虚拟机
    virtual-host: /

创建rabbitmq的配置,创建交换机,并绑定队列,这里演示的是通配符的配置

package com.yutao.rabbitmq.config;


import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 主要是创建一个交换机,创建一个队列,将二者绑定上即可,这里只简单列出方法,后续可以在此基础上进行拓展
 * @author yt
 * @create 2022/10/14 16:44
 */
@Configuration
public class RabbitMQConfig {
    //交换机名称
    public static final String EXCANGE_NAME = "boot_topic_echange";
    //队列名称
    public static final String QUEUE_NAME = "boot_queue";

    //交换机
    @Bean("bootExchange")
    public Exchange bootExchange() {
        //创建通配符交换机
        return ExchangeBuilder.topicExchange(EXCANGE_NAME).durable(true).build();
    }

    //队列
    @Bean("bootQueue")
    public Queue bootQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    //绑定关系

    /**
     * 那个队列和那个交换机进行绑定.最后with的是routing Key  这里展示的是通配符的配置
     */
    @Bean
    public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
    }

}

创建启动类

package com.yutao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yt
 * @create 2022/10/14 16:43
 */
@SpringBootApplication
public class ProducerAppliction {
    public static void main(String[] args) {
        SpringApplication.run(ProducerAppliction.class,args);
    }
}

创建测试类,导入rabbitTemplate,进行消息发送

convertAndSend参数分别是:交换机名称,routingkey,发送的消息
package com.yutao;

import com.yutao.rabbitmq.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author yt
 * @create 2022/10/14 16:55
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSend() {
        //convertAndSend参数分别是:交换机名称,routingkey,发送的消息
        String msg = "springboot 整合 rabbitmq";
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCANGE_NAME, "boot.haha", msg);
    }
}

消息发送成功!

消费者:

创建工程,引入依赖

  <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.3.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

编写application.yml文件

#基本信息
spring:
  rabbitmq:
    #ip
    host: 192.168.149.129
    #端口
    port: 5672
    username: guest
    password: guest
    #虚拟机
    virtual-host: /

创建启动类

package com.yutao;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author yt
 * @create 2022/10/14 17:13
 */
@SpringBootApplication
public class ConsumerAppliction {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerAppliction.class,args);
    }
}

创建监听队列的方法,注意要注入spring容器

package com.yutao.rabbitmq.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author yt
 * @create 2022/10/14 17:15
 */
@Component
public class RabbitMQListener {
    @RabbitListener(queues = "boot_queue")
    public void ListenerQueue(Message message) {
        System.out.println("message = " + message);
        System.out.println("收到消息为:" + new String(message.getBody()));

    }
}

启动之后即可收到消息

消息丢失:

 在异常情况下,比如交换机挂了,消费者挂了,都会导致消息丢失的情况

1.生产者到rabbitmq消息丢失,这个可以用手动ack确认来解决

2.消息到达rabbitmq中丢失,这个可以用rabbit中的持久化来解决

3.rabbitmq到消费者过程丢失,不能判断是否被消费,这个也可以用回调的方式来解决

消息生产者到rabbitMq手动ACK确认

在原有的生产者代码的application.yml文件中增加如下配置

spring:
 rabbitmq:
    #ip
   host: 120.27.224.146 #mq服务器的ip地址
   port: 5678
   virtual-host: /sbzq
   username: kaying
   password: hs3PGVzZD1rmrdSx
   connection-timeout: 15000
    //发送消息到交换机后会触发回调方法
   publisher-confirm-type: correlated

添加回调方法即可显示成功与失败



import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 实现 RabbitTemplate.ConfirmCallback 接口 实现里面的回调方法
 * 将 rabbitTemplate 用init方法注入到 ConfirmCallback中即可使用  @PostConstruct注解是将rabbitTemplate注入
 * @author yt
 * @create 2022/12/29 15:09
 */
@Component
public class Callback implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    //注入
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *
     * @param correlationData  保存了回调信息的id及相关信息(可以自己将发送的消息放到里面)
     * @param ack 交换机是否收到消息true=收到了,false=没收到
     * @param cause 失败原因,成功=null,失败会有失败原因,可以作为日志保存
     */
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) { // 消息投递到broker 的状态,true表示成功
            System.out.println("消息发送到Broker成功!");
            System.out.println("correlationData = " + correlationData);
            System.out.println("ack = " + ack);
            System.out.println("cause = " + cause);
        } else { // 发送异常
            System.out.println("发送失败");
            System.out.println("correlationData = " + correlationData);
            System.out.println("ack = " + ack);
            System.out.println("cause = " + cause);
        }
    }

}

结果:

如果交换机到队列的时候找不到队列,将会删除消息造成消息丢失

配置文件中增加

#基本信息
spring:
 rabbitmq:
    #ip
   host: 120.27.224.146 #mq服务器的ip地址
   port: 5678
   virtual-host: /sbzq
   username: kaying
   password: hs3PGVzZD1rmrdSx
   connection-timeout: 15000
   publisher-confirm-type: correlated
    //回退消息
   publisher-returns: true

实现回退接口并注入

package com.yutao.rabbitmq.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 实现 RabbitTemplate.ConfirmCallback 接口 实现里面的回调方法
 * 将 rabbitTemplate 用init方法注入到 ConfirmCallback中即可使用  @PostConstruct注解是将rabbitTemplate注入
 * @author yt
 * @create 2022/12/29 15:09
 */
@Component
public class Callback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //注入
    @PostConstruct
    public void init(){
        //注入回调
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     *
     * @param correlationData  保存了回调信息的id及相关信息(可以自己将发送的消息放到里面)
     * @param ack 交换机是否收到消息true=收到了,false=没收到
     * @param cause 失败原因,成功=null,失败会有失败原因,可以作为日志保存
     */
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) { // 消息投递到broker 的状态,true表示成功
            System.out.println("消息发送到Broker成功!");
            System.out.println("correlationData = " + correlationData);
            System.out.println("ack = " + ack);
            System.out.println("cause = " + cause);
        } else { // 发送异常
            System.out.println("发送失败");
            System.out.println("correlationData = " + correlationData);
            System.out.println("ack = " + ack);
            System.out.println("cause = " + cause);
        }
    }

    //在消息不可达消费者时将会退给生产者,只有不可达目的地时才会回退
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        System.out.println("returnedMessage = " + returnedMessage);
    }
}

结果