zl程序教程

您现在的位置是:首页 >  Python

当前栏目

什么时候用MQ、MQ 的作用、延迟消息

2023-03-14 22:59:33 时间

本文主要参考沈剑大佬的消息队列系列的四篇博文和博文评论,以及刘海丰老师的《架构设计面试精讲》,文末是完整参考。

1、什么时候用MQ、MQ 的作用

MQ是一个互联网架构中常见的解耦利器。

1.1 MQ 的组成

Producer:消息的生产者;

Broker:MQ 服务器;

Topic:存储不同类别的消息,存在于 Broker

Partition:分区,topic 数据的分流,用于提高 topic 的并发能力。每个分区存储 topic 的一部分数据,一条消息唯一且仅存在于一个 topic,同一 topic 的不同分区的数据无交叉重复。每个partition在物理机上对应一个文件夹,该文件夹下存储这个 partition 的所有消息和索引文件;

Consumer:消费者,一个分区的数据只能被一个消费者消费,每个消费者能消费某多个分区的消息。

Consumer Group:消费者组,消费同一 topic 的消费者集群,

1.2 本质

观察者模式的应用,以消息或者事件为驱动

1.3 什么时候不使用MQ?

上游实时关注和依赖执行结果

1.4 什么时候使用MQ?

1)上游不关心下游执行结果

2)下游执行时间长,化同步为异步,异步返回结果,也可以接口回调结果,

3)数据驱动的任务依赖(规则引擎、cron排班表:多个定时任务有依赖关系时利用 MQ 来消息通知【这种场景主要是定时任务之间缺少通信机制,除了 MQ,还可用用 redis 等公共内存来通信】)

1.5 作用

削峰填谷,异步解耦,信息通讯

同步转异步,提高吞吐量

1.5.1 削峰填谷

上下游处理能力不匹配,吞吐量不匹配

削峰就是暂存流量,缓冲流量,等待下游慢慢消化流量,流量发生了延迟。如果调用方不限速,容易把下游打垮。需要下游做的操作就是控制从 MQ 中获取消息的频率,限速拉取,可以定时获取,也可以批量获取,批量处理还能提高整体的吞吐量。

1.5.2 异步解耦

消费者服务出现异常,消息可以暂存在消息队列中,等消费者服务恢复后继续消费和处理消息,不影响用户请求的正常处理。

1.6 问题:

问:如果不用 MQ 抗住并发(如何用接口调用处理上下游吞吐量不匹配的问题?)

业务上游使用队列缓冲请求,限速调用下游

业务下游使用队列缓冲请求,限速执行任务。

这两种情况需要注意队列长度的问题,如果任务特别多,且队列是存在内存里,那么有可能导致 OOM

问:如果上游发送流量过大,MQ提供拉模式确实可以起到下游自我保护的作用,会不会导致消息在MQ中堆积?

会,如果下游处理能力跟不上,堆积是必然的,所以还是得尽量要提高下游处理能力。削峰强调的是短暂的流量波峰起一个缓冲作用,不让系统垮掉,如果长时间有大流量请求,还是得把下游的处理能力提高到与上游相当,否则堆积只会越来越多

问:大量堆积可能会导致什么问题

导致请求大量超时。

问:MQ 如何实现堆积和处理消息堆积

把消息固化到磁盘或者 数据库,比如 RocketMQ 的实现

问:批量消费可能导致什么问题

可能批量拉取的消息中部分处理失败,增加了处理难度,几乎任何效率的提升和可维护性的提升都会带来系统复杂度的提升。

问:为什么要自研MQ,不用开源的

某个具体开源的组件有突出的优势,但是可能很满足业务的全部诉求,比如很少开源 MQ 能同时实现高可用、消息幂等、消息必达、消息固化和延迟消息,所以往往业务可能会根据自己的诉求对开源 MQ封装,或者完全自研。

问:消息的推拉有什么不同?

推实时但难以流控,拉有时延但能限速,需要结合业务场景来选型

问:怎么处理消息堆积

生产者发送消息增多,还是消费者处理速度变慢

从消费者上看:要临时扩容,增加消费端的数量,与此同时,降级一些非核心的业务。通过扩容和降级承担流量,这是为了表明你对应急问题的处理能力。

其次,才是排查解决异常问题,如通过监控,日志等手段分析是否消费端的业务逻辑代码出现了问题,优化消费端的业务处理逻辑。

在回答问题的时候,你需要特别注意的是,让面试官了解到你的思维过程,这种解决问题的能力是面试官更为看中的,比你直接回答一道面试题更有价值。

问:水平扩容消费者服务时需要注意什么

Kafka 约定一个分区只能被一个消费者消费,所以如果需要水平扩展消费者服务,那么一定要同步扩展分区数量

问:MQ 高性能的原因

1.内存池设计。

2.默认200ms批量发送一次数据,同一个 broker 的请求会合并成一个。

  1. Reactor 网络设计模式(selector ,网络线程池,IO线程池)。
  2. 顺序写磁盘。

5.零拷贝。读:1.跳表设计,定位到文件。 2.日志存储是稀疏索引,定位到数据。

问:什么是死信队列

用来存放被消费者拉取的次数超过最大重试次数的消息的特殊队列,死信队列的消息一般被认为是业务无法处理的消息,一般需要人工介入处理。

问:有哪些分区策略

分区策略: producer 写入数据时 partition 的分配规则,Producer API提供了三种默认实现:随机、 轮询和key hash, 用户可以通过实现Partitioner接口,定制分区策略

2、MQ 如何实现延迟消息和定时任务

定时任务和延迟消息其实有很多共同点,那就是都是想在将来某个时间点执行某个任务。所以现实生产过程中经常用延迟消息来实现定时任务。利用“延时消息”,对于每个任务只触发一次,保证效率的同时保证实时性

2.1 常规的定时任务实现

创建一个 cronjob 定时任务,每1小时或者多久执行一次,周期性轮询数据库,查询数据库记录,检查是否满足执行条件,满足则执行定时任务,执行完修改数据库记录

1分钟实现“延迟消息”功能 中有滴滴打车订单结束 48 小时后自动 5 星好评的案例。

10w定时任务,如何高效触发超时 有一个 30s 自动关闭 tcp 连接的案例。

2.1.1 缺点

(1)轮询效率比较低

(2)每次扫库,已经被执行过记录,仍然会被扫描(只是不会出现在结果集中),有重复计算的嫌疑

(3)时效性不够好,如果每小时轮询一次,最差的情况下,时间误差等于定时任务的执行周期。上一次定时任务执行完毕恰好产生一条满足执行条件的记录,但是需要等到下次定时任务触发才能被执行。

(4)如果通过增加cron轮询频率来减少(3)中的时间误差,但是(1)中轮询低效和(2)中重复计算的问题会进一步凸显

2.2 延迟消息实现(环形队列-也称为「时间轮」算法)

环形队列是一个实现“延时消息”的好方法,开源的MQ好像都不支持延迟消息

设置一个秒级别或者毫秒级别的环形队列,队列的长度是延迟时间的周期,比如30s,长度为30,1小时,长度为 3600等,队列的每个元素是一个该秒需要执行的定时任务对象 Set 集合,Set 集合里存储的是延迟任务的延迟时间取余队列长度结果相等的所有定时任务对象。

同时启动一个 timer 定时器,每秒钟在上述环形队列中移动一个结点,有一个 Current Index 指针来标识正在检测的结点,执行完当前结点的 Set 集合的所有定时任务后,从 Set 中删除执行完毕的定时任务,Current Index指针下一秒移动到下一个结点。

注:为了能在 1 秒钟内执行完 Set 集合内的所有定时任务,不耽误 Current Index 指针下一秒的移动,所以一般会创建一个新的协程去异步处理 Set 集合的扫描任务和 num 更新,而 timer 定时器则继续等待下一秒移动到队列下一个结点。异步协程又可以创建新的协程去处理每个定时任务,通过线程池或者协程池来避免协程数量和资源开销的问题。

1456655-20230213012044607-1429126590.png

(沈剑老师的手绘稿)

2.2.1 环形队列的长度为 3600s,但是延迟时间超过 1 小时怎么办,比如 3610s

存储在 Set 集合里的每个定时任务对象额外存储一个属性:轮次 cycle-num,表示当前定时任务再被扫描几轮可以执行,队列结点被 current index 扫描到,异步协程检查Set 集合中的每个定时任务对象,检查 cycle-num 是否大于0,如果大于0,说明还有下一轮需要扫描,那么 num 减一,如果 num 等于0,那么直接执行定时任务,执行完后从 Set 中删除该定时任务。比如希望3610秒之后,触发一个延时消息任务,那么设置 num 初始值为1,当前定时任务放在下标为 10 的对象结点所在的 Set 集合中。表示当被第二次扫描到就应该被执行。cycle-num 的思路也称之为分层时间轮。

注:另一个思路是把 cycle-num 轮次属性,替换成时间戳,每次遍历 set 集合中的任务时,检查 task 的时间戳是否满足要求,满足则执行和删除,否则跳过且保留。

2.2.2 如果想在固定时间点执行,

比如 17 点,那么需要动态计算每个定时任务的的延迟时间,然后根据延迟时间对队列长度取模,决定定时任务的队列下标和轮次 cycle-num。

2.2.3 如果延迟时间是一个固定的周期

那么最好把队列长度设置成跟固定的周期一样长,这样每次插入定时任务到队列时,这个新任务恰好是需要经过一轮周期才能被执行,那么只需要插入到当前指针的上一个结点所在的集合里即可。

注:如果是这种固定周期的定时任务,由于新任务到达的时间恰好错过了一个周期,任务每次都会放在上当前指针所指向的结点的上一个结点的集合,新任务的在队列的位置是不可预知的,如果有查找到该任务或者移动该任务的需求,就需要另外用一个map,存储下task所在的队列下标,比如 30s tcp 关闭 tcp 连接的场景,如果 30s 有新的消息,那么就需要重新统计 30s,这时就需要找到原来的 task 所在set集合删除,添加到新集合中。

2.3 时间轮法实现定时任务的优势

(1)无需再轮询业务表全部的所有记录,效率高

(2)一个订单或者一个业务记录,任务只执行一次

(3)时效性好,精确到秒(控制timer移动频率可以控制精度)

2.4 问题

问:还有哪些方式可以实现延迟消息

1.redis的 zset 按照到期时间排序, 始终轮询第一条数据,~不过不支持事务…一旦处理异常会丢失数据或者死循环

  1. 最小堆,把所有的的定时任务用一个最小堆按照到期时间排序,每次 sleep 当前时间与堆顶的时间间隔,这样就不需要每秒都执行一次 timer 移动,但是这种只适合定时任务数量不多的情况。
  2. rocket MQ 实现了延迟消息,通过创建多个延迟消费的 topic,写入消息后不会进入它的逻辑消费队列,定时器扫描把满足时间要求的消息从延迟消费的队列移动到逻辑消费队列,这样消费者就可以拉取到这个消息开始消费了。
  3. netty 的hashtimewheel 就是用来做时间轮的
  4. 高并发队列框架 disruptor ,底层也是一个环,单机支持数据量 100 万以上的实时高并发并行队列处理。这个开源框架已经在类似证券行情和交易的系统中了。
  5. Java内置了一个阻塞延时队列DelayQueue

7.rabbit mq利用 ttl 特性可以实现、不过无法取消已放入队列里面的数据。可能对某些业务不适用。(好像说 activemq 也支持延迟消息?)

8.数据库,利用 ts 轮询可以实现。不过业务量大会有性能瓶颈。

问:用 redis 的 zset 怎么实现延时队列?有什么问题

一个思路是:zset 按评价时间排序 只要轮询第一条数据,zset 的分数可以重复吗?

问:就滴滴2小时默认好评这个场景,能不用定时任务来实现吗

给2个星级(一个默认5级,一个空),显示层简单的根据时间判断取哪个

问:滴滴2小时默认好评这个场景,如果 2 小时内,用户手动打了评价,怎么办?需要从 set 中找到任务然后删除吗

不需要删除,只需要在执行定时任务时,额外校验一下用户是否已评价即可,如果已经评价则不执行默认 5 星好评的处理。

问:任务都是存储在内存中的,万一程序重启怎么恢复?

先固化到磁盘或者数据库,收到 ack 后再删除,执行完从Set 中删除的同时也删除数据库中的记录。这其实是 MQ 不丢消息的实现方式。

如果程序重启,从数据库或者磁盘重新加载到定时任务后重建环形队列。像维护 tcp 连接这样的定时任务,如果服务器重启,为了防止雪崩,客户端会 random 一个时间,给服务端重建环形队列的时间。

问:如果时间比较长比如30天后触发,那么累计的量就比较大了,这么多数据存在哪里呢

30天后触发,一个思路因为时间比较长,所以最终执行的时间可能不需要精确到秒,误差几秒或许可以接受,那么把时间轮的精度加粗一些,比如每1分钟移动一个结点,每个结点存储此分钟内待执行的定时任务,但是这并不能解决随着时间堆积定时任务越来越多的问题。

一个优化方案是把时间比较久的任务,先固化,未来再取出,但是未来再取出这个操作又怎么实现呢,难道又弄一个定时任务来检查和加载吗?那这种场景下还是用定时任务还是延迟消息呢? 待考证

问:定时任务太多?一个实例处理太慢怎么办?

分布式水平扩展队列服务来提高吞吐能力,多个结点同时执行定时任务,约定每个结点 load 的定时任务,这样不会重复执行,也不需要加锁。

问:如果在遍历set执行任务的过程中同时往这个set中添加新任务,这种情况下怎么保证set的安全性

用线程安全数据结构,golang 有哪些线程安全的 Set 数据结构?channel,好像不适合队列场景,map 好像线程不安全,sync.Map 或许可以

问:客服热线 30s 无通信关闭 tcp 连接怎么实现

10w定时任务,如何高效触发超时 文中介绍了三种方式

法一:“轮询扫描法”

1)用一个Map<uid, last_packet_time>来记录每一个uid最近一次请求时间last_packet_time

2)当某个用户uid有请求包来到,实时更新这个Map

3)启动一个timer,当Map中不为空时,轮询扫描这个Map,看每个uid的last_packet_time是否超过30s,如果超过则进行超时处理

法二:“多timer触发法”

1)用一个Map<uid, last_packet_time>来记录每一个uid最近一次请求时间last_packet_time

2)当某个用户uid有请求包来到,实时更新这个Map,并同时对这个uid请求包启动一个timer,30s之后触发

3)每个uid请求包对应的timer触发后,看Map中,查看这个uid的last_packet_time是否超过30s,如果超过则进行超时处理

分析

方案一:只启动一个timer,但需要轮询,效率较低

方案二:不需要轮询,但每个请求包要启动一个timer,比较耗资源

法三:环形队列时间轮法

具体实现跟上面的时间轮算法是一样的,只不过时间轮队列的长度为 30,且需要一个 map 来记录每个 uid 的task 在队列的哪个结点的 Set 里,插入 set 和删除 set 时都需要更新 map。详细实现可阅读10w定时任务,如何高效触发超时

优势

(1)只需要1个timer

(2)timer每1s只需要一次触发,消耗CPU很低

(3)批量超时,Current Index扫到的slot,Set中所有元素都应该被超时掉

问:current index 指向的 Set 中,哪些任务应该被执行

cycle-num 为 0 或者不存在 cycle-num 属性的任务都应该被执行,比如 30s tcp 关闭连接的案例,不需要 cycle-num 属性,那么 Current Index 每秒钟移动一个结点,这个结点对应的 Set 中所有uid都应该被关闭连接。

问:延迟消息还有哪些应用场景

超时连接释放、超时会话管理,超时执行默认操作(给好评,自动取消订单),定时任务

扩展:

死信队列:存放「消费次数超过最大重试次数的消息」的队列

问:每个 tcp 连接都会占用一个端口吗

不会,一个服务往往是一个进程,一个进程只占用一个端口。进程内的一个tcp连接,只占一个文件描述符

问:重复消费和消费不成功有什么好的解决方案吗

(1)业务要支持幂性等去重操作。(2)MQ支持业务ACK,保证可达。

完整参考:

58到家MQ如何快速实现流量削峰填谷

1分钟实现“延迟消息”功能

消息总线能否实现消息必达?

刘海丰老师的《架构设计面试精讲》:08 | MQ:如何回答消息队列的丢失、重复与积压问题

10w定时任务,如何高效触发超时