zl程序教程

您现在的位置是:首页 >  其他

当前栏目

RocketMQ 面试 33 连问,答完面试官主动要给我提薪资待遇...

面试官面试 ... rocketmq 主动 33 薪资 待遇
2023-09-11 14:16:30 时间

这不是秋招马上开始了嘛,这个月我每天会分享一个技术栈的高频面试题,而这些面试题都是取自于我五月份时整理的一些面试文档,希望对最近有面试或者有跳槽打算的同学有所帮助,需要这些文档的同学文章下方即可免费获取

目前已经更新了 Nginx、MySQL 和 RabbitMQ 感兴趣的朋友也可以去看一下

废话不多说,开始今天的面试之旅

 

RocketMQ 由哪些角色组成,每个角色作用和特点是什么?

角色作用 Nameserver 无状态,动态列表;这也是和 zookeeper 的重要区别之一。zookeeper 是有状态的。Producer 消息生产者,负责发消息到 Broker。Broker 就是 MQ 本身,负责收发消息、持久化消息等。Consumer 消息消费者,负责从 Broker 上拉取消息进行消费,消费完进行 ack。

RocketMQ 中的 Topic 和 JMS 的 queue 有什么区别?

queue 就是来源于数据结构的 FIFO 队列。而 Topic 是个抽象的概念,每个 Topic 底层对应 N 个 queue,而数据也真实存在 queue 上的。

RocketMQ Broker 中的消息被消费后会立即删除吗?

「不会」,每条消息都会持久化到 CommitLog 中,每个 Consumer 连接到 Broker 后会维持消费进度信息,当有消息消费后只是当前 Consumer 的消费进度(CommitLog 的 offset)更新了。

4.6 版本默认 48 小时后会删除不再使用的 CommitLog 文件

  • 检查这个文件最后访问时间

  • 判断是否大于过期时间

  • 指定时间删除,默认凌晨 4 点

源码如下:

/** * {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()} */private boolean isTimeToDelete() {    // when = "04";    String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();    // 是04点,就返回true    if (UtilAll.isItTimeToDo(when)) {        return true;    } // 不是04点,返回false    return false;}/** * {@link org.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()} */private void deleteExpiredFiles() {    // isTimeToDelete()这个方法是判断是不是凌晨四点,是的话就执行删除逻辑。    if (isTimeToDelete()) {        // 默认是72,但是broker配置文件默认改成了48,所以新版本都是48。        long fileReservedTime = 48 * 60 * 60 * 1000;        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);    }}                                                                     /** * {@link org.apache.rocketmq.store.CommitLog#deleteExpiredFile()} */public int deleteExpiredFile(xxx) {    // 这个方法的主逻辑就是遍历查找最后更改时间+过期时间,小于当前系统时间的话就删了(也就是小于48小时)。    return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);}

复制代码

RocketMQ 消费模式有几种?

消费模型由 Consumer 决定,消费维度为 Topic。

  • 集群消费一条消息只会被同 Group 中的一个 Consumer 消费多个 Group 同时消费一个 Topic 时,每个 Group 都会有一个 Consumer 消费到数据

  • 广播消费 消息将对一 个 Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个 Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

RocketMQ 消息是 push 还是 pull?

RocketMQ 没有真正意义的 push,都是 pull,虽然有 push 类,但实际底层实现采用的是「长轮询机制」,即拉取方式

broker 端属性 longPollingEnable 标记是否开启长轮询。默认开启

源码如下:

// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}// 看到没,这是一只披着羊皮的狼,名字叫PushConsumerImpl,实际干的确是pull的活。// 拉取消息,结果放到pullCallback里this.pullAPIWrapper.pullKernelImpl(pullCallback);

复制代码

为什么要主动拉取消息而不使用事件监听方式?

事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果 broker 主动推送消息的话有可能 push 速度快,消费速度慢的情况,那么就会造成消息在 Consumer 端堆积过多,同时又不能被其他 Consumer 消费的情况。而 pull 的方式可以根据当前自身情况来 pull,不会造成过多的压力而造成瓶颈。所以采取了 pull 的方式。

Broker 如何处理拉取请求的?

Consumer 首次请求 Broker Broker 中是否有符合条件的消息

  • 有响应 Consumer 等待下次 Consumer 的请求

  • 没有 DefaultMessageStore#ReputMessageService#run 方法 PullRequestHoldService 来 Hold 连接,每个 5s 执行一次检查 pullRequestTable 有没有消息,有的话立即推送每隔 1ms 检查 commitLog 中是否有新消息,有的话写入到 pullRequestTable 当有新消息的时候返回请求挂起 consumer 的请求,即不断开连接,也不返回数据使用 consumer 的 offset,

RocketMQ 如何做负载均衡?

通过 Topic 在多 Broker 中分布式存储实现。

「producer 端」

发送端指定 message queue 发送消息到相应的 broker,来达到写入时的负载均衡

  • 提升写入吞吐量,当多个 producer 同时向一个 broker 写入数据的时候,性能会下降

  • 消息分布在多 broker 中,为负载消费做准备

默认策略是随机选择:

  • producer 维护一个 index

  • 每次取节点会自增

  • index 向所有 broker 个数取余

  • 自带容错策略

其他实现:

  • SelectMessageQueueByHash

  • hash 的是传入的 args

  • SelectMessageQueueByRandom

  • SelectMessageQueueByMachineRoom 没有实现

也可以自定义实现「MessageQueueSelector」接口中的 select 方法

MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);

复制代码

「consumer 端」

采用的是平均分配算法来进行负载均衡。

「其他负载均衡算法」

  • 平均分配策略(「默认」)(AllocateMessageQueueAveragely)

  • 环形分配策略(AllocateMessageQueueAveragelyByCircle)

  • 手动配置分配策略(AllocateMessageQueueByConfig)

  • 机房分配策略(AllocateMessageQueueByMachineRoom)

  • 一致性哈希分配策略(AllocateMessageQueueConsistentHash)

  • 靠近机房策略(AllocateMachineRoomNearby)

当消费负载均衡 consumer 和 queue 不对等的时候会发生什么?

Consumer 和 queue 会优先平均分配,如果 Consumer 少于 queue 的个数,则会存在部分 Consumer 消费多个 queue 的情况,如果 Consumer 等于 queue 的个数,那就是一个 Consumer 消费一个 queue,如果 Consumer 个数大于 queue 的个数,那么会有部分 Consumer 空余出来,白白的浪费了。

消息重复消费如何解决?

影响消息正常发送和消费的「重要原因是网络的不确定性。」

  • 「引起重复消费的原因」「ACK」正常情况下在 consumer 真正消费完消息后应该发送 ack,通知 broker 该消息已正常消费,从 queue 中剔除当 ack 因为网络原因无法发送到 broker,broker 会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到 consumer「消费模式」在 CLUSTERING 模式下,消息在 broker 中会保证相同 group 的 consumer 消费一次,但是针对不同 group 的 consumer 会推送多次

  • 「解决方案」「数据库表」处理消息前,使用消息主键在表中带有约束的字段中 insert「Map」单机时可以使用 map ConcurrentHashMap -> putIfAbsent guava cache「Redis」分布式锁搞起来。

如何让 RocketMQ 保证消息的顺序消费?

首先多个 queue 只能保证单个 queue 里的顺序,queue 是典型的 FIFO,天然顺序。多个 queue 同时消费是无法绝对保证消息的有序性的。所以总结如下:

同一 topic,同一个 QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个 queue 里的消息。

怎么保证消息发到同一个 queue?

Rocket MQ 给我们提供了 MessageQueueSelector 接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断 i % 2 == 0,那就都放到 queue1 里,否则放到 queue2 里。

for (int i = 0; i < 5; i++) {    Message message = new Message("orderTopic", ("hello!" + i).getBytes());    producer.send(        // 要发的那条消息        message,        // queue 选择器 ,向 topic中的哪个queue去写消息        new MessageQueueSelector() {            // 手动 选择一个queue            @Override            public MessageQueue select(                // 当前topic 里面包含的所有queue                List<MessageQueue> mqs,                // 具体要发的那条消息                Message msg,                // 对应到 send() 里的 args,也就是2000前面的那个0                Object arg) {                // 向固定的一个queue里写消息,比如这里就是向第一个queue里写消息                if (Integer.parseInt(arg.toString()) % 2 == 0) {                    return mqs.get(0);                } else {                    return mqs.get(1);                }            }        },        // 自定义参数:0        // 2000代表2000毫秒超时时间        i, 2000);}

复制代码

RocketMQ 如何保证消息不丢失?

首先在如下三个部分都可能会出现丢失消息的情况:

  • Producer 端

  • Broker 端

  • Consumer 端

Producer 端如何保证消息不丢失

  • 采取 send()「同步发消息」,发送结果是同步感知的。

  • 发送失败后可以「重试」,设置重试次数。默认 3 次。producer.setRetryTimesWhenSendFailed(10);❞

  • 「集群部署」,比如发送失败了的原因可能是当前 Broker 宕机了,重试的时候会发送到其他 Broker 上。

Broker 端如何保证消息不丢失

  • 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。flushDiskType = SYNC_FLUSH❞

  • 集群部署,主从模式,高可用。

Consumer 端如何保证消息不丢失

  • 完全消费正常后在进行手动 ack 确认。

RocketMQ 的消息堆积如何处理?

首先要找到是什么原因导致的消息堆积,是 Producer 太多了,Consumer 太少了导致的还是说其他情况,总之先定位问题。

然后看下消息消费速度是否正常,正常的话,可以通过上线更多 Consumer 临时解决消息堆积问题

如果 Consumer 和 Queue 不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?

  • 准备一个临时的 topic

  • queue 的数量是堆积的几倍

  • queue 分布到多 Broker 中

  • 上线一台 Consumer 做消息的搬运工,把原来 Topic 中的消息挪到新的 Topic 里,不做业务逻辑处理,只是挪过去

  • 上线 N 台 Consumer 同时消费临时 Topic 中的数据

  • 改 bug

  • 恢复原来的 Consumer,继续消费之前的 Topic

堆积消息会超时删除吗?

「不会」;RocketMQ 中的消息只会在 commitLog 被删除的时候才会消失。也就是说未被消费的消息不会存在超时删除这情况。

堆积的消息会不会进死信队列?

「不会」,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18 次才会进入死信队列(%DLQ%+ConsumerGroup)。

源码如下:

public class MessageStoreConfig {    // 每隔如下时间会进行重试,到最后一次时间重试失败的话就进入死信队列了。 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";}

复制代码

RocketMQ 在分布式事务支持这块机制的底层原理?

分布式系统中的事务可以使用「TCC」(Try、Confirm、Cancel)、「2pc」来解决分布式系统中的消息原子性

RocketMQ 4.3+ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致

「RocketMQ 实现方式:」

  • 「Half Message」:预处理消息,当 broker 收到此类消息后,会存储到 RMQ_SYS_TRANS_HALF_TOPIC 的消息消费队列中

  • 「检查事务状态」:Broker 会开启一个定时任务,消费 RMQ_SYS_TRANS_HALF_TOPIC 队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker 会定时去回调在重新检查。

  • 「超时」:如果超过回查次数,默认回滚消息。

也就是他并未真正进入 Topic 的 queue,而是用了临时 queue 来放所谓的 half message,等提交事务后才会真正的将 half message 转移到 topic 下的 queue。

高吞吐量下如何优化生产者和消费者的性能?

  • 开发消息批量拉取业务逻辑批量处理同一 group 下,多机部署,并行消费单个 Consumer 提高消费线程个数批量消费

  • 运维网卡调优 jvm 调优多线程与 cpu 调优 Page Cache

RocketMQ 是如何保证数据的高容错性的?

  • 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的 Broker

  • 如果开启了容错策略,会通过 RocketMQ 的预测机制来预测一个 Broker 是否可用

  • 如果上次失败的 Broker 可用那么还是会选择该 Broker 的队列

  • 如果上述情况失败,则随机选择一个进行发送

  • 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测 broker 的可用时间

其实就是 send 消息的时候 queue 的选择。源码在如下:

org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue()

复制代码

任何一台 Broker 突然宕机了怎么办?

Broker 主从架构以及多副本策略。Master 收到消息后会同步给 Slave,这样一条消息就不止一份了,Master 宕机了还有 slave 中的消息可用,保证了 MQ 的可靠性和高可用性。而且 Rocket MQ4.5.0 开始就支持了 Dlegder 模式,基于 raft 的,做到了真正意义的 HA。

Broker 把自己的信息注册到哪个 NameServer 上?

每个 Broker 向所有的 NameServer 上注册自己的信息,即每个 NameServer 上有所有的 Broker 信息

RocketMQ 如何分布式存储海量消息的?

RocketMQ 进程一般称为 Broker,集群部署的各个 Broker 收到不同的消息,然后存储在自己本地的磁盘文件中。

任何一台 Broker 突然宕机了怎么办?还能使用吗?消息会不会丢?

RocketMQ 的解决思路是 Broker 主从架构以及多副本策略。

Master 收到消息后会同步给 Slave,这样一条消息就不止一份了,Master 宕机了还有 slave 中的消息可用,保证了 MQ 的可靠性和高可用新。

怎么知道有哪些 Broker ?如何知道要连那个 Broker?

有个 NameServer 的概念,是独立部署在几台机器上的,然后所有的 Broker 都会把自己注册到 NameServer 上去,NameServer 就知道集群里有哪些 Broker 了!

发送消息到 Broker,会找 NameServer 去获取路由信息 系统要从 Broker 获取消息,也会找 NameServer 获取路由信息,去找到对应的 Broker 获取消息。

NameServer 到底可以部署几台机器?为什么要集群化部署?

部署多台,保证高可用性。

集群化部署是为了高可用性,NameServer 是集群里非常关键的一个角色,如果部署一台 NameServer,宕机会导致 RocketMQ 集群出现故障,所以 NameServer 一定会多机器部署,实现一个集群,起到高可用的效果。

系统如何从 NameServer 获取 Broker 信息?

系统主动去 NameServer 上拉取 Broker 信息及其他相关信息。

如果 Broker 宕了,NameServer 是怎么感知到的?

Broker 会定时(30s)向 NameServer 发送心跳 然后 NameServer 会定时(10s)运行一个任务,去检查一下各个 Broker 的最近一次心跳时间,如果某个 Broker 超过 120s 都没发送心跳了,那么就认为这个 Broker 已经挂掉了。

Broker 挂了,系统是怎么感知到的?

主要是通过拉取 NameServer 上 Broker 的信息。但是,因为 Broker 心跳、NameServer 定时任务、生产者和消费者拉取 Broker 信息,这些操作都是周期性的,所以不会实时感知,所以存在发送消息和消费消息失败的情况,现在 我们先知道,对于生产者而言,他是有 一套容错机制的。

Master Broker 是如何将消息同步给 Slave Broker 的?

RocketMQ 自身的 Master-Slave 模式采取的是 Pull 模式拉取消息。

消费消息时是从 Master 获取还是 Slave 获取?

可能从 Master Broker 获取消息,也有可能从 Slave Broker 获取消息

  1. 消费者的系统在获取消息的时候会先发送请求到 Master Broker 上去,请求获取一批消息,此时 Master Broker 是会返回一批消息给消费者系统的

  2. Master Broker 在返回消息给消费者系统的时候,会根据当时 Master Broker 的 负载情况和 Slave Broker 的 同步情况,向消费者系统建议下一次拉取消息的时候是从 Master Broker 拉取还是从 Slave Broker 拉取。

如果 Slave Broker 挂掉了,会对整个系统有影响吗?

有一点影响,但是影响不太大,因为消息写入全部是发送到 Master Broker 的,获取消息也可以 Master 获取,少了 Slave Broker,会导致所有读写压力都集中在 Master Broker

Master Broker 突然挂了,这样会怎么样?

「RocketMQ 4.5 版本之前」,用 Slave Broker 同步数据,尽量保证数据不丢失,但是一旦 Master 故障了,Slave 是没法自动切换成 Master 的。所以在这种情况下,如果 Master Broker 宕机了,这时就得手动做一些运维操作,把 Slave Broker 重新修改一些配置,重启机器给调整为 Master Broker,这是有点麻烦的,而且会导致中间一段时间不可用。

「RocketMQ 4.5 之后」支持了一种叫做 Dledger 机制,基于 Raft 协议实现的一个机制。我们可以让一个 Master Broker 对应多个 Slave Broker, 一旦 Master Broker 宕机了,在多个 Slave 中通过 Dledger 技术 将一个 Slave Broker 选为新的 Master Broker 对外提供服务。在生产环境中可以是用 Dledger 机制实现自动故障切换,只要 10 秒或者几十秒的时间就可以完成