springboot+rabbitmq 之 消费端配置
Chapter1 直接上代码
@Slf4j @Component public class UserSettlementConsumer { @RabbitHandler @RabbitListener(queues = "${spring.rabbitmq.queue-name}") public void testListener(String msg) { log.info("消息出队:"+msg); } }
可以看出来,RabbitMQ主要是借助于@RabbitHandler和@RabbitListener这两个注解来实现消息队列的消费。
@RabbitHandler的javadoc注释:Annotation that marks a method to be the target of a Rabbit message listener within a class that is annotated with RabbitListener.
@RabbitListener的javadoc注释:Annotation that marks a method to be the target of a Rabbit message listener on the specified queues() (or bindings()).
Chapter2 在应用程序服务启动时,如果指定的消息在broker中不存在,则会导致mq初始化失败
需要说明的是,在应用程序服务启动时,如果指定的消息在broker中不存在,则会导致mq初始化失败,服务也无法启动。错误信息:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test.queue' in vhost '/', class-id=50, method-id=10)
如下是详细的错误信息:
2021-06-15 15:13:14.997 [][] [main] INFO o.s.a.rabbit.connection.CachingConnectionFactory:497 - Created new connection: connectionFactory#53ea380b:0/SimpleConnection@5b1cedfd [delegate=amqp://rabbitadmin@192.168.40.20:5672/, localPort= 1142] 2021-06-15 15:13:15.155 [][] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] WARN o.s.amqp.rabbit.listener.BlockingQueueConsumer:707 - Failed to declare queue: test.queue 2021-06-15 15:13:15.155 [][] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#3-1] WARN o.s.amqp.rabbit.listener.BlockingQueueConsumer:641 - Queue declaration failed; retries left=3 org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[test.queue] at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:713) at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:597) at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:584) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1338) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1183) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.IOException: null at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126) at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144) at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006) at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52) at org.springframework.amqp.rabbit.connection.PublisherCallbackChannelImpl.queueDeclarePassive(PublisherCallbackChannelImpl.java:363) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1190) at com.sun.proxy.$Proxy252.queueDeclarePassive(Unknown Source) at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:692) ... 5 common frames omitted Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'test.queue' in vhost '/', class-id=50, method-id=10) at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494) at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288) at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138) ... 15 common frames omitted
错误描述很明显,是要求指定的队列必须存在。
一种方式是手动在mq的broker端创建(强烈不推荐,尤其是生产环境,我们程序员往往没权限访问控制台);另一种方式是通过如下的程序配置(推荐):
- 一是利用@Bean注解来声明queue。
- 二是借助于@RabbitListener的成员:queuesToDeclare
- 三是借助于@RabbitListener的成员:bindings
■ @Bean声明方式如下:
@Configuration public class DirectRabbitConfig { @Value("${spring.rabbitmq.queue-name}") private String mqName; @Bean public Queue myQueue() { return new Queue(mqName, true); } }
■ org.springframework.amqp.rabbit.annotation.RabbitListener#queuesToDeclare 可以在队列不存在时直接创建:
@Component public class UserSettlementConsumer { @RabbitHandler @RabbitListener(queuesToDeclare ={@Queue(name = "${spring.rabbitmq.queue-name}",durable = "true")}) public void testListener(String msg) { log.info("消息出队:"+msg); } }
如下是spring-rabbit-2.2.0.RELEASE.jar里org.springframework.amqp.rabbit.annotation.RabbitListener#queuesToDeclare的javadoc:
package org.springframework.amqp.rabbit.annotation; public @interface RabbitListener { /** * The queues for this listener. * If there is a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the * application context, the queue will be declared on the broker with default * binding (default exchange with the queue name as the routing key). * Mutually exclusive with {@link #bindings()} and {@link #queues()}. * @return the queue(s) to declare. * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer * @since 2.0 */ Queue[] queuesToDeclare() default {}; }
■ org.springframework.amqp.rabbit.annotation.RabbitListener#binding类型是@QueueBinding,可以同时创建exchange、queue和绑定关系。
@Component public class UserSettlementConsumer { @RabbitHandler @RabbitListener(bindings = @QueueBinding( value = @Queue("${spring.rabbitmq.queue-name}"), exchange = @Exchange(value = "${spring.rabbitmq.exchange-name}", type = ExchangeTypes.FANOUT), key = "${spring.rabbitmq.queue-name}")) public void testListener(String msg) { log.info("消息出队:"+msg); } }
Chapter3 回过头来介绍RabbitListener#queues()
package org.springframework.amqp.rabbit.annotation; public @interface RabbitListener { /** * The queues for this listener. * The entries can be 'queue name', 'property-placeholder keys' or 'expressions'. * Expression must be resolved to the queue name or {@code Queue} object. * The queue(s) must exist, or be otherwise defined elsewhere as a bean(s) with * a {@link org.springframework.amqp.rabbit.core.RabbitAdmin} in the application * context. * Mutually exclusive with {@link #bindings()} and {@link #queuesToDeclare()}. * @return the queue names or expressions (SpEL) to listen to from target * @see org.springframework.amqp.rabbit.listener.MessageListenerContainer */ String[] queues() default {}; }
从以上RabbitListener#queues()的javadoc内容可以看出来如下三点信息,其中第2条指明了队列必须存在:
- queues的取值可以是常量(如 MessageQueueConstant.USER_QUEUE),可以是属性占位符(如 "${spring.rabbitmq.queue-name}"),可以是SpEL表达式(如 "#{configToolkitProp['zk.address']}"、"#{userQueue.name}")
- 所指定的队列必须存在,或者是ApplicationContext里的一个具有org.springframework.amqp.rabbit.core.RabbitAdmin的bean。
- queues()与bindings()和queuesToDeclare()是互斥的。设定了queues(),就不能再设定bindings()和queuesToDeclare()了。
Chapter4 关于reply-code=404, reply-text=NOT_FOUND
在服务运行过程中,如果broker里的交换机或队列被删掉了,同样会导致无法生产消息。如下是错误信息。这时,只能重启应用服务或者手动在broker里创建所需的交换机和队列。
2021-06-17 15:13:18.782 [AMQP Connection 192.168.40.20:5672] ERROR o.s.a.rabbit.connection.CachingConnectionFactory:1468 - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'test-USER_SETTLEMENT_EXCHANGE' in vhost '/', class-id=60, method-id=40)
Chapter5 重复消费控制
自动消费消息时,如果消费端出现异常并且没有处理的话,消息会重新入队并致使重复消费。如果不做控制的话,可能会导致无限重复消费。可做如下配置:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
max-attempts: 5
initial-interval: 5000
相关文章
- springboot的自动化配置是什么_spring三种配置方式
- springboot项目里面,发送http请求的get方法,post方法,ssl方法的工具类
- SpringBoot跨域配置「建议收藏」
- 自定义SpringBoot自动配置类
- SpringBoot之静态资源的访问与管理
- SpringBoot框架:第一章:基础配置和介绍
- 一个 dubbo 和 springboot 的兼容性问题
- GitHub经典教材!阿里P8的这份SpringBoot精髓到底厉害在哪里?
- [Web开发]《SpringBoot + MySQL + MyBatisPlus》
- 基于ssm框架基于springboot框架的设计进来
- SpringBoot(一)自动配置
- idea创建springboot父子工程_Springboot框架
- SpringBoot 中 HikariCP 的相关配置
- SpringBoot系列之数据库初始化-jpa配置方式
- SpringBoot 验证码生成+SMTP邮箱服务配置
- Java 毕业设计,基于 SpringBoot 的高校招生管理系统
- Springboot读取配置的10种方式
- 杨校老师项目之基于SpringBoot+React框架开发的医院挂号管理系统
- SpringBoot 中使用HikariPool 报错Possibly consider using a shorter maxLifetime value.详解程序员
- SpringBoot集成Quartz实现定时器详解编程语言
- SpringBoot Redis序列化配置详解编程语言
- springboot自带定时器实现定时任务的开启关闭以及定时时间可以配置详解编程语言