zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

分布式异步队列RabbitMQ 探究

2023-09-11 14:13:57 时间

一.分布式异步队列
1.什么是分布式异步队列
2.异步队列的意义和作业
3.常见的队列组件和优势特点
4.c#驱动rabbitmq
5.生产者消费模式


1.什么是分布式异步队列
分布式:
异步队列:生产者--中间人(rabbitmq)---消费这----》数据库
2.异步队列的意义和作业
同步架构:
异步架构:异步处理,肖峰,解耦,高可用,增加了复杂性,

3.常见的队列组件和优势特点
redis:
rabbitmq:
kafka:
activemq:

4.c#驱动rabbitmq
Nuget引入程序包RabbitMQ.Client
定义生产者消费者
生产消息,写入消息,消费消息

Queue
C#中的数据本质是一个数组,先进先出,维护一个最大的索引

RabbitMQ:
内部存储数据结构是Key-Value形式存储;

rabbitmq:基于amqp协议
以字节为单位的桢传输
一个生产者一个消费者;
消息只是被消费一次;


5.安装:
Erlang环境 RabbitMQ包下载
Erlang语言运行环境
下载地址:http://www.erlang.org/downloads
RabbitMQ包
下载地址: https://www.rabbitmq.com/

配置环境变量
ERLANG_HOME:C:\Program Files\erl10.5
Path: %ERLANG_HOME%\bin

RABBITMQ_SERVER: C:\Program Files\erl_rabbitmq_server-3.8.3(根目录)
Path :%RABBITMQ_SERVER%\sbin


5.交换机介绍:
生产端消息确认:
1.confirm:异步模式confirmSelect();WaitForConfirmsOrDie()
2.事务模式:同步模式,来自协议支持的,TxSelect(),TxCommit(),TxRollback()
消费端消息确认:
1.自动确认:
2.显示确认:

var message = Encoding.UTF8.GetString(ea.Body.ToArray());
//如果在这里处理消息的手,异常了呢?
//Console.WriteLine($"接收到消息:{message}"); ;
if (i < 50)
{
//手动确认 消息正常消费 告诉Broker:你可以把当前这条消息删除掉了
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine(message);
}
else
{
//否定:告诉Broker,这个消息我没有正常消费; requeue: true:重新写入到队列里去; false:你还是删除掉;
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}


////处理消息
////autoAck: true 自动确认;
//channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: true, consumer: consumer);

//处理消息
//autoAck: false 显示确认;
channel.BasicConsume(queue: "ConsumptionACKConfirmQueue", autoAck: false, consumer: consumer);

失败
优先级
持久化:队列,交换机,消息持久化
事务机制
5.1 Direct Exchange:直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
DirectExchange 更像从路由方来筛选消息;

//定义,绑定,发布:
channel.BasicPublish(exchange: "DirectExChange",
routingKey: log.LogType,
basicProperties: null,
body: log.Msg);
//消费消息
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"【{message}】,写入文本~~");
};
//处理消息 消息确认
channel.BasicConsume(queue: "DirectExchangeLogAllQueue",
autoAck: true,
consumer: consumer);

5.2 Fanout Exchange:fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

定义队列,交换机,绑定交换机,队列,不需要指定路由routingKey

5.3 Topic Exchange:Topic路由:
Exchange绑定队列需要制定Key; Key 可以有自己的规则;Key可以有占位符;*/#*匹配一个单词、#匹配多个单词,在Direct基础上加上模糊匹配;

routingKey:匹配规则绑定 #.product order.#

5.4 Header Exchange
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。 在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。

6.分布式事务,解决方案:
消息持久化:
生产端消息确认:
消费断消息确认:

 

Mobaxterm