SpringBoot 整合实现ActiveMQ
一、ActiveMQ介绍
ActiveMQ是ASF(Apache Software Foundation)的一款消息中间件(middle-ware),消息中间件主要完成的是消息的接收、存储和转发
。
主要实现的模式是生产消费模式、订阅发布模式
。
其主要区别是:
生产消费模式中,生产完消息,消息一经消费,便不再存在。
发布订阅模式中,一条消息可以有多个订阅者,即一条消息的消费者可以有多个。
消息中间件的主要作用
:流量削峰、异步处理、应用解耦、日志处理。
合理利用消息中间件,可以大大提升网站的并发量,增强网站的稳定性。类似于ActiveMQ这样的消息中间件产品还有很多:RocketMQ、RabbitMQ和Kafka。其中Kafka是构建微服务系统首选的产品。
消息形式:
1、点对点(queue)
2、一对多(topic)
二、ActiveMQ安装、服务端开启
2.1 安装
安装非常简单,官网下载,选择适合的版本,解压,安装启动即可。
需开启8161和61616端口,8161是用于后台管理的端口,61616是Java连接使用。
后台登录地址http://ip:8161/admin,用户名密码都是admin。
2.2 服务端启动
在本地下载好ActiveMQ,进入bin目录,执行./activemq start即可。这一步需要本机上有可用的JRE环境。
可以通过ActiveMQ可视化管理界面进行队列创建和消息管理,同时也可以验证我们的ActiveMQ是否正常工作。(访问地址:localhost:8161,初始用户名和密码都为admin)。
ActiveMQ管理端界面如图所示。
上面的启动是一次性的,如果想一直开启服务,可以将Activemq安装为系统服务:
win10下将Activemq安装为系统服务
三、SpringBoot 整合实现ActiveMQ
3.1添加依赖
<!--activemq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息队列连接池:提升效率的连接池 queue方式可以不添加-->
<!--<dependency>-->
<!-- <groupId>org.apache.activemq</groupId>-->
<!-- <artifactId>activemq-pool</artifactId>-->
<!--</dependency>-->
<!-- 如果配置线程池则加入 -->
<dependency>
<groupId>org.messaginghub</groupId>
<artifactId>pooled-jms</artifactId>
</dependency>
3.2添加配置文件(yml文件)
server:
#Spring boot项目访问端口
port: 8080
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
# 如果是true,则是Topic;如果是false或者默认,则是queue。
jms:
pub-sub-domain: false
pool:
enabled: false #连接池启动 默认false
# max-connections: 10 # 最大连接数 默认1
# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true
ActiveMQ,有两种形式,分别为Queue(生产消费),Topic(发布订阅)。
Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息。
Topic为一对多形式,当订阅者订阅后,发布者发布消息所有订阅者都会接受到消息。
3.3 Queue
3.3.1 Queue配置
package com.example.activemqjava.common.activemq.config;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Queue;
/**
* Queue配置
* Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息
* @author qzz
*/
@Configuration
public class QueueConfig {
/**
* 定义存放消息的队列
* @return
*/
@Bean
public Queue queue() {
return new ActiveMQQueue("my-test");
}
}
3.3.2 创建生产者
package com.example.activemqjava.common.activemq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.jms.Queue;
/**
* 生产者
* @author qzz
*/
@Component
public class Producer {
@Autowired
private Queue queue;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 发送消息
* @param msg
*/
public void sendMessage(String msg){
//方法一:添加消息对消息队列
jmsMessagingTemplate.convertAndSend(queue,msg);
//方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列
// jmsMessagingTemplate.convertAndSend("test",msg);
}
/**
* 发送消息
* @param msg
*/
public void sendMessage(String destination,String msg){
//方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列
if(destination!=null){
jmsMessagingTemplate.convertAndSend(destination,msg);
}
}
}
3.3.3 创建消费者
package com.example.activemqjava.common.activemq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 消费者
* @author qzz
*/
@Component
public class Consumer {
/**
* 消费消息
* 使用JmsListener配置消费者监听的队列,其中message是接收到的消息
*
* @SendTo("Squeue"):SendTo会将此方法返回的数据,写入到OutQueue中去
* @param message
*/
@JmsListener(destination = "my-test")
public void receiveQueue(String message){
System.out.println("Consumer接收的消息是:"+message);
}
}
3.3.4 启动类添加@EnableJms 注解,启动消息队列
/**
* @author qzz
* @EnableJms 会启动 jms 的注解扫描即发现 @JmsListener 注释的方法创建消息监听容器,相当于 <jms:annotation-d riven/>
*/
@SpringBootApplication
@EnableJms //启动消息队列
public class ActivemqJavaApplication {
public static void main(String[] args) {
SpringApplication.run(ActivemqJavaApplication.class, args);
}
}
3.3.5 测试
package com.example.activemqjava.controller;
import com.example.activemqjava.common.activemq.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 测试 activemq
* @author qzz
*/
@RestController
@RequestMapping("/test")
public class JmsTestController {
@Autowired
private Producer producer;
/**
* 发送消息
*/
@RequestMapping("/sendMessages")
public void sendMessages(){
for (int i=0;i<5;i++){
producer.sendMessage("this is a queue test"+i);
}
}
}
运行之后登陆ActiveMQ后台管理界面如下:
调用 http://localhost:8080/test/sendMessages接口,查看效果:
刷新ActiveMQ后台管理界面:
Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量
3.4 Topic
server:
#Spring boot项目访问端口
port: 8083
spring:
activemq:
broker-url: tcp://127.0.0.1:61616
user: admin
password: admin
# 如果是true,则是Topic;如果是false或者默认,则是queue。
jms:
pub-sub-domain: true
pool:
enabled: true #连接池启动 默认false
max-connections: 10 # 最大连接数 默认1
# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true
3.4.1 Topic配置
package com.example.activemqjava1.activemq.config;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Topic;
/**
* Topic配置
* Topic:发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费
* @author qzz
*/
@Configuration
public class TopicConfig {
/**
* 定义存放消息的topic
* @return
*/
@Bean
public Topic topic() {
return new ActiveMQTopic("my-topic");
}
}
3.4.2 发布者
package com.example.activemqjava1.activemq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import javax.jms.Topic;
/**
* 发布者
* @author qzz
*/
@Service
public class TopicProducer {
@Autowired
private Topic topic;
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 发送消息
* @param msg
*/
public void sendTopic(String msg){
this.jmsMessagingTemplate.convertAndSend(this.topic,msg);
}
}
3.4.3 订阅者
package com.example.activemqjava1.activemq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 订阅者
* @author qzz
*/
@Component
public class TopicConsumer {
/**
* 消费消息
* 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
* @param message
*/
@JmsListener(destination = "my-topic")
public void receiveTopic(String message){
System.out.println("TopicConsumer接收的消息是:"+message);
}
}
3.4.4 测试
package com.example.activemqjava1.controller;
import com.example.activemqjava1.activemq.TopicProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 测试 activemq
* @author qzz
*/
@RestController
@RequestMapping("/test")
public class JmsTestController {
@Autowired
private TopicProducer topicProducer;
/**
* 发送消息
*/
@RequestMapping("/sendMessages")
public void sendMessages(){
for (int i=0;i<5;i++){
topicProducer.sendTopic("this is a topic test"+i);
}
}
}
运行之后登陆ActiveMQ后台管理界面如下:
调用 http://localhost:8083/test/sendMessages接口,查看效果:
发现,发送消息成功,但是消息并没有被 监听消费。
原因是:JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory</font>
3.4.5 创建一个配置类,在配置类中提供监听工厂配置
package com.example.activemqjava1.activemq.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
* axtiveMQ配置类:创建监听工厂配置
* @author qzz
*/
@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String userName;
@Value("${spring.activemq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory(){
return new ActiveMQConnectionFactory(userName,password,brokerUrl);
}
@Bean(name="topicListener")
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//是否支持 发布订阅
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
3.4.6 修改订阅者中的@JmsListener注解的属性
package com.example.activemqjava1.activemq;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* 订阅者
* @author qzz
*/
@Component
public class TopicConsumer {
/**
* 消费消息
* 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
* @param message
*/
@JmsListener(destination = "my-topic",containerFactory = "topicListener")
public void receiveTopic(String message){
System.out.println("TopicConsumer接收的消息是:"+message);
}
}
3.4.7 重启项目,看效果
登录ActiveMQ后台管理界面,删除对应topic,
然后重启项目,进行测试:
调用 http://localhost:8083/test/sendMessages接口,控制台结果如下:
说明 检查消费成功!
四、代码源码
可以点击此处进行下载!
参考资料:https://blog.csdn.net/mycsdn6/article/details/106322223/
https://blog.csdn.net/qq_38403590/article/details/119773671
https://www.cnblogs.com/wuyoucao/p/10947940.html
相关文章
- springboot中配置urlrewrite实现url伪静态强化网站seo
- 简单才是美! SpringBoot+JPA
- 补习系列(17)-springboot mongodb 内嵌数据库
- SpringBoot 如何进行对象复制
- SpringBoot 跨域 Access-Control-Allow-Origin
- SpringBoot配置属性之其他
- 【SpringBoot笔记15】SpringBoot结合MyBatis实现多数据源配置
- 基于springboot+thymeleaf+mybatis-plus实现增删改查&分页查询
- 【项目实战】SpringBoot整合Freemarker模板引擎实现Email邮件发送功能
- 【SpringBoot19】SpringBoot中整合Ehcache实现热点数据缓存
- Springboot集成百度地图实现定位打卡功能
- SpringBoot实例②springboot+jdbcTemplate小例子实现增删查改
- 基于Java+SpringBoot+Vue前后端分离电影院售票订票系统设计与实现(有演示视频)
- SpringBoot中使用Easyexcel实现Excel导入导出功能(三)
- 【项目实战】Springboot工程整合Dubbo框架,实现RPC远程调用
- 【springboot】自动加载分析
- springboot日志的实现方式(两种log4j2.properties和log4j2.yml)