zl程序教程

您现在的位置是:首页 >  其他

当前栏目

《Spring 5 官方文档》26. JMS(三)

2023-03-14 10:29:49 时间

26.6 注解驱动的监听端点

异步接收消息的最简单的方法是使用注解监听端点的基础架构。简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点。

@Component
public class MyService {

	@JmsListener(destination = "myDestination")
	public void processOrder(String data) { ... }
}

上述示例的想法是,每当javax.jms.Destination “myDestination” 上有消息可用时,就调用相应地processOrder方法(在这种情况下,JMS消息的内容类似于MessageListenerAdapter提供的内容)。

注解端点的基础架构使用JmsListenerContainerFactory为每个注解方法创建一个消息监听容器。这样的容器没有针对应用上下文进行注册,但是可以使用JmsListenerEndpointRegistry bean 进行简单的管理。

@JmsListener是 Java 8上的可重复注解,因此可以通过向其添加额外的@JmsListener声明将多个JMS目的地关联到同一个方法。 在 Java 6和7上,你可以使用@JmsListeners注解。

26.6.1 启用监听端点的注解

要启用对@JmsListener注解的支持,请将@EnableJms添加到你的一个@Configuration类上。

@Configuration
@EnableJms
public class AppConfig {

	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
		DefaultJmsListenerContainerFactory factory =
				new DefaultJmsListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory());
		factory.setDestinationResolver(destinationResolver());
		factory.setConcurrency("3-10");
		return factory;
	}
}

默认情况下,基础架构将查找名为jmsListenerContainerFactory的 bean 作为用于创建消息监听容器的工厂源。 在这种情况下,忽略 JMS 基础架构的设置,processOrder方法可以在一个线程池中被调用,线程池核心数为3,最大线程数为10。

对于使用的每个注解都可以自定义监听容器工厂,或通过实现JmsListenerConfigurer接口来显示的配置默认值。仅在存在没有指定容器工厂的端点时,默认值才是必须的。有关详细信息和示例,请参阅 javadoc。

如果您喜欢XML配置,请使用<jms:annotation-driven>元素。

<jms:annotation-driven/>

   <bean id="jmsListenerContainerFactory"
           class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
       <property name="connectionFactory" ref="connectionFactory"/>
       <property name="destinationResolver" ref="destinationResolver"/>
       <property name="concurrency" value="3-10"/>
   </bean>

26.6.2 编程式端点注册

JmsListenerEndpoint提供了 JMS 端点的模型,并负责为该模型配置容器。除了使用注解之外,基础架构也允许你用编程的方式来配置端点。

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

	@Override
	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
		SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
		endpoint.setId("myJmsEndpoint");
		endpoint.setDestination("anotherQueue");
		endpoint.setMessageListener(message -> {
			// processing
		});
		registrar.registerEndpoint(endpoint);
	}
}

本例中我们使用SimpleJmsListenerEndpoint来提供MessageListener,你也可以建立自己的端点变体并自定义调用机制。

应该注意的是,你完全可以不使用@JmsListener,而仅通过JmsListenerConfigurer来注册所有端点。

26.6.3 注解式端点方式签名

到目前为止,我们已经在我们的端点注入了一个简单的String,但实际上它可以有一个非常灵活的方法签名。现在让我们重写它并注入一个带有自定义头部的Order

@Component
public class MyService {

   	@JmsListener(destination = "myDestination")
   	public void processOrder(Order order, @Header("order_type") String orderType) {
       	...
   	}
}

可以向 JMS 监听端点中注入的主要元素包括:

  • 原始的javax.jms.Message或任意子类(当然,它与传入的消息类型相匹配)。
  • 可选的javax.jms.Session来操作 JMS 原生 API,来发送自定义回复。
  • 代表着接收消息的org.springframework.messaging.Message。注意此消息同时包含自定义和JmsHeaders定义的标准头。
  • @Header注解的方法参数被用来提取一个特定的头部值,包括标准 JMS 头。
  • @Headers注解参数必须指定给一个java.util.Map,用来获取所有头。
  • 不被支持的(如MessageSession等)、且无注解的元素被视为有效载荷。可以明确的给它们添加@Payload注解。也可以通过添加@Valid注解来开启校验。

注入 Spring 的Message抽象可以获取特定消息的所有信息,而无需依赖特定传输 API。

@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }

可以自己扩展DefaultMessageHandlerMethodFactory来处理额外的方法参数。同时你也可以自定义转换和校验规则。

例如,如果我们需要在处理之前确保我们的Order有效,我们可以使用@Valid对有效负载进行注解,并配置必要的验证器,如下所示:

@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

   	@Override
   	public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
       	registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
   	}

   	@Bean
   	public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
       	DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
       	factory.setValidator(myValidator());
       	return factory;
   	}
}

26.6.4 响应管理

MessageListenerAdapter允许你的方法有非空返回值。此时返回值将被封装在一个 javax.jms.Message中,发往原始消息的JMSReplyTo头中定义的目的地或者监听自己默认的目的地。监听默认的目的地可以通过@SendTo注解来设置。

假定现在我们的processOrder方法将返回一个OrderStatus,下面将展示如何自动地发送响应:

@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
   	// order processing
   	return status;
}

如果你有多个@JmsListener注解方法,您还可以将@SendTo注解放在 class 上以共享默认响应目的地。

如果您需要以独立传输的方式设置额外的头信息,则可以返回一个Message,如下所示:

@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
   	// order processing
   	return MessageBuilder
       	    .withPayload(status)
           	.setHeader("code", 1234)
           	.build();
}

如果响应的目的地是运行时实时计算的,可以将响应结果封装在一个JmsResponse中,直接指定一个目的地。前面的例子可以重写如下:

@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
   	// order processing
   	Message<OrderStatus> response = MessageBuilder
       	    .withPayload(status)
           	.setHeader("code", 1234)
           	.build();
	return JmsResponse.forQueue(response, "status");
}

26.7 JMS 命名空间的支持

Spring 引入了 XML 命名空间以简化 JMS 的配置。使用 JMS 命名空间元素时,需要引用如下的 JMS Schema:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xmlns:jms="http://www.springframework.org/schema/jms"
		xsi:schemaLocation="
			http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
			http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">

	<!-- bean definitions here -->

</beans>

命名空间由三个顶级元素组成:,和。可以使用注解驱动的监听端点。 和定义共享监听容器的配置,并且包含了子元素。下面是一个基本配置的示例,包含两个监听器。

<jms:listener-container>

	<jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

	<jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

上面的例子等同于在第26.4.4节“MessageListenerAdapter”的示例,定义两个不同的监听器容器和两个不同的MessageListenerAdapter。除了上面显示的属性之外,listener元素也包含几个可选属性。下面的表格列出了所有的属性:

表26.1 JMS <listner>的元素属性

属性 描述
id 托管监听容器的 bean 名称。如果未指定,将自动生成一个 bean 名称。
destination(必选) 监听器的目的地名称,由DestinationResolver的策略决定。
ref(必选) 处理对象的 bean 名称。
method 处理器中被调用的方法名。如果 ref 指向MessageListener或者 Spring SessionAwareMessageListener,则这个属性可以被忽略。
response-destination 默认的响应目的地是发送响应消息抵达的目的地。 这用于请求消息没有包含JMSReplyTo域的情况。响应目的地类型被监听容器的destination-type属性决定。记住:这仅仅适用于有返回值的监听器方法,因为每个结果对象都会被转化成响应消息。
subscription 持久订阅的名称(如果需要的话)。
selector 监听器的一个可选的消息选择器。
concurrency 监听器启动的会话/消费者的并发数量。可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在运行时可能会被忽略。默认值是容器提供的值。

<listener-container />元素也接受几个可选属性。这允许自定义各种策略(例如,taskExecutordestinationResolver)以及基本的 JMS 设置和资源引用。使用这些属性,可以定义很广泛的定制监听容器,同时仍享有命名空间的便利。

作为一个通过factory-id属性指定要暴露的 bean 的 id 的JmsListenerContainerFactory,自动暴露了这些设置。

jms:listener-container connection-factory="myConnectionFactory"
		task-executor="myTaskExecutor"
		destination-resolver="myDestinationResolver"
		transaction-manager="myTransactionManager"
		concurrency="10">

	<jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>

	<jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>

</jms:listener-container>

下面的表格描述了所有可用的属性。参考AbstractMessageListenerContainer类和具体子类的 Javadoc 来了解每个属性的细节。这部分的 Javadoc 也提高那个了事务选择和消息传输场景的讨论。

表26.2 JMS <listner-container>的元素属性

属性 描述
container-type 监听容器的类型。 可用的选项有:default,simple,default102或simple102(默认值为“default”)。
container-class 自定义监听容器的实现类作为完全限定类名。默认是 Spring 的标准DefaultMessageListenerContainerSimpleMessageListenerContainer,取决于container-type属性。
factory-id 通过对JmsListenerContainerFactory指定 id,暴露该元素的设置,以便它们可以与其他端点一起使用。
connection-factory 对 JMS ConnectionFactory bean 的引用(默认 bean 名称为connectionFactory)。
task-executor 对JMS监听器调用者的 Spring TaskExecutor的引用。
destination-resolver 对解决 JMS 目标的DestinationResolver策略的引用。
message-converter 对 JMS 消息转换为监听器方法参数的MessageConverter策略的引用。 默认是一个SimpleMessageConverter
error-handler 对处理任何未捕获异常的ErrorHandler策略的引用,异常可能发生在MessageListener的执行期间。
destination-type 监听器的 JMS 目标类型:queue,topic,durableTopic,sharedTopic 或 sharedDurableTopic。 这样可以间接启用容器的pubSubDomainsubscriptionDurablesubscriptionShared属性。 默认是队列(即禁用这3个属性)。
response-destination-type 响应的JMS目标类型:queue、topic。 默认值为destination-type属性的值。
client-id 监听容器的 JMS 客户 ID。 使用持久订阅时需要指定。
cache JMS资源的缓存级别:none, connection, session, consumer 或者 auto。 默认情况下(auto),缓存级别有效的是“consumer”。除非已经指定了外部事务管理器,在这种情况下,有效的默认值为 none(假设 Java EE 风格的事务管理,其中给定的ConnectionFactory是 XA-aware 池)。
acknowledge 原生JMS确认模式:auto,client,dups-ok 或 transacted。transacted 值激活了本地交易的会话。 或者,指定下面描述的transaction-manager属性。默认为 auto。
transaction-manager 对外部PlatformTransactionManager的引用(通常是基于XA的事务协调器,例如 Spring 的JtaTransactionManager)。如果未指定,将使用本地确认(请参阅acknowledge属性)。
concurrency 每个监听器启动的会话/消费者的并发数量。 可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在运行时可能会被忽略。 默认值为1;在 topic 监听器或者 queue 的次序很重要的情况下,将并发限制为1;一般的queue可以考虑提高并发数。
prefetch 要加载到单个会话的消息的最大数量。请注意,提高此数量可能会导致并发消费者的饥饿!
receive-timeout 用于接收调用的超时时间(以毫秒为单位)。 默认值为1000 ms(1秒);-1表示没有超时限制。
back-off 指定用于计算恢复尝试间隔的BackOff实例。 如果BackOffExecution实现返回BackOffExecution#STOP,监听容器将不再进一步尝试恢复。 当设置此属性时,将忽略recovery-interval值。 默认值为FixedBackOff,间隔为5000 ms,即5秒。
recovery-interval 指定恢复尝试之间的间隔(以毫秒为单位)。是以指定间隔创建FixedBackOff的便捷方式。 有关更多恢复选项,请考虑指定BackOff实例。 默认值为5000 ms,即5秒。
phase 此容器应在其中开始和停止的生命周期阶段。 值越小,容器就越早启动,并且更晚停止。 默认值为IntegerMAX_VALUE,意味着容器将尽可能晚地启动并尽快停止。

使用 jms Schema 支持来配置基于 JCA 的监听器容器很相似。

<jms:jca-listener-container resource-adapter="myResourceAdapter"
		destination-resolver="myDestinationResolver"
		transaction-manager="myTransactionManager"
		concurrency="10">

	<jms:listener destination="queue.orders" ref="myMessageListener"/>

</jms:jca-listener-container>

JCA 可用的配置选项描述如下表:

表26.3 JMS <jca-listner-container>的元素属性

属性 描述
factory-id
resource-adapter 对 JCA ResourceAdapter bean 的引用(默认 bean 名称是resourceAdapter)。
activation-spec-factory JmsActivationSpecFactory的引用。 默认情况是自动检测 JMS 供应商及其ActivationSpec类(请参阅DefaultJmsActivationSpecFactory
destination-resolver 对解决JMS目标的DestinationResolver策略的引用。
message-converter 对 JMS 消息转换为监听器方法参数的MessageConverter策略的引用。 默认是一个SimpleMessageConverter
error-handler 对处理任何未捕获异常的ErrorHandler策略的引用,异常可能发生在MessageListener的执行期间。
destination-type 监听器的JMS目标类型:queue,topic,durableTopic,sharedTopic或sharedDurableTopic。 这样可以间接启用容器的pubSubDomain,subscriptionDurable和subscriptionShared属性。 默认是队列(即禁用这3个属性)。
response-destination-type 响应的JMS目标类型:“queue”,“topic”。 默认值为“destination-type”属性的值。
client-id 监听容器的 JMS 客户 ID。 使用持久订阅时需要指定。
acknowledge 原生JMS确认模式:auto,client,dups-ok 或 transacted。transacted 值激活了本地交易的会话。 或者,指定下面描述的transaction-manager属性。 默认为 auto。
transaction-manager 对 Spring JtaTransactionManagerjavax.transaction.TransactionManager的引用,为每个传入消息启动 XA 事务。 如果未指定,将使用本地确认(请参阅acknowledge属性)。
concurrency 每个监听器启动的会话/消费者的并发数量。 可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在使用 JCA 监听容器的运行时可能会被忽略。 默认值为1;
prefetch 要加载到单个会话的消息的最大数量。 请注意,提高此数量可能会导致并发消费者的饥饿!