RabbitMQ消息为什么变成了数字呢?
结论:
(1)Spring Boot的RabbitAutoConfiguration没有配置MessageConverter。
(2)spring-amqp在处理RabbitMQ消息时,会根据contentType选择不同的
MessageConverter来执行解码操作。
(3)spring-amqp的消息解码组件MessagingMessageListenerAdapter有一个可以处理contentType为text/plain、text/xml等的Message。
(4)spring-amqp在发送String类型的消息时,默认的contentType是text/plain。
(5)消息变成数字,是因为没有找到合适的Messageconverter
缘起
需要监听兄弟团队一个RabbitMQ队列。这种监听第三方队列的操作都在一个项目,直接把已有的监听代码copy过来【这样比较快】,把不需要的删除,譬如处理逻辑。
@RabbitListener(queues = PRODUCT_SHELF_STATUS)
public void handleShelfStatusChange(String msg,
@Header(AmqpHeaders.CONTENT_TYPE) String contentType) {
log.info("商品上下架消息 data:{} contentType:{}", JSON.toJSONString(msg), contentType);
try {
handleMsg(msg);
return;
} catch (Exception e) {
log.warn("商品上下架消息 报错了 msg:{} {}", msg, e.getMessage());
}
sendToLogQueueWhenFail(msg);
}
自测
报错了?
打断点看一看
奇怪,发的消息明明是字符串,为什么变成数字了。
BugShooting:站到巨人的肩膀上
搜索了下,居然找到相同的报错。原来是MessageConverter缺失,并看到了解决方案:
核对了下项目,的确没有配Jackson2JsonMessageConverter
但之前的消息监听不都跑得好好的,为什么呢?
BugShooting:Debug【必杀技】
Debug后,找到关键代码:
如果没有配置MessageConverter,MessagingMessageListenerAdapter使用的MessagingMessageConverter会初始化一个SimpleMessageConverter
org.springframework.amqp.support.converter.MessagingMessageConverter#MessagingMessageConverter()
org.springframework.amqp.support.converter.SimpleMessageConverter#fromMessage
其实,还有个地方比较关键,这个在后面讲。
小提示:spring-amqp的版本号
如此说来以前,之前监听消息的contentType可能是默认的text/plain
这样的话,还不能按搜到的方案改。这会导致老逻辑都报错,因为Jackson2JsonMessageConverter只会处理contetType contain json 字符串的Message
org.springframework.amqp.support.converter.Jackson2JsonMessageConverter#fromMessage(org.springframework.amqp.core.Message, java.lang.Object)
如何优雅地改呢?
SpringBoot的亮点就是自动配置、起步依赖。那么有没有一个配置参数,配置一下,contentType 对应的MessageCoverter都有了呢?
先看下SpringBoot对Rabbit的自动配置:
org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration#RabbitAnnotationDrivenConfiguration
那么改动就小的改法就有了,直接把期望的MessageConverter初始化到Spring容器中就可以了。
@Bean
public MessageConverter messageConverter() {
ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter();
messageConverter.addDelegate(MediaType.APPLICATION_JSON_VALUE, new Jackson2JsonMessageConverter());
messageConverter.addDelegate(MediaType.TEXT_PLAIN_VALUE, new SimpleMessageConverter());
return messageConverter;
}
配置多个MessageConverter,需要借助ContentTypeDelegatingMessageConverter
org.springframework.amqp.support.converter.ContentTypeDelegatingMessageConverter
重启应用,看看上面的配置是否生效。
可以看到,已经生效了。
org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener#extractMessage
新的报错
这个错误比较熟悉,将监听RabbitMQ消息的Argument改为对应的Java DTO就可以了
@RabbitListener(queues = PRODUCT_SHELF_STATUS)
public void handleShelfStatusChange(List<ShelfStatusProductMQDTO> msg,
Message message,
@Header(AmqpHeaders.CONTENT_TYPE) String contentType) {
log.info("商品上下架消息 data:{} contentType:{}", JSON.toJSONString(msg), contentType);
try {
handleMsg(msg);
return;
} catch (Exception e) {
log.warn("商品上下架消息 报错了 msg:{} {}", msg, e.getMessage());
}
// sendToLogQueueWhenFail(msg);
}
至此,问题完美解决。
但是,之前收到的数字到底是什么呢?
收到消息的数字是什么呢?
将RabbitMQ Message中payload的byte[]中的数字,使用英文逗号拼成的字符串
小贴士: Arrays.stream(ObjectUtils.toObjectArray(message.getBody())).map(String::valueOf).collect(Collectors.joining(","))
为什么是这样呢,这个在下面有分析
发送String类型的消息,默认的消息类型是什么?
amqpTemplate.convertAndSend(RabbitMQEnum.TEST.getExchange(), RabbitMQEnum.TEST.getRoutingKey(), JSON.toJSONString(msg))
org.springframework.amqp.rabbit.core.RabbitTemplate#convertMessageIfNecessary
org.springframework.amqp.support.converter.SimpleMessageConverter#createMessage
福利:
本想写个demo,方便小伙伴研究。
但异常没有复现
原来spring-amqp自5.1.2开始已经对这个点进行了优化,即不需要配置额外的MessageConverter,原因在之后的resolveArgument环节,匹配到了RabbitListenerAnnotationBeanPostProcessor$BytesToStringConverter。这个Converter就可以将String类型Payload的byte[]可以正常convert为String字符串。
参数解析代码入口:
org.springframework.messaging.handler.invocation.InvocableHandlerMethod#getMethodArgumentValues
org.springframework.amqp.rabbit.annotation.RabbitListenerAnnotationBeanPostProcessor$BytesToStringConverter
org.springframework.core.convert.support.GenericConversionService#convert(java.lang.Object, org.springframework.core.convert.TypeDescriptor, org.springframework.core.convert.TypeDescriptor)
再看有异常项目中spring amqp的版本是2.0.3.RELEASE
在resolve时,匹配到的converter是ArrayToStringConverter。
org.springframework.core.convert.support.GenericConversionService.ConvertersForPair#getConverter
org.springframework.core.convert.support.ArrayToStringConverter#convert
show the code : https://github.com/helloworldtang/mq
小结:
建议还是对不同的contentType配置特定的MessageConverter,这样有两个好处 (1)代码简洁 (2)提升性能。一步到位,避免额外的数据类型转换。
相关文章
- SpringBoot+RabbitMQ 实现手动消息确认(ACK)
- RabbitMQ模拟消息队列群发邮件
- 分布式--RabbitMQ集成SpringBoot、消息可靠性、死信队列、延迟交换机、集群
- 11-RabbitMQ高级特性-消息可靠性投递
- Docker安装RabbitMQ消息队列
- 消息队列:第五章:RabbitMQ的使用
- iOS友盟消息推送总是推送失败或者token无效[通俗易懂]
- 11-RabbitMQ高级特性-消息如何保证100%的投递成功
- 19-RabbitMQ消息一致性问题
- vue消息提示组件封装
- 字节跳动3-3大牛力荐!RabbitMQ实战指南:消息队列面试必刷手册
- rabbitmq使用案例_RabbitMq
- API通讯消息进化史
- RabbitMQ消息中间件技术精讲11 高级篇四 confirm 确认消息
- RabbitMQ:基本消息模型
- 19-RabbitMQ应用问题-消息补偿
- RabbitMq TTL+死信队列 延迟消息问题记录
- 【RabbitMQ】Fanout、Direct、Topic、消息转换器
- RabbitMQ消息应答
- 使用 Spring Cloud Bus 在微服务之间传递消息
- 详解redis是如何实现队列消息的ack
- RabbitMQ详解(二)——消息通信的概念大数据
- activemq获取消息的详细信息详解编程语言
- Python3 自定义请求头消息headers详解编程语言
- java spring boot消息队列 RabbitMQ详解编程语言
- 前期主出货 4G 版,消息称华为 P50 Pro 确定有麒麟 9000 5G 版本,超大杯 P50 Pro+ 还未入网
- 17 个方面,综合对比 Kafka、RabbitMQ、RocketMQ、ActiveMQ 四个分布式消息队列
- 腾讯回应微视裁员并停止投放:不实消息
- 如何监控Redis中的消息队列(怎么监控redis的队列)
- 红色之火Redis队列消息机制(redis 队列消息)
- 以 Redis 为基础的聊天消息持久存储实践(redis 聊天消息存储)