zl程序教程

您现在的位置是:首页 >  其他

当前栏目

RabbitMQ基础介绍及同步通讯及异步通讯

2023-04-18 16:50:04 时间

一、同步通讯与异步通讯


大多数情况下会使用同步,对并发没有很高的要求,但是对时效性有很高的要求,因为我希望我查询到的信息立马就在下面的业务中用到,那必须得用同步调用(因为异步调用只是通知干一件事情,干完之后又不会告诉我,我们没有办法等待异步通讯的结果)

而如果说不需要对方的结果,只是希望对方能干一件事情,对吞吐量的要求、并发的要求较高,还希望解除服务间的耦合关系,那此时应该使用异步通信

1.1 同步通讯


微服务间基于Feign的调用就属于同步方式,存在一些问题

优点:

时效性较强,可以立即得到结果

缺点

  • 耦合度高:每次加入新的需求,都要修改原来的代码

  • 性能下降:调用者需要等待服务提供者响应,如果调用链过长则相应时间等于每次调用的时间之和。

  • 资源浪费:调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源

  • 级联失败:如果服务提供者出现问题,所有调用方都会跟着出问题,迅速导致整个微服务集群故障

1.2 异步通讯


异步调用常见实现就是事件驱动模式

  • 服务解耦合

当我们不需要电信服务之后,我们就让Broker不通知短信服务,此时就取消了短信服务,解除了服务间耦合

  • 性能提升,吞吐量提高

因为之前我们支付服务要调取订单服务、仓储服务、短信服务等,每个服务都有一定的耗时,总耗时就是四个服务的耗时之和;

而现在支付服务向broker发布事件,这个时候支付服务就可以立即结束告诉用户支付成功了(后续的订饭服务、仓储服务、短信服务和支付服务是没有关系的),后续的服务由Broker去通知他们完成,而他们什么时候完成,耗时多久和支付服务并没有任何的关系,只能能做完就行

因此业务的总耗时变成了支付时间耗时+发布支付成功事件耗时

此时我们支付服务的总耗时是60ms

  • 服务没有强依赖,不担心级联失败问题

比如仓储服务挂掉了,但是和支付服务没关系了。支付服务发布完就结束了,所以后面的所有的服务挂掉了都和支付服务没有关系

  • 流量削峰

Broker就像一个大坝,压力都由Broker扛着

比如现在很多用户都在进行支付服务,有着更多的请求。假设订单服务、仓储服务、短信服务都只能一次处理一个请求,来个多个请求之后一次性处理不了,此时Borker就起到了一个缓冲的作用,将多余的请求拦截下来,并根据服务一次能处理几个请求分发给服务,这样以来服务处理请求的速度一直按照自己的能力来。

  • 依赖于Broker的可靠性、安全性、吞吐能力

假如Broker崩掉之后也会对后续的服务造成很大的影响

  • 架构复杂了,业务没有明显的流程线,不好追踪管理

将来出了问题不好排查

1.3 MQ常见技术介绍


MQ (MessageQueue),中文是消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

支持的协议越多,以后能干的事情就越多

RabbitMQ、RocketMQ、Kafka都是能够支持分布式集群

二、RabbitMQ介绍与安装(Windows)


官方网站: Messaging that just works — RabbitMQ

安装笔记借鉴:

https://blog.csdn.net/qq_25919879/article/details/113055350?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522167875575016800192257608%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=167875575016800192257608&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~top_positive~default-1-113055350-null-null.142^v73^insert_down1,201^v4^add_ask,239^v2^insert_chatgpt&utm_term=rabbitmq%E5%AE%89%E8%A3%85&spm=1018.2226.3001.4187

我出现的问题是运行RabbitMQ之后无法查看web页面
解决方案:
rabbitmq-plugins enable rabbitmq_management
执行完这行代码后,需要进入sbin目录下开启rabbitmq-server.bat,此时进入浏览器才能显示rabbit MQ界面
如果直接查看status会报错,显示rabbitMQ没有运行。
一定要确定RabbitMQ服务是启动的

点击下面的页面进入到下面的RabbitMQ

RabbitMQ Management

进入之前需要先登录,默认的登陆账号和密码都是guest

  • Overview是总览,主要记录结点的详细信息

  • Connections连接

将来不论是消息的发送这还是消息的接收着,都应该和RabbitMQ建立连接

  • Channels通道

将来建立连接之后一定要建立一个通道,不论是生产者还是消费者,基于Channels完成消息的发送和接收

  • Exchanges 交换机(路由器)

  • Queues 队列

队列就是来做消息存储的,目前还没有任何消息

  • Admin 管理

我们还可以新添加一个用户

2.1 RabbitMQ的结构和概念

Publisher(消息发送者)会把消息发送到exchange(交换机),交换机负责路由把消息投递到队列,队列负责暂存消息,Consumer(消费者)再从队列中获取消息并处理消息

将来我们创建用户之后会有一个虚拟主机Virtual Host,各个虚拟主机之间是相互隔离的,互相看不到的,不会互相干扰

三、常见消息模型

  • 基本消息队列(BasicQueue)、工作消息队列(WorkQueue)

消息的发送和接收都是基于队列来完成的(下图中红色的部分就是队列),并没有出现交换机

  • 发布订阅(Publish、Subscribe)

又根据交换机类型的不同分为三种:

Fanout Exchange:广播

Direct Exchange:路由

Topic Exchange:主题

这三个图中都一个紫色的部分,这一块就是交换机

3.2 HelloWord案例

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue

  • queue:消息队列,负责接收并缓存消息

  • consumer:订阅队列,处理队列中的消息

3.2.1消息的发布:

    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接   必须用到连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");  //端口不一样就需要改动代码
        factory.setPort(5672);      //mq当中的消息通信是5672   ui管理台是15672
        factory.setVirtualHost("/");//虚拟主机  至于为什么是单斜杠,进入rabbitMQ页面的admin栏下查看Can access virtual hosts字段值
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        //queueName是我们要发送到的队列,message.getBytes()将消息转成字节发送
        channel.basicPublish("", queueName, null, message.getBytes());  
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }

当我们运行这段程序之后,下图会显示,已经建立起连接

建立起通道:

创建的队列

发送消息

消息发送完成之后关闭通道和连接,也不用管对方收没收到

解除了耦合,而且是异步的

3.2.2 消息的接收:

消息的接收和发布非常的像,只不过将消息的发布改变成了消息的订阅


public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
         // 这个地方采用了匿名内部类的方式 ,并重写了方法 handleDelivery处理投递的消息
             @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                //发的时候是字节,那接的时候也是字节byte[] body
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

可能有人会有疑问:为什么在发布的时候创建队列了,那为什么接收消息的时候还要创建队列?

这是因为生产者和消费者启动的顺序是不确定的,万一消费者先启动,会出现找不到队列的情况,为了预防这种情况的发生,他们各自去声明

那发布和订阅都创建同一个队列,那会不会有冲突?

答案是并没有,如果队列名重复的话,不会创建两个

当我们接收到消息之后,队列中的Ready数变成了0

3.2.3 基本消息队列消息发送/接收流程总结

基本消息队列消息发送流程总结

  • 建立connection

  • 创建channel

  • 利用channel声明队列

  • 利用channel向队列发送消息

基本消息队列消息接收流程总结

  • 建立connection

  • 创建channel

  • 利用channel声明队列

  • 定义consumer的消费行为handleDelivery()

  • 利用channel将消费者与队列绑定