[五]RabbitMQ-客户端源码之AMQChannel
/** * Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method * returns true, the command is considered handled and is not passed back to nextCommands caller; if it returns false, nextCommand returns the command as * usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close. * @param command the command to handle asynchronously * @return true if we handled the command; otherwise the caller should consider it "unhandled" public abstract boolean processAsync(Command command) throws IOException;
有关processAsync()这个方法的会在介绍ChannelN类的时候详细阐述([八]RabbitMQ-客户端源码之ChannelN)。
protected final Object _channelMutex = new Object(); /** The connection this channel is associated with. */ private final AMQConnection _connection; /** This channels channel number. */ private final int _channelNumber; /** Command being assembled */ private AMQCommand _command = new AMQCommand(); /** The current outstanding RPC request, if any. (Could become a queue in future.) */ private RpcContinuation _activeRpc = null; /** Whether transmission of content-bearing methods should be blocked */ public volatile boolean _blockContent = false;
_channelMutex这个是内部用来当对象锁的,没有实际的意义,可忽略 _connection是指AMQConnection这个对象。 _channelNumber是指channel number, 这个应该不用多解释了吧。通道编号为0的代表全局连接中的所有帧,1-65535代表特定通道的帧. _command是内部处理使用的对象,调用AMQCommand的方法来处理一些东西。 _activeRpc是指当前未处理完的rpc请求(the current outstanding rpc request)。 _blockContent 是在Channel.Flow里用到的,其余情况都是false
在AMQChannel的构造函数中,只有两个参数:AMQConnection connection以及int channelNumber.
* Private API - When the Connection receives a Frame for this * channel, it passes it to this method. * @param frame the incoming frame * @throws IOException if an error is encountered public void handleFrame(Frame frame) throws IOException { AMQCommand command = _command; if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line _command = new AMQCommand(); // prepare for the next one handleCompleteInboundCommand(command); * Private API - handle a command which has been assembled * @throws IOException if theres any problem * @param command the incoming command * @throws IOException public void handleCompleteInboundCommand(AMQCommand command) throws IOException { // First, offer the command to the asynchronous-command // handling mechanism, which gets to act as a filter on the // incoming command stream. If processAsync() returns true, // the command has been dealt with by the filter and so should // not be processed further. It will return true for // asynchronous commands (deliveries/returns/other events), // and false for commands that should be passed on to some // waiting RPC continuation. if (!processAsync(command)) { // The filter decided not to handle/consume the command, // so it must be some reply to an earlier RPC. nextOutstandingRpc().handleCommand(command); markRpcFinished(); }
这个在[六]RabbitMQ-客户端源码之AMQCommand有所介绍,主要是用来处理Frame帧的,当调用AMQCommand的handleFrame处理之后返回为true是,即处理完毕时继续调用handleCompleteInboundCommand方法。这其中也牵涉到AMQConnection的MainLoop内部类,具体可以看看:[六]RabbitMQ-客户端源码之AMQCommand。
这个方法在AMQConnection.start()方法中有过使用:_channel0.enqueueRpc(conStartBroker)。这个方法就是将参数付给成员变量_activeRpc,至于这个RpcContinuation到底是个什么gui,我们下面再讲。
继续下一个方法:
这个方法是判断一下当前的_activeRpc是否为null,为null则为false,否则为true。看方法的名字应该猜出大半。
下面一个方法:
* Protected API - sends a {@link Method} to the broker and waits for the * next in-bound Command from the broker: only for use from * non-connection-MainLoop threads! public AMQCommand rpc(Method m) throws IOException, ShutdownSignalException return privateRpc(m); public AMQCommand rpc(Method m, int timeout) throws IOException, ShutdownSignalException, TimeoutException { return privateRpc(m, timeout); private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(); rpc(m, k); // At this point, the request method has been sent, and we // should wait for the reply to arrive. // Calling getReply() on the continuation puts us to sleep // until the connections reader-thread throws the reply over // the fence. return k.getReply(); private AMQCommand privateRpc(Method m, int timeout) throws IOException, ShutdownSignalException, TimeoutException { SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(); rpc(m, k); return k.getReply(timeout); public void rpc(Method m, RpcContinuation k) throws IOException synchronized (_channelMutex) { ensureIsOpen(); quiescingRpc(m, k); public void quiescingRpc(Method m, RpcContinuation k) throws IOException synchronized (_channelMutex) { enqueueRpc(k); quiescingTransmit(m); }
主要是看最后一个方法——quiescingRpc.这个方法说白就两行代码:
enqueueRpc(k);是将由privateRpc等方法内部创建的SimpleBlockingRpcContinuation对象附给当前的AQMChannel对象的成员变量_activeRpc
关于quiescingTransmit(m)就要接下去看了:
public void quiescingTransmit(Method m) throws IOException { synchronized (_channelMutex) { quiescingTransmit(new AMQCommand(m)); public void quiescingTransmit(AMQCommand c) throws IOException { synchronized (_channelMutex) { if (c.getMethod().hasContent()) { while (_blockContent) { try { _channelMutex.wait(); } catch (InterruptedException e) {} // This is to catch a situation when the thread wakes up during // shutdown. Currently, no command that has content is allowed // to send anything in a closing state. ensureIsOpen(); c.transmit(this); }
上面代码只需要看: c.transmit(this);这一句,其余的都是摆设。看到这里,就调用了AMQCommand的transmit方法,这个transmit方法就是讲AMQChannel中封装的内容发给broker,然后等待broker返回,进而通过之前附值的_activeRpc来处理回传的帧。
虽然之前在AMQConnection([二]RabbitMQ-客户端源码之AMQConnection)中详细讲述了start()方法,但是这里还是要来拿这个来举例这个AMQChannel中的rpc怎么使用
在AMQConnection中有这么一段代码:
Method method = (challenge == null) ? new AMQP.Connection.StartOk.Builder() .clientProperties(_clientProperties) .mechanism(sm.getName()) .response(response) .build() : new AMQP.Connection.SecureOk.Builder().response(response).build(); try { Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod(); if (serverResponse instanceof AMQP.Connection.Tune) { connTune = (AMQP.Connection.Tune) serverResponse; } else { challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge(); response = sm.handleChallenge(challenge, this.username, this.password); }
客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。
此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。
注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。
AQMChannel还有些其他的内容,都是边缘性的东西,这里还剩下个RpcContinuation要着重阐述下的:
void handleCommand(AMQCommand command); void handleShutdownSignal(ShutdownSignalException signal); public static abstract class BlockingRpcContinuation T implements RpcContinuation { public final BlockingValueOrException T, ShutdownSignalException _blocker = new BlockingValueOrException T, ShutdownSignalException public void handleCommand(AMQCommand command) { _blocker.setValue(transformReply(command)); public void handleShutdownSignal(ShutdownSignalException signal) { _blocker.setException(signal); public T getReply() throws ShutdownSignalException return _blocker.uninterruptibleGetValue(); public T getReply(int timeout) throws ShutdownSignalException, TimeoutException return _blocker.uninterruptibleGetValue(timeout); public abstract T transformReply(AMQCommand command); public static class SimpleBlockingRpcContinuation extends BlockingRpcContinuation AMQCommand public AMQCommand transformReply(AMQCommand command) { return command; }
RPCContinuation只是一个接口,而BlockingRpcContinuation这个抽象类缺似乎略有门道。而SimpleBlockingRpcContinuation只是将BlockingRpcContinuation中的handleCommand方法便成为:
BlockingRpcContinuation类主要操纵了BlockingValueOrException _blocker这个成员变量。再接下深究BlockingValueOrException其实是继承了BlockingCell,对其做了一下简单的封装。最后来看下BlockingCell是个什么鬼, 截取部分代码如下:
其实这个就是capacity为1的BlockingQueue,顾美其名曰BlockingCell,绕了大半圈,原来AMQChannel中的_activeRpc就是个这么玩意儿~
RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。
在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。
ChannelN是整个RabbitMQ客户端最核心的一个类了,其包含的功能点甚多,这里需要分类阐述。 首先来看看ChannelN的成员变量:
AMQPImpl类包括AMQP接口(public class AMQImpl implements AMQP)主要囊括了AMQP协议中的通信帧的类别。 这里以Connection.Start帧做一个例子。
AMQCommand是用来处理AMQ命令的,其包含了Method, Content Heaeder和Content Body. 下面是通过wireshark抓包的AMQP协议
Frame是指AMQP协议层面的通信帧(一个正式定义的连接数据包)。 我们来看下Frame类中的成员变量有哪些:
相关文章
- 基于协同过滤的旅游推荐系统设计与实现(论文+源码)_kaic
- Ordinarykringing部分源码01-DCGAN实现—apache配置
- percona-xtrabackup-2.4.28源码编译安装和二进制安装
- 【源码】基于百度EasyDL平台的数字识别分类
- 【源码】c#编写的安卓客户端与Windows服务器程序进行网络通信
- Redis学习之Jedis源码原理分析探究(BIO手写Jedis客户端)
- struts2拦截器的实现原理及源码剖析
- 【源码】"拆" 网络请求库-Volley(上)
- 【Kafka源码】日志处理
- qemu vl.c源码学习
- 第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密
- MVC系列——MVC源码学习:打造自己的MVC框架(三:自定义路由规则)
- Android之开源中国客户端源码分析(二)
- JAVA在图片上添加文字水印(源码+注释详解)
- 从 vue.js 源码角度再看数据绑定
- pybitcointools源码分析之由私钥获取公钥
- [九]RabbitMQ-客户端源码之Consumer
- [七]RabbitMQ-客户端源码之AMQPImpl+Method
- Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)
- Alluxio源码分析读数据:打开文件选项OpenFileOptions
- HBase的scan源码分析客户端部分之整体流程(一)