[三]RabbitMQ-客户端源码之ChannelManager
关于ChannelManager,官方注解:Manages a set of channels, indexed by channel number (1.. _channelMax)。
ChannelManager类的代码量不是很多,主要用来管理Channel的,channelNumber=0的除外,应为channelNumber=0是留给Connection的特殊的channelNumber。
下面是ChannelManager的成员变量:
/** Monitor for code _channelMap /code and code channelNumberAllocator /code */ private final Object monitor = new Object(); /** Mapping from code b 1.._channelMax /b /code to {@link ChannelN} instance */ private final Map Integer, ChannelN _channelMap = new HashMap Integer, ChannelN private final IntAllocator channelNumberAllocator; private final ConsumerWorkService workService; private final Set CountDownLatch shutdownSet = new HashSet CountDownLatch /** Maximum channel number available on this connection. */ private final int _channelMax; private final ThreadFactory threadFactory;
这上面的成员变量下面会有涉及。
/** Object that manages a set of channels */ private volatile ChannelManager _channelManager;
AMQConnection中start()的_channelManager中对其初始化:
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) { return new ChannelManager(this._workService, channelMax, threadFactory); }
再调用其构造函数:
public ChannelManager(ConsumerWorkService workService, int channelMax, ThreadFactory threadFactory) { if (channelMax == 0) { // The framing encoding only allows for unsigned 16-bit integers // for the channel number channelMax = (1 16) - 1; _channelMax = channelMax; channelNumberAllocator = new IntAllocator(1, channelMax); this.workService = workService; this.threadFactory = threadFactory; }
这里的ConsumerWorkService也在AMQConnection的start()方法中初始化——initializeConsumerWorkService():
private void initializeConsumerWorkService() { this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout); }
再回到构造函数。
channelMax参数是在client接收到broker的Connection.Tune帧中的“Channel-Max”参数之后设置的,如果为0则表示没有限制,这里就会设置为默认的最大值:2的16次方-1。
threadFactory参数是指:Executors.defaultThreadFactory();
关于ConsumerWorkService请参考文章末尾处。
public Channel createChannel(int channelNumber) throws IOException { ensureIsOpen(); ChannelManager cm = _channelManager; if (cm == null) return null; return cm.createChannel(this, channelNumber); public Channel createChannel() throws IOException { ensureIsOpen(); ChannelManager cm = _channelManager; if (cm == null) return null; return cm.createChannel(this); }
这里就是调用了ChannelManager的createChannel方法。
下面是ChannelManager中关于创建Channel的代码:
public ChannelN createChannel(AMQConnection connection) throws IOException { ChannelN ch; synchronized (this.monitor) { int channelNumber = channelNumberAllocator.allocate(); if (channelNumber == -1) { return null; } else { ch = addNewChannel(connection, channelNumber); ch.open(); // now that its been safely added return ch; public ChannelN createChannel(AMQConnection connection, int channelNumber) throws IOException { ChannelN ch; synchronized (this.monitor) { if (channelNumberAllocator.reserve(channelNumber)) { ch = addNewChannel(connection, channelNumber); } else { return null; ch.open(); // now that its been safely added return ch; private ChannelN addNewChannel(AMQConnection connection, int channelNumber) throws IOException { if (_channelMap.containsKey(channelNumber)) { // That numbers already allocated! Cant do it // This should never happen unless something has gone // badly wrong with our implementation. throw new IllegalStateException("We have attempted to " + "create a channel with a number that is already in " + "use. This should never happen. " + "Please report this as a bug."); ChannelN ch = instantiateChannel(connection, channelNumber, this.workService); _channelMap.put(ch.getChannelNumber(), ch); return ch; protected ChannelN instantiateChannel(AMQConnection connection, int channelNumber, ConsumerWorkService workService) { return new ChannelN(connection, channelNumber, workService); }
上面有两个createChannel方法,一个是带了channelNumber的,一个是自动分片channelNumber的,分别对应AMQConnection中的两个方法。最后都调用addNewChannel方法。
注意两个createChannel方法中都有这样一句代码:
* Package method: open the channel. * This is only called from {@link ChannelManager}. * @throws IOException if any problem is encountered public void open() throws IOException { // wait for the Channel.OpenOk response, and ignore it exnWrappingRpc(new Channel.Open(UNSPECIFIED_OUT_OF_BAND)); }
这样就调用了AMQChannel的rpc方法,向broker发送了一个Channel.Open帧。
addNewChannel方法实际上是创建了一个ChannelN对象,然后置其于ChannelManager中的_channelMap中,方便管理。
channelNumberAllocator是channelNumber的分配器,其原理是采用BitSet来实现channelNumber的分配,有兴趣的同学可以深究进去看看。
关于ChannelN类会有专门一篇博文来讲述,其实整个RabbitMQ-client的代码最关键的就是ChannelN这个类,需要着重讲述。
细心的朋友可能会发现关于ConsumerWorkService这个,我并没有做什么阐述。这个主要牵涉到Channel层面的处理,涉及到的类有AMQConnection, ChannelN, ConsumerDispatcher等。ConsumerWorkService是在AMQConnection中初始化,在ChannelManager中引用。至于这里怎么理解,在ChannelN中这么解释:
service for managing this channel’s consumer callbacks。意思是管理消费回调的服务。
综述,ChannelManager主要用来管理Channel, 包括channelNumber与Channel之间的映射关系。
java B2B2C 源码 springmvc mybatis多租户电子商城系统- Stream重新入队(RabbitMQ) 本文将介绍RabbitMQ的binder提供的重试功能:重新入队 准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑: @EnableBinding(TestApplication.
【直播预告】:Java Spring Boot开发实战系列课程【第11讲】:消息中间件 RabbitMQ 与api源码解析 mq消息中间件在高并发系统架构中扮演关键角色,阿里双11高并发使用了mq技术。本次课程一起学习最新Java Spring Boot 2.0、RabbitMQ中间件的最新特性与实战应用,同样会分析核心api源码。
RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。
在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。
ChannelN是整个RabbitMQ客户端最核心的一个类了,其包含的功能点甚多,这里需要分类阐述。 首先来看看ChannelN的成员变量:
AMQPImpl类包括AMQP接口(public class AMQImpl implements AMQP)主要囊括了AMQP协议中的通信帧的类别。 这里以Connection.Start帧做一个例子。
相关文章
- 客户关系管理系统的设计与实现(论文+源码)_kaic
- 基于web网上订餐系统的设计与实现(论文+源码)_kaic
- 网上积分兑换商城的设计与实现(论文+源码)_kaic
- 下载mysql的源码包
- winserver的consul部署实践与.net core客户端使用(附demo源码)
- 下拉菜单源码30则
- 【Nacos】Nacos源码分析(二):服务端和客户端实例注册
- Android布局优化之ViewStub、include、merge使用与源码分析
- 框架源码系列三:手写Spring AOP(AOP分析、AOP概念学习、切面实现、织入实现)
- 源码安装 mininet 及 图形化依赖
- 以太坊同步模式源码解析
- 03篇 Nacos Client服务发现源码分析
- 小程序支付详解+源码(客户端+服务端)
- [八]RabbitMQ-客户端源码之ChannelN
- [六]RabbitMQ-客户端源码之AMQCommand
- HBase的scan源码分析客户端部分之整体流程(一)