《Spring 5 官方文档》26. JMS(三)
26.6 注解驱动的监听端点
异步接收消息的最简单的方法是使用注解监听端点的基础架构。简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点。
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(String data) { ... }
}
上述示例的想法是,每当javax.jms.Destination
“myDestination” 上有消息可用时,就调用相应地processOrder
方法(在这种情况下,JMS消息的内容类似于MessageListenerAdapter
提供的内容)。
注解端点的基础架构使用JmsListenerContainerFactory
为每个注解方法创建一个消息监听容器。这样的容器没有针对应用上下文进行注册,但是可以使用JmsListenerEndpointRegistry
bean 进行简单的管理。
@JmsListener
是 Java 8上的可重复注解,因此可以通过向其添加额外的@JmsListener
声明将多个JMS目的地关联到同一个方法。 在 Java 6和7上,你可以使用@JmsListeners
注解。
26.6.1 启用监听端点的注解
要启用对@JmsListener
注解的支持,请将@EnableJms
添加到你的一个@Configuration
类上。
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setConcurrency("3-10");
return factory;
}
}
默认情况下,基础架构将查找名为jmsListenerContainerFactory
的 bean 作为用于创建消息监听容器的工厂源。 在这种情况下,忽略 JMS 基础架构的设置,processOrder
方法可以在一个线程池中被调用,线程池核心数为3,最大线程数为10。
对于使用的每个注解都可以自定义监听容器工厂,或通过实现JmsListenerConfigurer
接口来显示的配置默认值。仅在存在没有指定容器工厂的端点时,默认值才是必须的。有关详细信息和示例,请参阅 javadoc。
如果您喜欢XML配置,请使用<jms:annotation-driven>
元素。
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory"
class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="concurrency" value="3-10"/>
</bean>
26.6.2 编程式端点注册
JmsListenerEndpoint
提供了 JMS 端点的模型,并负责为该模型配置容器。除了使用注解之外,基础架构也允许你用编程的方式来配置端点。
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
本例中我们使用SimpleJmsListenerEndpoint
来提供MessageListener
,你也可以建立自己的端点变体并自定义调用机制。
应该注意的是,你完全可以不使用@JmsListener
,而仅通过JmsListenerConfigurer
来注册所有端点。
26.6.3 注解式端点方式签名
到目前为止,我们已经在我们的端点注入了一个简单的String
,但实际上它可以有一个非常灵活的方法签名。现在让我们重写它并注入一个带有自定义头部的Order
:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
可以向 JMS 监听端点中注入的主要元素包括:
- 原始的
javax.jms.Message
或任意子类(当然,它与传入的消息类型相匹配)。 - 可选的
javax.jms.Session
来操作 JMS 原生 API,来发送自定义回复。 - 代表着接收消息的
org.springframework.messaging.Message
。注意此消息同时包含自定义和JmsHeaders
定义的标准头。 -
@Header
注解的方法参数被用来提取一个特定的头部值,包括标准 JMS 头。 -
@Headers
注解参数必须指定给一个java.util.Map
,用来获取所有头。 - 不被支持的(如
Message
、Session
等)、且无注解的元素被视为有效载荷。可以明确的给它们添加@Payload
注解。也可以通过添加@Valid
注解来开启校验。
注入 Spring 的Message
抽象可以获取特定消息的所有信息,而无需依赖特定传输 API。
@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }
可以自己扩展DefaultMessageHandlerMethodFactory
来处理额外的方法参数。同时你也可以自定义转换和校验规则。
例如,如果我们需要在处理之前确保我们的Order
有效,我们可以使用@Valid
对有效负载进行注解,并配置必要的验证器,如下所示:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
26.6.4 响应管理
MessageListenerAdapter
允许你的方法有非空返回值。此时返回值将被封装在一个 javax.jms.Message
中,发往原始消息的JMSReplyTo
头中定义的目的地或者监听自己默认的目的地。监听默认的目的地可以通过@SendTo
注解来设置。
假定现在我们的processOrder
方法将返回一个OrderStatus
,下面将展示如何自动地发送响应:
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你有多个
@JmsListener
注解方法,您还可以将@SendTo
注解放在 class 上以共享默认响应目的地。
如果您需要以独立传输的方式设置额外的头信息,则可以返回一个Message
,如下所示:
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
如果响应的目的地是运行时实时计算的,可以将响应结果封装在一个JmsResponse
中,直接指定一个目的地。前面的例子可以重写如下:
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}
26.7 JMS 命名空间的支持
Spring 引入了 XML 命名空间以简化 JMS 的配置。使用 JMS 命名空间元素时,需要引用如下的 JMS Schema:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
<!-- bean definitions here -->
</beans>
命名空间由三个顶级元素组成:,和。可以使用注解驱动的监听端点。 和定义共享监听容器的配置,并且包含了子元素。下面是一个基本配置的示例,包含两个监听器。
<jms:listener-container>
<jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
<jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>
</jms:listener-container>
上面的例子等同于在第26.4.4节“MessageListenerAdapter”的示例,定义两个不同的监听器容器和两个不同的MessageListenerAdapter
。除了上面显示的属性之外,listener
元素也包含几个可选属性。下面的表格列出了所有的属性:
表26.1 JMS <listner>的元素属性
属性 | 描述 |
---|---|
id | 托管监听容器的 bean 名称。如果未指定,将自动生成一个 bean 名称。 |
destination(必选) | 监听器的目的地名称,由DestinationResolver 的策略决定。 |
ref(必选) | 处理对象的 bean 名称。 |
method | 处理器中被调用的方法名。如果 ref 指向MessageListener 或者 Spring SessionAwareMessageListener ,则这个属性可以被忽略。 |
response-destination | 默认的响应目的地是发送响应消息抵达的目的地。 这用于请求消息没有包含JMSReplyTo 域的情况。响应目的地类型被监听容器的destination-type 属性决定。记住:这仅仅适用于有返回值的监听器方法,因为每个结果对象都会被转化成响应消息。 |
subscription | 持久订阅的名称(如果需要的话)。 |
selector | 监听器的一个可选的消息选择器。 |
concurrency | 监听器启动的会话/消费者的并发数量。可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在运行时可能会被忽略。默认值是容器提供的值。 |
<listener-container />元素也接受几个可选属性。这允许自定义各种策略(例如,taskExecutor
和destinationResolver
)以及基本的 JMS 设置和资源引用。使用这些属性,可以定义很广泛的定制监听容器,同时仍享有命名空间的便利。
作为一个通过factory-id
属性指定要暴露的 bean 的 id 的JmsListenerContainerFactory
,自动暴露了这些设置。
jms:listener-container connection-factory="myConnectionFactory"
task-executor="myTaskExecutor"
destination-resolver="myDestinationResolver"
transaction-manager="myTransactionManager"
concurrency="10">
<jms:listener destination="queue.orders" ref="orderService" method="placeOrder"/>
<jms:listener destination="queue.confirmations" ref="confirmationLogger" method="log"/>
</jms:listener-container>
下面的表格描述了所有可用的属性。参考AbstractMessageListenerContainer
类和具体子类的 Javadoc 来了解每个属性的细节。这部分的 Javadoc 也提高那个了事务选择和消息传输场景的讨论。
表26.2 JMS <listner-container>的元素属性
属性 | 描述 |
---|---|
container-type | 监听容器的类型。 可用的选项有:default,simple,default102或simple102(默认值为“default”)。 |
container-class | 自定义监听容器的实现类作为完全限定类名。默认是 Spring 的标准DefaultMessageListenerContainer 或SimpleMessageListenerContainer ,取决于container-type 属性。 |
factory-id | 通过对JmsListenerContainerFactory 指定 id,暴露该元素的设置,以便它们可以与其他端点一起使用。 |
connection-factory | 对 JMS ConnectionFactory bean 的引用(默认 bean 名称为connectionFactory )。 |
task-executor | 对JMS监听器调用者的 Spring TaskExecutor 的引用。 |
destination-resolver | 对解决 JMS 目标的DestinationResolver 策略的引用。 |
message-converter | 对 JMS 消息转换为监听器方法参数的MessageConverter 策略的引用。 默认是一个SimpleMessageConverter 。 |
error-handler | 对处理任何未捕获异常的ErrorHandler 策略的引用,异常可能发生在MessageListener 的执行期间。 |
destination-type | 监听器的 JMS 目标类型:queue,topic,durableTopic,sharedTopic 或 sharedDurableTopic。 这样可以间接启用容器的pubSubDomain ,subscriptionDurable 和subscriptionShared 属性。 默认是队列(即禁用这3个属性)。 |
response-destination-type | 响应的JMS目标类型:queue、topic。 默认值为destination-type 属性的值。 |
client-id | 监听容器的 JMS 客户 ID。 使用持久订阅时需要指定。 |
cache | JMS资源的缓存级别:none, connection, session, consumer 或者 auto。 默认情况下(auto),缓存级别有效的是“consumer”。除非已经指定了外部事务管理器,在这种情况下,有效的默认值为 none(假设 Java EE 风格的事务管理,其中给定的ConnectionFactory 是 XA-aware 池)。 |
acknowledge | 原生JMS确认模式:auto,client,dups-ok 或 transacted。transacted 值激活了本地交易的会话。 或者,指定下面描述的transaction-manager 属性。默认为 auto。 |
transaction-manager | 对外部PlatformTransactionManager 的引用(通常是基于XA的事务协调器,例如 Spring 的JtaTransactionManager )。如果未指定,将使用本地确认(请参阅acknowledge 属性)。 |
concurrency | 每个监听器启动的会话/消费者的并发数量。 可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在运行时可能会被忽略。 默认值为1;在 topic 监听器或者 queue 的次序很重要的情况下,将并发限制为1;一般的queue可以考虑提高并发数。 |
prefetch | 要加载到单个会话的消息的最大数量。请注意,提高此数量可能会导致并发消费者的饥饿! |
receive-timeout | 用于接收调用的超时时间(以毫秒为单位)。 默认值为1000 ms(1秒);-1表示没有超时限制。 |
back-off | 指定用于计算恢复尝试间隔的BackOff 实例。 如果BackOffExecution 实现返回BackOffExecution#STOP ,监听容器将不再进一步尝试恢复。 当设置此属性时,将忽略recovery-interval 值。 默认值为FixedBackOff ,间隔为5000 ms,即5秒。 |
recovery-interval | 指定恢复尝试之间的间隔(以毫秒为单位)。是以指定间隔创建FixedBackOff 的便捷方式。 有关更多恢复选项,请考虑指定BackOff 实例。 默认值为5000 ms,即5秒。 |
phase | 此容器应在其中开始和停止的生命周期阶段。 值越小,容器就越早启动,并且更晚停止。 默认值为Integer 。MAX_VALUE ,意味着容器将尽可能晚地启动并尽快停止。 |
使用 jms Schema 支持来配置基于 JCA 的监听器容器很相似。
<jms:jca-listener-container resource-adapter="myResourceAdapter"
destination-resolver="myDestinationResolver"
transaction-manager="myTransactionManager"
concurrency="10">
<jms:listener destination="queue.orders" ref="myMessageListener"/>
</jms:jca-listener-container>
JCA 可用的配置选项描述如下表:
表26.3 JMS <jca-listner-container>的元素属性
属性 | 描述 |
---|---|
factory-id | |
resource-adapter | 对 JCA ResourceAdapter bean 的引用(默认 bean 名称是resourceAdapter )。 |
activation-spec-factory | 对JmsActivationSpecFactory 的引用。 默认情况是自动检测 JMS 供应商及其ActivationSpec 类(请参阅DefaultJmsActivationSpecFactory ) |
destination-resolver | 对解决JMS目标的DestinationResolver 策略的引用。 |
message-converter | 对 JMS 消息转换为监听器方法参数的MessageConverter 策略的引用。 默认是一个SimpleMessageConverter 。 |
error-handler | 对处理任何未捕获异常的ErrorHandler 策略的引用,异常可能发生在MessageListener 的执行期间。 |
destination-type | 监听器的JMS目标类型:queue,topic,durableTopic,sharedTopic或sharedDurableTopic。 这样可以间接启用容器的pubSubDomain,subscriptionDurable和subscriptionShared属性。 默认是队列(即禁用这3个属性)。 |
response-destination-type | 响应的JMS目标类型:“queue”,“topic”。 默认值为“destination-type”属性的值。 |
client-id | 监听容器的 JMS 客户 ID。 使用持久订阅时需要指定。 |
acknowledge | 原生JMS确认模式:auto,client,dups-ok 或 transacted。transacted 值激活了本地交易的会话。 或者,指定下面描述的transaction-manager 属性。 默认为 auto。 |
transaction-manager | 对 Spring JtaTransactionManager 或javax.transaction.TransactionManager 的引用,为每个传入消息启动 XA 事务。 如果未指定,将使用本地确认(请参阅acknowledge 属性)。 |
concurrency | 每个监听器启动的会话/消费者的并发数量。 可以是表示最大数量(例如“5”)的简单数字,也可以是表示下限以及上限(例如“3-5”)的范围。 请注意,指定的最小值只是一个提示,在使用 JCA 监听容器的运行时可能会被忽略。 默认值为1; |
prefetch | 要加载到单个会话的消息的最大数量。 请注意,提高此数量可能会导致并发消费者的饥饿! |
相关文章
- AWS RoboMaker – 开发、测试、部署和管理智能机器人应用程序
- 开源机器人操作系统 (ROS) 与 AWS RoboMaker
- 欢迎来到 AWS re:Invent 2018
- 2018 年 re:Invent 大会上的 AWS 预览和预发布 – Midnight Madness
- AWS 参加 2018 年北美 KubeCon + CloudNativeCon
- 11 月 26 日,星期一:re:Invent 大会今日举行开源会议
- Kubernetes Ingress 与 AWS ALB Ingress 控制器的配合使用
- Amazon S3 公有访问阻止 – 为您的账户和存储桶提供又一层保护
- 新增功能 – 使用 Amazon Comprehend 训练自定义文档分类器
- EC2 Auto Scaling 组推出多个实例类型和购买选项
- 新增功能 – CloudFormation 配置偏离检测
- OpenSource | 新品发布- AWS ParallelCluster
- 新增功能 – Amazon Route 53 混合云解析器
- re:Invent 2018 大会上的一些特别会议
- AWS Quest 2:抵达拉斯维加斯
- AWS GovCloud(美国东部)现已开放
- python format()函数详解
- OpenSource | 2018 年 re:Invent 大会上的开放源
- AWS Quest 2 更新:通往 re:Invent 之路已达中点
- OpenSource | Style Dictionary 的维护人员首次参与 Hacktoberfest