Spring整合JMS-基于activeMQ实现(二)
Spring 实现 基于 整合 activemq JMS
2023-09-14 09:08:00 时间
Spring整合JMS-基于activeMQ实现(二)
1、消息监听器
在Spring整合JMS的应用中我们在定义消息监听器的时候一共能够定义三种类型的消息监听器,各自是MessageListener、SessionAwareMessageListener、MessageListenerAdapter
1.1 MessageListener
MessageListener是最原始的消息监听器(javax.jms.MessageListener),它是JMS规范中定义的一个接口。定义了一个omMessage方法。仅仅接收一个message參数
public class ConsumerMessageListener implements MessageListener{
public void onMessage(Message message) {//这里我们知道生产者发送的就是一个纯文本消息。所以这里能够直接进行强制转换,或者直接把onMessage方法的參数改成Message的子类TextMessageTextMessage textMessage = (TextMessage)message;System. out.println( "接收到一个纯文本消息" );try {System. out.println( "消息内容是:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}
1.2 SessionAwareMessageListener
sessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS消息监听器。MessageListener处理接收到的消息时候假设须要返回消息给对方。此时就须要又一次获取connection和session,SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,onMessage方法中接收两个參数,一个Message。一个发送消息的Session。(看红色新增部分)
<beans xmlns= "http://www.springframework.org/schema/beans" xmlns:aop= "http://www.springframework.org/schema/aop"
xmlns:tx= "http://www.springframework.org/schema/tx" xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"xmlns:context= "http://www.springframework.org/schema/context" xmlns:jms= "http://www.springframework.org/schema/jms"xsi:schemaLocation= "http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsdhttp://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsdhttp://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.1.xsdhttp://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.1.xsdhttp://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd ">
<bean id ="connectionFactory" class= "org.springframework.jms.connection.CachingConnectionFactory" ><property name ="targetConnectionFactory"><bean class= "org.apache.activemq.ActiveMQConnectionFactory" ><property name ="brokerURL"><value >tcp://localhost:61616 </value ></property ></bean ></property ><property name ="sessionCacheSize" value= "1" /></bean >
<!-- Spring jmsTemplate queue --><bean id ="jmsTemplate" class= "org.springframework.jms.core.JmsTemplate" ><property name ="connectionFactory" ref= "connectionFactory"></property ><property name ="defaultDestinationName" value= "subject"></property ><property name ="deliveryPersistent" value= "true"></property ><property name ="pubSubDomain" value="false"></ property> <!-- false p2p,true topic --><property name ="sessionAcknowledgeMode" value= "1"></property ><property name ="explicitQosEnabled" value= "true"></property ><property name ="timeToLive" value="604800000"></ property></bean ><!-- 配置Queue,当中value为Queue名称->start --><bean id = "testQueue" class = "org.apache.activemq.command.ActiveMQQueue" ><constructor-arg index = "0" value ="${pur.test.add}" /></bean ><bean id = "sessionAwareQueue" class = "org.apache.activemq.command.ActiveMQQueue" ><constructor-arg index = "0" value= "queue.liupeng.sessionaware" /></bean ><!-- 配置Queue,当中value为Queue名称->end --><!-- 注入AMQ的实现类属性(JmsTemplate和Destination) --><bean id = "amqQueueSender" class = "com.tuniu.scc.purchase.plan.manage.core.amq.AMQQueueSender" ><property name = "jmsTemplate" ref="jmsTemplate" ></property ><property name = "testQueue" ref="testQueue" ></property ><property name = "sessionAwareQueue" ref= "sessionAwareQueue"></property ></bean ><!-- 消息发送必用的发送类 --><bean id = "multiThreadAMQSender" class ="com.tuniu.scc.purchase.plan.manage.core.amq.MultiThreadAMQSender"init-method= "init"><property name = "jmsTemplate" ref="jmsTemplate" ></property ><property name = "multiThreadAMQExecutor" ref= "multiThreadAMQExecutor" ></property ></bean ><!-- 消息监听器->start --><bean id = "consumerMessageListener" class= "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerMessageListener" /><!-- 消息监听容器 --><bean id = "jmsContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" ><property name = "connectionFactory" ref= "connectionFactory" /><property name = "destination" ref= "testQueue" /> <!-- 消费者队列名称,改动 --><property name = "messageListener" ref= "consumerMessageListener" /></bean ><bean id = "consumerSessionAwareMessageListener" class ="com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerSessionAwareMessageListener" ><property name ="testQueue" ref="testQueue"/> <!-- 接收消息后返回给testQueue队列 --></bean >< bean id= "sessionAwareListenerContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" ><property name ="connectionFactory" ref= "connectionFactory" /><property name ="destination" ref="sessionAwareQueue" /><property name ="messageListener" ref= "consumerSessionAwareMessageListener" /></bean ><!-- 消息监听器->end --></beans>
发送消息:
@Resource
private AMQQueueSender amqQueueSender;
private static final Logger LOG = LoggerFactory.getLogger(AMQController. class);
@UvConfig(method = "testQueue", description = "測试AMQ")@RequestMapping(value = "/testQueue", method = RequestMethod. POST)@TSPServiceInfo(name = "PUR.NM.AMQController.testQueue" , description = "測试AMQ")public void testQueue(HttpServletRequest request, HttpServletResponse response) {try {long beginTime = System. currentTimeMillis();LOG.info( "发送開始");//amqQueueSender.sendMessage("test", StaticProperty.TEST_QUEUE);amqQueueSender.sendMessage( "test", StaticProperty.TEST_SESSIONAWARE_QUEUE );LOG.info( "发送结束,耗时:" +(System.currentTimeMillis()-beginTime)+ "ms");} catch (InterruptedException e) {LOG.error( "測试失败", e);}}
接收消息:
public class ConsumerSessionAwareMessageListener implements SessionAwareMessageListener<TextMessage>{
private Destination testQueue; //返回消息目的队列@Overridepublic void onMessage(TextMessage message, Session session) throws JMSException {System. out.println( "收到一条消息" );System. out.println( "消息内容是:" +message.getText());MessageProducer producer = session.createProducer( testQueue);Message txtMessage = session.createTextMessage("consumerSessionAwareMessageListener..." );producer.send(txtMessage);}public Destination getTestQueue() {return testQueue;}public void setTestQueue(Destination sessionAwareQueue) {this.testQueue = sessionAwareQueue;}}
打印结果:
收到一条消息消息内容是:test接收到一个纯文本消息消息内容是:consumerSessionAwareMessageListener...
1.3 MessageListenerAdapter
MessageListenerAdapter实现了MessageListener接口和SessionAwareMessageListener接口,它的作用主要是将接收到的消息进行类型转换。然后通过反射的形式把它交给一个普通的Java类进行处理。
MessageListenerAdapter会把接收到的消息做例如以下转换:
TextMessage转换为String对象
BytesMessage转换为byte数组
MapMessage转换为Map对象
ObjectMessage转换为相应的Serializable对象
既然前面说到MessageListenerAdapter会把接收到的消息做类型转换再通过反射交给Java类处理,假设真正目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为參数调用它们的onMessage方法。而不会利用反射去调用。以下定义的时候为它指定一个目标类
<!-- 消息监听适配器 -->
<bean id ="messageListenerAdapter" class= "org.springframework.jms.listener.adapter.MessageListenerAdapter" >
<property name ="delegate">
<bean class= "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener" />
</property >
<property name ="defaultListenerMethod" value= "receiveMessage"/>
</bean >
<bean id ="messageListenerAdapterContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name ="connectionFactory" ref= "connectionFactory" />
<property name ="destination" ref="adapterQueue" />
<property name ="messageListener" ref= "messageListenerAdapter" />
</bean >
上面说到,目标处理器是一个普通Java类的时候,Spring将进行类型转换之后的对象通过反射去调用真正的方法,那么Spring是怎样知道该调用哪个方法的呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的。当没有指定该属性的时候。会默认调用目标处理器的handleMessage方法
编写新队列、发送消息同上
接收消息:
public class ConsumerListener{
public void handleMessage(String message) {System. out.println( "ConsumerListener通过handleMessage接收到一个纯文本消息。消息内容是:" + message);}public void receiveMessage(String message) {System.out.println("ConsumerListener通过receiveMessage接收到一个纯文本消息。消息内容是:" + message);}}
MessageListenerAdapter的另外一个主要功能就是能够自己主动的发送回复的消息
- 方法一:
- 、public void sendMessage(Destination destination, final String message) {
- System.out.println("---------------生产者发送消息-----------------");
- System.out.println("---------------生产者发了一个消息:" + message);
- jmsTemplate.send(destination, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- TextMessage textMessage = session.createTextMessage(message);
- textMessage.setJMSReplyTo(responseDestination); //(省略编写其相应的监听器代码)
- return textMessage;
- }
- });
- }
方法二:
<!-- 消息监听适配器 -->
<bean id ="messageListenerAdapter" class= "org.springframework.jms.listener.adapter.MessageListenerAdapter" >
<property name ="delegate">
<bean class= "com.tuniu.scc.purchase.plan.manage.core.amq.ConsumerListener" />
</property >
<property name ="defaultListenerMethod" value= "receiveMessage"/>
</bean >
<bean id ="messageListenerAdapterContainer" class= "org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name ="connectionFactory" ref= "connectionFactory" />
<property name ="destination" ref="adapterQueue" />
<property name ="messageListener" ref= "messageListenerAdapter" />
<property name="defaultResponseDestination" ref="defaultResponseQueue"/>
</bean >
相关文章
- Spring batch教程 之 spring batch简介
- 后端必读《Spring Boot实战》,企业级真实应用案例
- 基于SSM的 spring 发送邮件的实现
- 基于Spring Boot及Kotlin的ddd领域驱动实现
- Spring Boot 实现 MySQL 读写分离技术
- 【03】Spring源码-手写篇-手写AOP实现(上)
- Spring Boot、Spring Cloud 自定义配置文件(如何整合配置中心)
- spring中 junit4 和 junit5 使用
- spring-websocket实现聊天室功能
- Spring Boot 实现日志链路追踪,无需引入组件,让日志定位更方便!
- 基于Spring Boot、Vue的考试报名系统设计与实现-前后端分离-课程设计-毕业设计一键部署版
- Spring Boot整合分布式搜索引擎ElasticSearch 实现相关基本操作
- Spring Boot | 集成MapStruct实现不同类型Java对象间的自动转换
- 分布式系统开发实战:基于Spring Security实现安全认证
- Spring Cloud Sleuth的MDC集成实现自定义跟踪
- Spring Cloud Security OAuth2 中实现简化模式(二)
- Spring Cloud Security OAuth2 中实现客户端模式
- Spring Boot 整合Redis 实现优惠卷秒杀 一人一单功能
- Spring基于注解实现事务管理
- Spring DataSource>DBCP & C3P0详解编程语言
- Spring Boot(十三):spring boot小技巧详解编程语言
- spring boot集成mybatis 自动生成实体类和mapper文件、Dao层详解编程语言
- Spring声明式事务管理(基于Annotation注解方式实现)
- Spring框架下整合Redis的实现(spring整合redis)
- Spring整合Redis简单实现高效缓存(spring集成redis)