zl程序教程

您现在的位置是:首页 >  .Net

当前栏目

《ASP.NET Core技术内幕与项目实战》精简集-DDD准备5.5:集成事件RabbitMQ

2023-03-20 15:33:41 时间

 

本节内容,部分为补充内容,部分涉及到9.3.10-9.3.12(P335-342)。主要NuGet包:

  • RabbitMQ.Client

 

微服务间,跨进程的事件发布和订阅,需要借助第三方服务器作为事件总线,目前常用的有Redis、RabbitMQ、Kafka等,本章节介绍RabbitMQ。

一、基本过程

1、生产者(通常指一个微服务应用),将消息发布到RabbitMQ服务器的指定交换机,如果没有则新建一个交换机,每个消息都有一个routingKey(本质是一个事件)。

2、消费者(通常指另一个微服务应用),声明一个队列用于接收消息,队列通过交换机和routingKey进行匹配,监听指定交换机和routingKey的消息。

3、生产者发布消息或消费者接收消息,首先要与服务器建立TCP连接,TCP连接比较消耗资源,所以建立连接后,通过创建虚拟信道来发布和接收消息,发布或接收到消息后,信道关闭,但TCP连接会保留重复使用。

4、如果有A和B两个消费者,想读取同一条消息,应分别声明不同的队列名称,即同一条消息可以被多个消费者接收到。如果声明的队列名称相同,则同一条消息按先到先得的方式被其中一个消费者接收。

5、消息失败重发,消费者如果在接收消息进行处理时出错,可以通知队列“消息处理出错”,队列会重新发送消息。

 

 

 

 

 

二、两个控制台应用的事件发布和订阅案例。由于RabbitMQ的使用比较繁琐,所在大多数框架都会对RabbitMQ进行二次封装,并和领域事件进行集成,使我们在使用事件机制时,无论是领域事件,还是集成事件,API和用法都基本相同。但仍然需要了解一下RabbitMQ的原生使用方式。下面代码,是书中案例,但跑不起来,因为主要使用框架开发,都有集成事件的封装,所以懒得高度了,主要是了解一下集成事件底层的代码实现原理。

1、创建一个控制台应用,作为生产者

//声明RabbitMQ服务器,案例中以开发本机作为服务器
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.DispatchConsumersAsync = true;

//命名交换机和路由名称,路由routinKey本质是一个事件,消费端通过routingKey来匹配队列
string exchangeName = "exchange1";
string routingKey = "event1";

//声明TCP连接,多个信道复用这个连接
using var conn = factory.CreateConnection();

//创建虚拟信道、连接交换机,发布信息
//本案例中,每延迟1秒,就发送一次信息
while (true)
{
    //定义传递的信息,每次发关事件
    string msg = DateTime.Now.TimeOfDay.ToString();

    //创建一个虚拟信道,连接交换机,发送消息
    using (var channel = conn.CreateModel())
    {
        //准备①:配置信道属性
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2;

        //准备②:声明交接机。本例中,每次新的信道都使用相同的交接机
        channel.ExchangeDeclare(exchange:exchangeName,type:"direct");

        //准备③:RabbitMQ中的消息都是按照byte[]来传递,将消息转为byte[]格式
        byte[] body = Encoding.UTF8.GetBytes(msg);

        //发布消息,参数分别为交换机、routingKey(事件)、消息、信息属性
        channel.BasicPublish(exchange:exchangeName,routingKey:routingKey,body:body,basicProperties:properties,mandatory:true);
    }

    //每次发送完信息后,都关闭信道
    Console.WriteLine($"发布了消息:{msg}");

    //暂停1秒后,循环发送消息
    Thread.Sleep(1000);
}

 

2、创建另一个控制台应用,作为消费者

//声明RabbitMQ服务器,案例中以开发本机作为服务器
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;

//定义交换机和路由名称
string exchangeName = "exchange1";
string routingKey = "event1";

//声明TCP连接,多个信道复用这个连接
using var conn = factory.CreateConnection();

//声明一个信道
using var channel = conn.CreateModel();

//声明交换机
channel.ExchangeDeclare(exchange:exchangeName,type:"direct");

//声明队列
string queueName = "queue1";
channel.QueueDeclare(queue:queueName,durable:true,exclusive:false,autoDelete:false,arguments:null);

//将队列和指定交换机及routingKey绑定
channel.QueueBind(queue: queueName, exchange:exchangeName,routingKey:routingKey);

//通过信道,从队列中接收消息
var consumer = new AsyncEventingBasicConsumer(channel);

//定义一个事件处理函数
consumer.Received += ConsumerReceived ;

//执行事件处理
channel.BasicConsume(queue:queueName,autoAck:false,consumer:consumer);
Console.ReadLine();

//收到消息后的事件处理函数
async Task ConsumerReceived(object sender,BasicDeliverEventArgs args)
{
    try
    {
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now + "收到了消息" + msg);
        channel.BasicAck(args.DeliveryTag, multiple: false); //通知队外,事件处理成功
        await Task.Delay(1000);
    }
    catch (Exception ex)
    {
        channel.BasicReject(args.DeliveryTag, true); //通知队列,事件处理失败,请重发
        Console.WriteLine("处理消息出错"+ex);
    }
}

 

 

三、书中有提供一个Zack.EventBus库,提供集成事件的再封装,极大的简化了集成事件的使用,详见P338-342

 

 

特别说明:
1、本系列内容主要基于杨中科老师的书籍《ASP.NET Core技术内幕与项目实战》及配套的B站视频视频教程,同时会增加极少部分的小知识点
2、本系列教程主要目的是提炼知识点,追求快准狠,以求快速复习,如果说书籍学习的效率是视频的2倍,那么“简读系列”应该做到再快3-5倍