zl程序教程

您现在的位置是:首页 >  后端

当前栏目

《Spring 5 官方文档》26. JMS(三)

2023-09-11 14:16:08 时间
26.6 注解驱动的监听端点

异步接收消息的最简单的方法是使用注解监听端点的基础架构。简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点。

@Component

public class MyService {

 @JmsListener(destination = "myDestination")

 public void processOrder(String data) { ... }

上述示例的想法是,每当javax.jms.Destination “myDestination” 上有消息可用时,就调用相应地processOrder方法(在这种情况下,JMS消息的内容类似于MessageListenerAdapter提供的内容)。

注解端点的基础架构使用JmsListenerContainerFactory为每个注解方法创建一个消息监听容器。这样的容器没有针对应用上下文进行注册,但是可以使用JmsListenerEndpointRegistry bean 进行简单的管理。

@JmsListener是 Java 8上的可重复注解,因此可以通过向其添加额外的@JmsListener声明将多个JMS目的地关联到同一个方法。 在 Java 6和7上,你可以使用@JmsListeners注解。


26.6.1 启用监听端点的注解

要启用对@JmsListener注解的支持,请将@EnableJms添加到你的一个@Configuration类上。

@Configuration

@EnableJms

public class AppConfig {

 @Bean

 public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {

 DefaultJmsListenerContainerFactory factory =

 new DefaultJmsListenerContainerFactory();

 factory.setConnectionFactory(connectionFactory());

 factory.setDestinationResolver(destinationResolver());

 factory.setConcurrency("3-10");

 return factory;

默认情况下,基础架构将查找名为jmsListenerContainerFactory的 bean 作为用于创建消息监听容器的工厂源。 在这种情况下,忽略 JMS 基础架构的设置,processOrder方法可以在一个线程池中被调用,线程池核心数为3,最大线程数为10。

对于使用的每个注解都可以自定义监听容器工厂,或通过实现JmsListenerConfigurer接口来显示的配置默认值。仅在存在没有指定容器工厂的端点时,默认值才是必须的。有关详细信息和示例,请参阅 javadoc。

如果您喜欢XML配置,请使用 jms:annotation-driven 元素。

 jms:annotation-driven/ 

 bean id="jmsListenerContainerFactory"

 property name="connectionFactory" ref="connectionFactory"/ 

 property name="destinationResolver" ref="destinationResolver"/ 

 property name="concurrency" value="3-10"/ 

 /bean 

26.6.2 编程式端点注册

JmsListenerEndpoint提供了 JMS 端点的模型,并负责为该模型配置容器。除了使用注解之外,基础架构也允许你用编程的方式来配置端点。

@Configuration

@EnableJms

public class AppConfig implements JmsListenerConfigurer {

 @Override

 public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {

 SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();

 endpoint.setId("myJmsEndpoint");

 endpoint.setDestination("anotherQueue");

 endpoint.setMessageListener(message - {

 // processing

 registrar.registerEndpoint(endpoint);

本例中我们使用SimpleJmsListenerEndpoint来提供MessageListener,你也可以建立自己的端点变体并自定义调用机制。

应该注意的是,你完全可以不使用@JmsListener,而仅通过JmsListenerConfigurer来注册所有端点。

26.6.3 注解式端点方式签名

到目前为止,我们已经在我们的端点注入了一个简单的String,但实际上它可以有一个非常灵活的方法签名。现在让我们重写它并注入一个带有自定义头部的Order:

@Component

public class MyService {

 @JmsListener(destination = "myDestination")

 public void processOrder(Order order, @Header("order_type") String orderType) {

可以向 JMS 监听端点中注入的主要元素包括:

原始的javax.jms.Message或任意子类(当然,它与传入的消息类型相匹配)。 可选的javax.jms.Session来操作 JMS 原生 API,来发送自定义回复。 代表着接收消息的org.springframework.messaging.Message。注意此消息同时包含自定义和JmsHeaders定义的标准头。 @Header注解的方法参数被用来提取一个特定的头部值,包括标准 JMS 头。 @Headers注解参数必须指定给一个java.util.Map,用来获取所有头。 不被支持的(如Message、Session等)、且无注解的元素被视为有效载荷。可以明确的给它们添加@Payload注解。也可以通过添加@Valid注解来开启校验。

注入 Spring 的Message抽象可以获取特定消息的所有信息,而无需依赖特定传输 API。

@JmsListener(destination = "myDestination")

public void processOrder(Message Order order) { ... }

可以自己扩展DefaultMessageHandlerMethodFactory来处理额外的方法参数。同时你也可以自定义转换和校验规则。

例如,如果我们需要在处理之前确保我们的Order有效,我们可以使用@Valid对有效负载进行注解,并配置必要的验证器,如下所示:

@Configuration

@EnableJms

public class AppConfig implements JmsListenerConfigurer {

 @Override

 public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {

 registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());

 @Bean

 public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {

 DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();

 factory.setValidator(myValidator());

 return factory;

26.6.4 响应管理

MessageListenerAdapter允许你的方法有非空返回值。此时返回值将被封装在一个 javax.jms.Message中,发往原始消息的JMSReplyTo头中定义的目的地或者监听自己默认的目的地。监听默认的目的地可以通过@SendTo注解来设置。

假定现在我们的processOrder方法将返回一个OrderStatus,下面将展示如何自动地发送响应:

@JmsListener(destination = "myDestination")

@SendTo("status")

public OrderStatus processOrder(Order order) {

 // order processing

 return status;

如果你有多个@JmsListener注解方法,您还可以将@SendTo注解放在 class 上以共享默认响应目的地。

如果您需要以独立传输的方式设置额外的头信息,则可以返回一个Message,如下所示:

@JmsListener(destination = "myDestination")

@SendTo("status")

public Message OrderStatus processOrder(Order order) {

 // order processing

 return MessageBuilder

 .withPayload(status)

 .setHeader("code", 1234)

 .build();

如果响应的目的地是运行时实时计算的,可以将响应结果封装在一个JmsResponse中,直接指定一个目的地。前面的例子可以重写如下:

@JmsListener(destination = "myDestination")

public JmsResponse Message OrderStatus processOrder(Order order) {

 // order processing

 Message OrderStatus response = MessageBuilder

 .withPayload(status)

 .setHeader("code", 1234)

 .build();

 return JmsResponse.forQueue(response, "status");

26.7 JMS 命名空间的支持

Spring 引入了 XML 命名空间以简化 JMS 的配置。使用 JMS 命名空间元素时,需要引用如下的 JMS Schema:

 ?xml version="1.0" encoding="UTF-8"? 

 beans xmlns="http://www.springframework.org/schema/beans"

 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

 xmlns:jms="http://www.springframework.org/schema/jms"

 xsi:schemaLocation="

 http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

 http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd" 

 !-- bean definitions here -- 

 /beans 

命名空间由三个顶级元素组成:,和。可以使用注解驱动的监听端点。 和定义共享监听容器的配置,并且包含了子元素。下面是一个基本配置的示例,包含两个监听器。

 jms:listener-container 

 jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/ 

 jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/ 

 /jms:listener-container 

上面的例子等同于在第26.4.4节“MessageListenerAdapter”的示例,定义两个不同的监听器容器和两个不同的MessageListenerAdapter。除了上面显示的属性之外,listener元素也包含几个可选属性。下面的表格列出了所有的属性:

表26.1 JMS listner 的元素属性
method 处理器中被调用的方法名。如果 ref 指向MessageListener或者 Spring SessionAwareMessageListener,则这个属性可以被忽略。
response-destination 默认的响应目的地是发送响应消息抵达的目的地。 这用于请求消息没有包含JMSReplyTo域的情况。响应目的地类型被监听容器的destination-type属性决定。记住:这仅仅适用于有返回值的监听器方法,因为每个结果对象都会被转化成响应消息。
concurrency 监听器启动的会话/消费者的并发数量。可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在运行时可能会被忽略。默认值是容器提供的值。

listener-container / 元素也接受几个可选属性。这允许自定义各种策略(例如,taskExecutor和destinationResolver)以及基本的 JMS 设置和资源引用。使用这些属性,可以定义很广泛的定制监听容器,同时仍享有命名空间的便利。

作为一个通过factory-id属性指定要暴露的 bean 的 id 的JmsListenerContainerFactory,自动暴露了这些设置。

jms:listener-container connection-factory="myConnectionFactory"

 task-executor="myTaskExecutor"

 destination-resolver="myDestinationResolver"

 transaction-manager="myTransactionManager"

 concurrency="10" 

 jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/ 

 jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/ 

 /jms:listener-container 

下面的表格描述了所有可用的属性。参考AbstractMessageListenerContainer类和具体子类的 Javadoc 来了解每个属性的细节。这部分的 Javadoc 也提高那个了事务选择和消息传输场景的讨论。

表26.2 JMS listner-container 的元素属性
container-type 监听容器的类型。 可用的选项有:default,simple,default102或simple102(默认值为“default”)。
container-class 自定义监听容器的实现类作为完全限定类名。默认是 Spring 的标准DefaultMessageListenerContainer或SimpleMessageListenerContainer,取决于container-type属性。
factory-id 通过对JmsListenerContainerFactory指定 id,暴露该元素的设置,以便它们可以与其他端点一起使用。
connection-factory 对 JMS ConnectionFactory bean 的引用(默认 bean 名称为connectionFactory)。
message-converter 对 JMS 消息转换为监听器方法参数的MessageConverter策略的引用。 默认是一个SimpleMessageConverter。
error-handler 对处理任何未捕获异常的ErrorHandler策略的引用,异常可能发生在MessageListener的执行期间。
destination-type 监听器的 JMS 目标类型:queue,topic,durableTopic,sharedTopic 或 sharedDurableTopic。 这样可以间接启用容器的pubSubDomain,subscriptionDurable和subscriptionShared属性。 默认是队列(即禁用这3个属性)。
response-destination-type 响应的JMS目标类型:queue、topic。 默认值为destination-type属性的值。
cache JMS资源的缓存级别:none, connection, session, consumer 或者 auto。 默认情况下(auto),缓存级别有效的是“consumer”。除非已经指定了外部事务管理器,在这种情况下,有效的默认值为 none(假设 Java EE 风格的事务管理,其中给定的ConnectionFactory是 XA-aware 池)。
acknowledge 原生JMS确认模式:auto,client,dups-ok 或 transacted。transacted 值激活了本地交易的会话。 或者,指定下面描述的transaction-manager属性。默认为 auto。
transaction-manager 对外部PlatformTransactionManager的引用(通常是基于XA的事务协调器,例如 Spring 的JtaTransactionManager)。如果未指定,将使用本地确认(请参阅acknowledge属性)。
concurrency 每个监听器启动的会话/消费者的并发数量。 可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在运行时可能会被忽略。 默认值为1;在 topic 监听器或者 queue 的次序很重要的情况下,将并发限制为1;一般的queue可以考虑提高并发数。
prefetch 要加载到单个会话的消息的最大数量。请注意,提高此数量可能会导致并发消费者的饥饿!
receive-timeout 用于接收调用的超时时间(以毫秒为单位)。 默认值为1000 ms(1秒);-1表示没有超时限制。
back-off 指定用于计算恢复尝试间隔的BackOff实例。 如果BackOffExecution实现返回BackOffExecution#STOP,监听容器将不再进一步尝试恢复。 当设置此属性时,将忽略recovery-interval值。 默认值为FixedBackOff,间隔为5000 ms,即5秒。
recovery-interval 指定恢复尝试之间的间隔(以毫秒为单位)。是以指定间隔创建FixedBackOff的便捷方式。 有关更多恢复选项,请考虑指定BackOff实例。 默认值为5000 ms,即5秒。
phase 此容器应在其中开始和停止的生命周期阶段。 值越小,容器就越早启动,并且更晚停止。 默认值为Integer。MAX_VALUE,意味着容器将尽可能晚地启动并尽快停止。

使用 jms Schema 支持来配置基于 JCA 的监听器容器很相似。

 jms:jca-listener-container resource-adapter="myResourceAdapter"

 destination-resolver="myDestinationResolver"

 transaction-manager="myTransactionManager"

 concurrency="10" 

 jms:listener destination="queue.orders" ref="myMessageListener"/ 

 /jms:jca-listener-container 

JCA 可用的配置选项描述如下表:

表26.3 JMS jca-listner-container 的元素属性
resource-adapter 对 JCA ResourceAdapter bean 的引用(默认 bean 名称是resourceAdapter)。
activation-spec-factory 对JmsActivationSpecFactory的引用。 默认情况是自动检测 JMS 供应商及其ActivationSpec类(请参阅DefaultJmsActivationSpecFactory)
message-converter 对 JMS 消息转换为监听器方法参数的MessageConverter策略的引用。 默认是一个SimpleMessageConverter。
error-handler 对处理任何未捕获异常的ErrorHandler策略的引用,异常可能发生在MessageListener的执行期间。
destination-type 监听器的JMS目标类型:queue,topic,durableTopic,sharedTopic或sharedDurableTopic。 这样可以间接启用容器的pubSubDomain,subscriptionDurable和subscriptionShared属性。 默认是队列(即禁用这3个属性)。
response-destination-type 响应的JMS目标类型:“queue”,“topic”。 默认值为“destination-type”属性的值。
acknowledge 原生JMS确认模式:auto,client,dups-ok 或 transacted。transacted 值激活了本地交易的会话。 或者,指定下面描述的transaction-manager属性。 默认为 auto。
transaction-manager 对 Spring JtaTransactionManager或javax.transaction.TransactionManager的引用,为每个传入消息启动 XA 事务。 如果未指定,将使用本地确认(请参阅acknowledge属性)。
concurrency 每个监听器启动的会话/消费者的并发数量。 可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在使用 JCA 监听容器的运行时可能会被忽略。 默认值为1;
prefetch 要加载到单个会话的消息的最大数量。 请注意,提高此数量可能会导致并发消费者的饥饿!
Spring Boot与消息(JMS、AMQP、RabbitMQ) 1.概述。 大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。 消息服务中两个重要概念: 消息代理(message broker)和目的地(destination)。 当消息发送者发送
Spring消息之JMS. 一、概念 异步消息简介     与远程调用机制以及REST接口类似,异步消息也是用于应用程序之间通信的。     RMI、Hessian、Burlap、HTTP invoker和Web服务在应用程序之间的通信机制是同步的,即客户端应用程序直接与远程服务相交互,并且一直等到远程过程完成后才继续执行。
阿里特邀专家徐雷Java Spring Boot开发实战系列课程(第18讲):制作Java Docker镜像与推送到DockerHub和阿里云Docker仓库 立即下载