zl程序教程

您现在的位置是:首页 >  其他

当前栏目

高性能消息队列中间件MQ

2023-03-07 09:15:36 时间

毕业后工作半年,在自己的讲课中需要介绍消息队列,以前在大学也有经常接触message queen,但却还不够深入了解掌握,这次写个专门针对mq的文章理清头绪。

以下是学习mq的知识框架,我会不定时更新补充


RabbitMQ概念_MQ

消息队列

MQ全称Message Queue(消息队列),是在消息的传输过程中保 存消息的容器。多用于系统之间的异步通信 同步通信相当于两个人当面对话,你一言我一语。必须及时回复。

异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。

消息

两台计算机间传送的数据单位。消息可以非常简单,例如只包含文 本字符串;也可以更复杂,可能包含嵌入对象。

队列

数据结构中概念。在队列中,数据先进先出,后进后出。


RabbitMQ概念_MQ的优势

应用解耦

在电商平台中,用户下订单需要调用订单系统,此时订单系统还需 要调用库存系统、支付系统、物流系统完成业务。此时会产生两个 问题:

  1. 如果库存系统出现故障,会造成整个订单系统崩溃。
  2. 如果需求修改,新增了一个X系统,此时必须修改订单系统的代码。

如果在系统中引入MQ,即订单系统将消息先发送到MQ中,MQ再 转发到其他系统,则会解决以下问题:

  1. 由于订单系统只发消息给MQ,不直接对接其他系统,如果库存系统出现故障,不影响整个订单。
  2. 如果需求修改,新增了一个X系统,此时无需修改订单系统的代码,只需修改MQ将消息发送给X系统即可。

异步提速

如果订单系统同步访问每个系统,则用户下单等待时长如下:

如果引入MQ,则用户下单等待时长如下:

削峰填谷

假设我们的系统每秒只能承载1000请求,如果请求瞬间增多到每秒 5000,则会造成系统崩溃。此时引入mq即可解决该问题

使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期 产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消 息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持 在1000,直到消费完积压的消息,这就叫做“填谷”。


RabbitMQ概念_MQ的劣势

系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。

系统复杂度提高 MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。

一致性问题 A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致。


# RabbitMQ概念_MQ应用场景

抢红包、秒杀活动、抢火车票等

这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪

而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。

消息分发

如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。

数据同步

假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、搜索引擎中。

异步处理

在电商系统中,订单完成后,需要及时的通知子系统(进销存系统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时。

离线处理

在银行系统中,如果要查询近十年的历史账单,这是非常耗时的 操作。如果发送同步请求,则会花费大量时间等待响应。此时使 用MQ发送异步请求,等到查询出结果后获取结果即可。


RabbitMQ概念_AMQP

RabbitMQ是由Erlang语言编写的基于AMQP的MQ产品。

AMQP

即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。

AMQP工作过程

生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规 则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给 订阅了此队列的消费者。


RabbitMQ概念_RabbitMQ工作原理

Producer 消息的生产者。也是一个向交换机发布消息的客户端应用程序。 Connection 连接。生产者/消费者和RabbitMQ服务器之间建立的TCP连接。 Channel 信道。是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。 Broker 消息队列服务器实体。即RabbitMQ服务器 Virtual host 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件 划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的 RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机 制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划 分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是 / Exchange 交换机。用来接收生产者发送的消息,并根据分发规则,将这些 消息分发给服务器中的队列中。不同的交换机有不同的分发规 则。 Queue 消息队列。用来保存消息直到发送给消费者。它是消息的容器, 也是消息的终点。消息一直在队列里面,等待消费者链接到这个 队列将其取走。 Binding 消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定 信息保存到交换机的路由表中,作为消息的分发依据。 Consumer 消息的消费者。表示一个从消息队列中取得消息的客户端应用程 序。

RabbitMQ为什么使用信道而不直接使用TCP连接通信? TCP连接的创建和销毁开销特别大。创建需要3次握手,销毁需 要4次分手。高峰时每秒成千上万条TCP连接的创建会造成资源 巨大的浪费。而且操作系统每秒处理TCP连接数也是有限制的, 会造成性能瓶颈。而如果一条线程使用一条信道,一条TCP链接 可以容纳无限的信道,即使每秒成千上万的请求也不会成为性 能的瓶颈。


RabbitMQ安装_安装Erlang

RabbitMQ安装_安装RabbitMQ

RabbitMQ安装_账户管理

RabbitMQ安装_管控台

RabbitMQ安装_Docker安装

我这里就不教了,大家自行百度哈 ps:大家如果实在不会,又比较需要学习怎么安装,那就多@我,留评论我再来补充(手动狗头),毕竟这篇文章是偏概念和理清mq是个啥的


RabbitMQ简单模式_概念

RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用,课程不对此模式进行讲解)

首先我们讲解简单模式:

特点:

  1. 一个生产者对应一个消费者,通过队列进行消息传递。
  2. 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。

RabbitMQ简单模式_项目搭建

接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行 工作。

JMS

由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则 ——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包

创建项目

1.启动RabbitMQ

# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

2.创建普通maven项目,添加RabbitMQ依赖:

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqpclient</artifactId>
        <version>5.14.0</version>
    </dependency>
</dependencies>

RabbitMQ简单模式_编写生产者

接下来我们编写生产者代码创建消息:

// 生产者
public class Producer {
    public static void main(String[] args)
throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
7        connectionFactory.setPort(5672);
8      
connectionFactory.setUsername("itbaizhan");
9      
connectionFactory.setPassword("itbaizhan");
10      
connectionFactory.setVirtualHost("/");
11        // 2.创建连接
12        Connection connection =
connectionFactory.newConnection();
13        // 3.建立信道
14        Channel channel =
connection.createChannel();
15        // 4.创建队列,如果队列已存在,则使用该队列
16        /**
17         * 参数1:队列名
18         * 参数2:是否持久化,true表示MQ重启后队列
还在。
19         * 参数3:是否私有化,false表示所有消费者
都可以访问,true表示只有第一次拥有它的消费者才能访问
20         * 参数4:是否自动删除,true表示不再使用队
列时自动删除队列
21         * 参数5:其他额外参数
22         */
23      
channel.queueDeclare("simple_queue",false,f
alse,false,null);
24        // 5.发送消息
25        String message = "hello!rabbitmq!";
26        /**
27         * 参数1:交换机名,""表示默认交换机
28         * 参数2:路由键,简单模式就是队列名
29         * 参数3:其他额外参数
30         * 参数4:要传递的消息字节数组
31         */
32      
channel.basicPublish("","simple_queue",null
,message.getBytes());
33        // 6.关闭信道和连接
34        channel.close();
35        connection.close();
36        System.out.println("===发送成功===");
37   }
38 }

运行生产者后,我们可以看到在RabbitMQ中创建了队列,队列中 已经有了消息。


RabbitMQ简单模式_编写消费者

接下来我们编写消费者代码消费消息:

// 消费者
public class Consumer {
    public static void main(String[] args)
throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory
= new ConnectionFactory();
      
connectionFactory.setHost("192.168.0.162");
        connectionFactory.setPort(5672);
      
connectionFactory.setUsername("itbaizhan");
9      
connectionFactory.setPassword("itbaizhan");
10      
connectionFactory.setVirtualHost("/");
11        // 2.创建连接
12        Connection connection =
connectionFactory.newConnection();
13        // 3.建立信道
14        Channel channel =
connection.createChannel();
15        // 4.监听队列
16        /**
17         * 参数1:监听的队列名
18         * 参数2:是否自动签收,如果设置为false,
则需要手动确认消息已收到,否则MQ会一直发送消息
19         * 参数3:Consumer的实现类,重写该类方法
表示接受到消息后如何消费
20         */
21      
channel.basicConsume("simple_queue",true,ne
w DefaultConsumer(channel){
22            @Override
23            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
24                String message = new
String(body, "UTF-8");
25                System.out.println("接受消息,
消息为:"+message);
26           }
27       });
		}
	}

运行消费者后,我们可以看到在RabbitMQ中的消息已经被消费。


RabbitMQ工作队列模式_概念

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下

  1. 一个队列对应多个消费者.
  2. 一条消息只会被一个消费者消费。
  3. 消息队列默认采用轮询的方式将消息平均发送给消费者。

RabbitMQ工作队列模式_编写生产者

接下来我们编写生产者代码创建大量消息:

public class Producer {
    public static void main(String[] args)
throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory
= new ConnectionFactory();
      
connectionFactory.setHost("192.168.0.162");
        connectionFactory.setPort(5672);
      
connectionFactory.setUsername("itbaizhan");
      
connectionFactory.setPassword("itbaizhan");
      
connectionFactory.setVirtualHost("/");
        // 2.创建连接
                Connection connection =
connectionFactory.newConnection();
        // 3.建立信道
        Channel channel =
connection.createChannel();
        // 4.创建队列,持久化队列
      
channel.queueDeclare("work_queue",true,fals
e,false,null);
        // 5.发送大量消息,参数3表示该消息为持久化
消息,即除了保存到内存还会保存到磁盘中
        for (int i = 1; i <= 100; i++) {
          
channel.basicPublish("","work_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
                   ("你好,这是今天的第"+i+"条
消息").getBytes());
       }
        // 6.关闭资源
        channel.close();
        connection.close();
   }
}

效果如下:


RabbitMQ工作队列模式_编写消费者

接下来我们编写三个消费者监听同一个队列:

// 消费者1
public class Consumer1 {
    public static void main(String[] args)
throws IOException, TimeoutException {
        // 1.创建连接工厂
        ConnectionFactory connectionFactory
= new ConnectionFactory();
6      
connectionFactory.setHost("192.168.0.162");
7        connectionFactory.setPort(5672);
8      
connectionFactory.setUsername("itbaizhan");
9      
connectionFactory.setPassword("itbaizhan");
10      
connectionFactory.setVirtualHost("/");
11        // 2.创建连接
12        Connection connection =
connectionFactory.newConnection();
13        // 3.建立信道
14        Channel channel =
connection.createChannel();
15        // 4.监听队列,处理消息
16      
channel.basicConsume("work_queue",true,new
DefaultConsumer(channel){
17            @Override
18            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
19                String message = new
String(body, "UTF-8");
20                System.out.println("消费者1消
费消息,消息为:"+message);
21           }
22       });
   }
24 }
25
26 // 消费者2
27 public class Consumer2 {
28    public static void main(String[] args)
throws IOException, TimeoutException {
29        // 1.创建连接工厂
30        ConnectionFactory connectionFactory
= new ConnectionFactory();
31      
connectionFactory.setHost("192.168.0.162");
32        connectionFactory.setPort(5672);
33      
connectionFactory.setUsername("itbaizhan");
34      
connectionFactory.setPassword("itbaizhan");
35      
connectionFactory.setVirtualHost("/");
36        // 2.创建连接
37        Connection connection =
connectionFactory.newConnection();
38        // 3.建立信道
39        Channel channel =
connection.createChannel();
40        // 4.监听队列,处理消息
41      
channel.basicConsume("work_queue",true,new
DefaultConsumer(channel){
42            @Override
43            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
44                String message = new
String(body, "UTF-8");
45                System.out.println("消费者2消
费消息,消息为:"+message);
46           }
47       });
48   }
49 }
50
51 // 消费者3
52 public class Consumer3 {
53    public static void main(String[] args)
throws IOException, TimeoutException {
54        // 1.创建连接工厂
55        ConnectionFactory connectionFactory
= new ConnectionFactory();
56      
connectionFactory.setHost("192.168.0.162");
57        connectionFactory.setPort(5672);
58      
connectionFactory.setUsername("itbaizhan");
59      
connectionFactory.setPassword("itbaizhan");
60      
connectionFactory.setVirtualHost("/");
61        // 2.创建连接
62        Connection connection =
connectionFactory.newConnection();
63        // 3.建立信道
64        Channel channel =
connection.createChannel();
65        // 4.监听队列,处理消息
66      
channel.basicConsume("work_queue",true,new
DefaultConsumer(channel){
67            @Override
68            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
69                String message = new
String(body, "UTF-8");
70                System.out.println("消费者3消
费消息,消息为:"+message);
71           }
72       });
73   }
74 }

效果如下:

RabbitMQ发布订阅模式_概念

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)

特点

  1. 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
  2. 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

RabbitMQ发布订阅模式_编写生产者

接下来我们编写发布订阅模式的生产者:

// 生产者
public class Producer {
    public static void main(String[] args)
throws IOException, TimeoutException {
4        // 1.创建连接工厂
5        ConnectionFactory connectionFactory
= new ConnectionFactory();
6      
connectionFactory.setHost("192.168.0.162");
7        connectionFactory.setPort(5672);
8      
connectionFactory.setUsername("itbaizhan");
9      
connectionFactory.setPassword("itbaizhan");
10      
connectionFactory.setVirtualHost("/");
11        // 2.创建连接
12        Connection connection =
connectionFactory.newConnection();
13        // 3.建立信道
14        Channel channel =
connection.createChannel();
15        // 4.创建交换机
16        /**
17         * 参数1:交换机名
18         * 参数2:交换机类型
19         * 参数3:交换机持久化
20         */
21      
channel.exchangeDeclare("exchange_fanout",
BuiltinExchangeType.FANOUT,true);
22        // 5.创建队列
23      
channel.queueDeclare("SEND_MAIL",true,false
,false,null);
24      
channel.queueDeclare("SEND_MESSAGE",true,fa
lse,false,null);
25      
channel.queueDeclare("SEND_STATION",true,fa
lse,false,null);
26        // 6.交换机绑定队列
27        /**
28         * 参数1:队列名
29         * 参数2:交换机名
30         * 参数3:路由关键字,发布订阅模式写""即可
31         */
32      
channel.queueBind("SEND_MAIL","exchange_fan
out","");
33      
channel.queueBind("SEND_MESSAGE","exchange_
fanout","");
34      
channel.queueBind("SEND_STATION","exchange_
fanout","");
35        // 7.发送消息
36        for (int i = 1; i <= 10 ; i++) {
37          
channel.basicPublish("exchange_fanout","",n
ull,
38                   ("你好,尊敬的用户,秒杀商品
开抢
了!"+i).getBytes(StandardCharsets.UTF_8));
39       }
40        // 8.关闭资源
41        channel.close();
        connection.close();
   }
}
42

效果如下


RabbitMQ发布订阅模式_编写消费者

接下来编写三个消费者,分别监听各自的队列

// 站内信消费者
public class CustomerStation {
    public static void main(String[] args)
throws IOException, TimeoutException {
4        // 1.创建连接工厂
5        ConnectionFactory connectionFactory
= new ConnectionFactory();
6      
connectionFactory.setHost("192.168.0.162");
7        connectionFactory.setPort(5672);
8      
connectionFactory.setUsername("itbaizhan");
9      
connectionFactory.setPassword("itbaizhan");
10      
connectionFactory.setVirtualHost("/");// 默
认虚拟机
11        //2.创建连接
12        Connection conn =
connectionFactory.newConnection();
13        //3.建立信道
14        Channel channel =
conn.createChannel();
15        // 4.监听队列
16        channel.basicConsume("SEND_STATION",
true, new DefaultConsumer(channel) {
17            @Override
18            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
19                String message = new
String(body, "utf-8");
                System.out.println("发送站内
信:"+message);
21           }
22       });
23   }
24 }
25
26 // 邮件消费者
27 public class Customer_Mail {
28    public static void main(String[] args)
throws IOException, TimeoutException {
29        // 1.创建连接工厂
30        ConnectionFactory connectionFactory
= new ConnectionFactory();
31      
connectionFactory.setHost("192.168.0.162");
32        connectionFactory.setPort(5672);
33      
connectionFactory.setUsername("itbaizhan");
34      
connectionFactory.setPassword("itbaizhan");
35      
connectionFactory.setVirtualHost("/");// 默
认虚拟机
36        //2.创建连接
37        Connection conn =
connectionFactory.newConnection();
38        //3.建立信道
39        Channel channel =
conn.createChannel();
40        // 4.监听队列
        channel.basicConsume("SEND_MAIL",
true, new DefaultConsumer(channel) {
42            @Override
43            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
44                String message = new
String(body, "utf-8");
45                System.out.println("发送邮
件:"+message);
46           }
47       });
48   }
49 }
50
51 // 短信消费者
52 public class Customer_Message {
53    public static void main(String[] args)
throws IOException, TimeoutException {
54        // 1.创建连接工厂
55        ConnectionFactory connectionFactory
= new ConnectionFactory();
56      
connectionFactory.setHost("192.168.0.162");
57        connectionFactory.setPort(5672);
58      
connectionFactory.setUsername("itbaizhan");
59      
connectionFactory.setPassword("itbaizhan");
60      
connectionFactory.setVirtualHost("/");// 默
认虚拟机
61        //2.创建连接
62        Connection conn =
connectionFactory.newConnection();
63        //3.建立信道
64        Channel channel =
conn.createChannel();
65        // 4.监听队列
66        channel.basicConsume("SEND_MESSAGE",
true, new DefaultConsumer(channel) {
67            @Override
68            public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
69                String message = newString(body, "utf-8");
70                System.out.println("发送短信:"+message);
71           }
72       });
73   }
74 }