zl程序教程

您现在的位置是:首页 >  工具

当前栏目

[九]RabbitMQ-客户端源码之Consumer

2023-09-27 14:29:21 时间
在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。

[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。

这里先来看下消费者客户端的关键代码:

 QueueingConsumer consumer = new QueueingConsumer(channel);

 channel.basicQos(32);

 channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)

 while (true) {

 QueueingConsumer.Delivery delivery = consumer.nextDelivery();

 String message = new String(delivery.getBody());

 System.out.println(" [X] Received " + message + "");

 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

 }

可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。

在AMQConnection中有关MainLoop的主线程,专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:


if (method instanceof Basic.Deliver) {

 processDelivery(command, (Basic.Deliver) method);

 return true;

} 

之后调用processDelivery方法:


protected void processDelivery(Command command, Basic.Deliver method) {

 Basic.Deliver m = method;

 Consumer callback = _consumers.get(m.getConsumerTag());

 if (callback == null) {

 if (defaultConsumer == null) {

 throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case.");

 else {

 callback = defaultConsumer;

 Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey());

 try {

 this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody());

 } catch (Throwable ex) {

 getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery");

}

这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数,然后调用这个回调函数的handleDeliver()方法进行处理,这里有些同学会有疑问,明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法,其实这里只是包了一层皮,ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。

我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的:


这里的queue就是一个LinkedBlockingQueue,客户端程序通过调用nextDelivery()方法来获取数据:


public Delivery nextDelivery()

 throws InterruptedException, ShutdownSignalException, ConsumerCancelledException

 return handle(_queue.take());

private Delivery handle(Delivery delivery) {

 if (delivery == POISON ||

 delivery == null (_shutdown != null || _cancelled != null)) {

 if (delivery == POISON) {

 _queue.add(POISON);

 if (_shutdown == null _cancelled == null) {

 throw new IllegalStateException(

 "POISON in queue, but null _shutdown and null _cancelled. " +

 "This should never happen, please report as a BUG");

 if (null != _shutdown)

 throw Utility.fixStackTrace(_shutdown);

 if (null != _cancelled)

 throw Utility.fixStackTrace(_cancelled);

 return delivery;

}

这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作,也就是一个可能会阻塞等待的操作。


java B2B2C 源码 springmvc mybatis多租户电子商城系统- Stream重新入队(RabbitMQ) 本文将介绍RabbitMQ的binder提供的重试功能:重新入队 准备一个会消费失败的例子,可以直接沿用前文的工程,也可以新建一个,然后创建如下代码的逻辑: @EnableBinding(TestApplication.
【直播预告】:Java Spring Boot开发实战系列课程【第11讲】:消息中间件 RabbitMQ 与api源码解析 mq消息中间件在高并发系统架构中扮演关键角色,阿里双11高并发使用了mq技术。本次课程一起学习最新Java Spring Boot 2.0、RabbitMQ中间件的最新特性与实战应用,同样会分析核心api源码。
RabbitMQ遵从的是AMQP协议,其broker端代码采用erlang编写,对于没有接触过erlang的同学(包括博主我)来说,想要了解其中的奥秘实在是不容易,大多只能从网上“搜刮”点散碎的知识点来充实一下。
ChannelN是整个RabbitMQ客户端最核心的一个类了,其包含的功能点甚多,这里需要分类阐述。 首先来看看ChannelN的成员变量:
AMQPImpl类包括AMQP接口(public class AMQImpl implements AMQP)主要囊括了AMQP协议中的通信帧的类别。 这里以Connection.Start帧做一个例子。
AMQCommand是用来处理AMQ命令的,其包含了Method, Content Heaeder和Content Body. 下面是通过wireshark抓包的AMQP协议