zl程序教程

您现在的位置是:首页 >  后端

当前栏目

MOM系列文章之 - Spring Jms Integration 解读

Spring 系列 解读 文章 Integration JMS
2023-09-14 09:00:28 时间
       前阵子对Spring Jms实现进行了一些扩展,借此机会系统化地研究了一下Spring对JMS的支持,整理成文,希望大家能够喜欢!        本文打算从两个维度(编程API和包结构)进行阐述,希望大家读完,能对Spring在JMS层面上做的事情有一个大致了解。当然喜欢扣细节的朋友,也欢迎提出你的疑惑!     第一部分:编程API        首先,让我们来看下

       前阵子对Spring Jms实现进行了一些扩展,借此机会系统化地研究了一下Spring对JMS的支持,整理成文,希望大家能够喜欢!

       本文打算从两个维度(编程API和包结构)进行阐述,希望大家读完,能对Spring在JMS层面上做的事情有一个大致了解。当然喜欢扣细节的朋友,也欢迎提出你的疑惑!

    第一部分:编程API

       首先,让我们来看下Spring中我们最最经常用到的JmsTemplate,上图

                                                                     

        从继承关系上,我们先来看下接口 JmsOperations,基本上可以归纳出这几类方法:

      Conveniencemethods for sending messages

      Conveniencemethods for sending auto-converted messages

      Conveniencemethods for receiving messages

      Conveniencemethods for receiving auto-converted messages

      Conveniencemethods for browsing messages


        但要注意的是这里面的方法throws出来的异常非JMS 1.1里面的标准JMSException,而是被转译过的JmsException。同时可以看出这个接口

充分遵循了CQRS原则。一个MQ其实就是Wrapper后的Queue,数据结构的知识告诉我们,queue有两种存储结构:Array and  LinkedList。Array擅长随机读取,LinkedList则擅长删除更新操作,一旦底层采用 了LinkedList结构,Brower就是个大问题,这个要格外注意一下。

        再来看下JmsDestinationAccessor,该类继承自JmsAccessor(该类实现了InitializingBean,不解释),注意里面的DestinationResolver类,主要是从简单的String类型的名字解析成具体的Destination,其默认的实现DynamicDestinationResolver基本上已经够用了。举个例子,倘若你要扩展将其解析成zookeeper可识别的Location,可以考虑实现该类。

         好,终于轮到JmsTemplate了,先贴一段Javadoc(这里面有两个地方需要先了解下)

This template uses a org.springframework.jms.support.destination.DynamicDestinationResolver and a SimpleMessageConverter as default strategies for resolving a destination name or converting a message, respectively. These defaults can be overridden through the "destinationResolver" and "messageConverter" bean properties.

        直白,不解释了。。。。。。

NOTE: The ConnectionFactory used with this template should return pooled Connections (or a single shared Connection) as well as pooled Sessions and MessageProducers. Otherwise, performance of ad-hoc JMS operations is going to suffer.

        池化工厂,理由也很充分了。Spring只提供了SingleConnectionFactory,至于池化么,具体的Broker自己去实现,像AMQ在其内部就有基于Commons pool类库的PooledConnectionFactory。

        ok,下面我们深入JmsTemplate,了解其中几个重要的方法:

/**

 * Execute the action specified by the given action object within a

 * JMS Session. Generalized version of {@code execute(SessionCallback)},

 * allowing the JMS Connection to be started on the fly.

 * p Use {@code execute(SessionCallback)} for the general case.

 * Starting the JMS Connection is just necessary for receiving messages,

 * which is preferably achieved through the {@code receive} methods.

 * @param action callback object that exposes the Session

 * @param startConnection whether to start the Connection

 * @return the result object from working with the Session

 * @throws JmsException if there is any problem

 * @see #execute(SessionCallback)

 * @see #receive

 public T T execute(SessionCallback T action, boolean startConnection) throws JmsException {

 Assert.notNull(action, "Callback object must not be null");

 Connection conToClose = null;

 Session sessionToClose = null;

 try {

 //通过事务同步管理器获取与当前线程绑定的Resouce,这里是JmsResourceHolder

 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(

 getConnectionFactory(), this.transactionalResourceFactory, startConnection);

 if (sessionToUse == null) {

 conToClose = createConnection();

 sessionToClose = createSession(conToClose);

 if (startConnection) {

 conToClose.start();

 sessionToUse = sessionToClose;

 if (logger.isDebugEnabled()) {

 logger.debug("Executing callback on JMS Session: " + sessionToUse);

 return action.doInJms(sessionToUse);

 catch (JMSException ex) {

 //注意这里的妙处 - 异常转译

 throw convertJmsAccessException(ex);

 finally {

 JmsUtils.closeSession(sessionToClose);

 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);

 }


/**

 * Send the given JMS message.

 * @param session the JMS Session to operate on

 * @param destination the JMS Destination to send to

 * @param messageCreator callback to create a JMS Message

 * @throws JMSException if thrown by JMS API methods

 protected void doSend(Session session, Destination destination, MessageCreator messageCreator)

 throws JMSException {

 Assert.notNull(messageCreator, "MessageCreator must not be null");

 MessageProducer producer = createProducer(session, destination);

 try {

 Message message = messageCreator.createMessage(session);

 if (logger.isDebugEnabled()) {

 logger.debug("Sending created message: " + message);

 doSend(producer, message);

 // Check commit - avoid commit call within a JTA transaction.

 if (session.getTransacted() isSessionLocallyTransacted(session)) {

 // Transacted session created by this template - commit.

 JmsUtils.commitIfNecessary(session);

 finally {

 JmsUtils.closeMessageProducer(producer);

 }


public void convertAndSend(

 Destination destination, final Object message, final MessagePostProcessor postProcessor)

 throws JmsException {

 send(destination, new MessageCreator() {

 public Message createMessage(Session session) throws JMSException {

 Message msg = getRequiredMessageConverter().toMessage(message, session);

 return postProcessor.postProcessMessage(msg);//注意这里不是对消息发送的后置处理,而是对消息Converter的后置处理(消息发送前的一个Hook)

 }


/**

 * Actually receive a JMS message.

 * @param session the JMS Session to operate on

 * @param consumer the JMS MessageConsumer to receive with

 * @return the JMS Message received, or {@code null} if none

 * @throws JMSException if thrown by JMS API methods

 protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {

 try {

 // Use transaction timeout (if available).

 long timeout = getReceiveTimeout();

 JmsResourceHolder resourceHolder =

 (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory());

 if (resourceHolder != null resourceHolder.hasTimeout()) {

 timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());

 Message message = doReceive(consumer, timeout);

 if (session.getTransacted()) {

 // Commit necessary - but avoid commit call within a JTA transaction.

 if (isSessionLocallyTransacted(session)) {

 // Transacted session created by this template - commit.

 JmsUtils.commitIfNecessary(session);

 else if (isClientAcknowledge(session)) {

 // Manually acknowledge message, if any.

 if (message != null) {

 message.acknowledge();

 return message;

 finally {

 JmsUtils.closeMessageConsumer(consumer);

 }

     关键代码处已经有注释了,这里就不再赘述了,掌握了这几个核心方法,这个类就算拿下了。

     恩,从编程API的角度来看,差不多就这些内容了。

   第二部分:包结构

     下面,我们从包结构的角度再来进一步了解一下Spring对Jms的集成,如下图:


                                                

      org.springframework.jms包里面提供了一些JMS规范异常的runtime版本,看看jms2在这方面的改进,就知道spring在这方面已然是先驱了。

      org.springframework.jms.config包里面放置了对Jms schema的解析,这是spring为我们提供的一个非常有用的特性,schema用的好的话,也可以做到面向接口编程,扩展性极好。这方面感兴趣的同学,推荐阅读这里http://openwebx.org/docs/Webx3_Guide_Book.html#d0e574,深入了解下Webx是怎么利用Schema实现OCP原则的。

      org.springframework.jms.connection包里面放置了一些与Connection相关的工具类(ConnectionFactoryUtils),基础类(JmsResourceHolder等)。这里重点关注一下JmsTransactionManager(extendsAbstractPlatformTransactionManager,其中的doXXX方法非常有看点),这个类也是JMS本地事务处理的一个核心工作类,如下:

                                             


       org.springframework.jms.core包里面主要是spring封装的一些回调接口,如BrowserCallback,MessageCreator,MessagePostProcessor,ProducerCallback,SessionCallback,当然我们之前分析过的JmsTemplate也在这个包里面。

       org.springframework.jms.core.support包里面就一个抽象类JmsGatewaySupport,暂时没怎么用,就是在afterPropertiesSet方法里面内置了一个initGateway方法,用来做一些定制化操作(custominitialization behavior)。

         org.springframework.jms.listener和org.springframework.jms.listener.adapter包,我们要重点关注一下,刚才编程式API主要介绍了消息的发送,消息的接受是怎么处理的呢,主要看这两个包里面的类。类图如下:

       

             

   

          我们先来了解下SimpleMessageListenerContainer的核心方法:

           

/**

 * Create a MessageConsumer for the given JMS Session,

 * registering a MessageListener for the specified listener.

 * @param session the JMS Session to work on

 * @return the MessageConsumer

 * @throws JMSException if thrown by JMS methods

 * @see #executeListener

 protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {

 Destination destination = getDestination();

 if (destination == null) {

 destination = resolveDestinationName(session, getDestinationName());

 MessageConsumer consumer = createConsumer(session, destination);

 if (this.taskExecutor != null) {

 consumer.setMessageListener(new MessageListener() {

 public void onMessage(final Message message) {

 taskExecutor.execute(new Runnable() {

 public void run() {

 processMessage(message, session);

 else {

 consumer.setMessageListener(new MessageListener() {

 public void onMessage(Message message) {

 processMessage(message, session);

 return consumer;

 }
        怎么样,很简单吧?非常简单的调度算法,也没有失败重连等高级功能。如果需要这些功能,怎么办?ok,是时候DefaultMessageListenerContainer出场了,一个功能相对比较丰富的Listener容器,和SimpleMessageListenerContainer不同,它使用AsyncMessageListenerInvoker执行一个looped的MessageConsumer.receive()调用来接收消息,注意这里的Executor,默认是SimpleAsyncTaskExecutor,文档里写的很清楚:
NOTE: This implementation does not reuse threads! Consider a thread-pooling TaskExecutor implementation instead, in particular for executing a large number of short-lived tasks.
        来看看这个类里面几个重要的成员变量,首先是concurrentConsumers和maxConcurrentConsumers。通过设置setConcurrency方法,可以scale up number of consumers between the minimum number ofconsumers(concurrentConsumers)and the maximum number of consumers(maxConcurrentConsumers)。那么单个消费任务如何消费消息呢,这里又有一个变量需要注意一下,即idleTaskExecutionLimit,官方的解释很清楚了:

Within each task execution, a number of message reception attempts (according to the "maxMessagesPerTask" setting) will each wait for an incoming message (according to the "receiveTimeout" setting). If all of those receive attempts in a given task return without a message, the task is considered idle with respect to received messages. Such a task may still be rescheduled; however, once it reached the specified "idleTaskExecutionLimit", it will shut down (in case of dynamic scaling).
         接下来,我们来看这个类里面最最重要的调度方法,在其内部类AsyncMessageListenerInvoker里面,如下:

public void run() {

 synchronized (lifecycleMonitor) {

 activeInvokerCount++;

 lifecycleMonitor.notifyAll();

 boolean messageReceived = false;

 try {

 if (maxMessagesPerTask 0) {

 messageReceived = executeOngoingLoop();

 else {

 int messageCount = 0;

 while (isRunning() messageCount maxMessagesPerTask) {

 messageReceived = (invokeListener() || messageReceived);

 messageCount++;

 catch (Throwable ex) {

 clearResources();

 if (!this.lastMessageSucceeded) {

 // We failed more than once in a row - sleep for recovery interval

 // even before first recovery attempt.

 sleepInbetweenRecoveryAttempts();

 this.lastMessageSucceeded = false;

 boolean alreadyRecovered = false;

 synchronized (recoveryMonitor) {

 if (this.lastRecoveryMarker == currentRecoveryMarker) {

 handleListenerSetupFailure(ex, false);

 recoverAfterListenerSetupFailure();

 currentRecoveryMarker = new Object();

 else {

 alreadyRecovered = true;

 if (alreadyRecovered) {

 handleListenerSetupFailure(ex, true);

 finally {

 synchronized (lifecycleMonitor) {

 decreaseActiveInvokerCount();

 lifecycleMonitor.notifyAll();

 if (!messageReceived) {

 this.idleTaskExecutionCount++;

 else {

 this.idleTaskExecutionCount = 0;

 synchronized (lifecycleMonitor) {

 if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {

 // Were shutting down completely.

 scheduledInvokers.remove(this);

 if (logger.isDebugEnabled()) {

 logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());

 lifecycleMonitor.notifyAll();

 clearResources();

 else if (isRunning()) {

 int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();

 if (nonPausedConsumers 1) {

 logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +

 "Check your thread pool configuration! Manual recovery necessary through a start() call.");

 else if (nonPausedConsumers getConcurrentConsumers()) {

 logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +

 "due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +

 "to be triggered by remaining consumers.");

 private boolean executeOngoingLoop() throws JMSException {

 boolean messageReceived = false;

 boolean active = true;

 while (active) {

 synchronized (lifecycleMonitor) {

 boolean interrupted = false;

 boolean wasWaiting = false;

 while ((active = isActive()) !isRunning()) {

 if (interrupted) {

 throw new IllegalStateException("Thread was interrupted while waiting for " +

 "a restart of the listener container, but container is still stopped");

 if (!wasWaiting) {

 decreaseActiveInvokerCount();

 wasWaiting = true;

 try {

 lifecycleMonitor.wait();

 catch (InterruptedException ex) {

 // Re-interrupt current thread, to allow other threads to react.

 Thread.currentThread().interrupt();

 interrupted = true;

 if (wasWaiting) {

 activeInvokerCount++;

 if (scheduledInvokers.size() maxConcurrentConsumers) {

 active = false;

 if (active) {

 messageReceived = (invokeListener() || messageReceived);

 return messageReceived;

 }

      差不多这个类就介绍到这里,继续往下看吧~

      org.springframework.jms.listener.endpoint包里面提供了一些JavaEE特性 – 对JCA的支持,这里就不展开了。

      org.springframework.jms.support,org.springframework.jms.support.converter,org.springframework.jms.support.destination则分别提供了Jms工具类JmsUtils(依我来看,JmsAccessor类可以考虑放到core包里面,而把一些工具类抽到这里来),针对消息转换器(主要包括三类转换,Object - Message,XML - Message,Json - Message),Destination的支持,难度不大,这里也就不展开讨论了。

      org.springframework.jms.remoting包则告诉我们底层可以通过JMS走远程服务,类似RMI的Remoting。

      ok,差不多就这些内容。看了这么多,最后我们再总结一下Spring对JMS封装的不足之处吧:

     (1) Spring对JMS的封装停留在JMS 1.1规范上(1.0.2中的支持Deprecated了),JMS 2的支持在最新的4.0 版本中未曾找见;

     (2) 消息发送 接收的时候没有预留钩子方法。比方说我们有这样的需求 - 跟踪消息走向,在消息发送完后向本地的agent写一点数据,agent定时,定量推送数据去server端做统计运算,展示等。这个时候就没有out-of-box的方法可以去实现,当然变通的方法也有不少,但不适合和开源版本融合;

     (3) 缺少一些容错策略,比方说消息发送失败,如何处理?

     (4) 缺少连接复用,一种很重要的提升性能策略。


       如果有不明白的地方,欢迎大家留言讨论!


参考资料:

http://docs.spring.io/spring/docs/4.0.0.RELEASE/spring-framework-reference/htmlsingle/#jms

      
Spring Boot与消息(JMS、AMQP、RabbitMQ) 1.概述。 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。 消息服务中两个重要概念: 消息代理(message broker)和目的地(destination)。 当消息发送者发送
Spring消息之JMS. 一、概念 异步消息简介     与远程调用机制以及REST接口类似,异步消息也是用于应用程序之间通信的。     RMI、Hessian、Burlap、HTTP invoker和Web服务在应用程序之间的通信机制是同步的,即客户端应用程序直接与远程服务相交互,并且一直等到远程过程完成后才继续执行。
阿里特邀专家徐雷Java Spring Boot开发实战系列课程(第18讲):制作Java Docker镜像与推送到DockerHub和阿里云Docker仓库 立即下载