zl程序教程

您现在的位置是:首页 >  后端

当前栏目

SpringBoot 整合实现ActiveMQ

SpringBoot 实现 整合 activemq
2023-09-11 14:20:19 时间

一、ActiveMQ介绍

ActiveMQ是ASF(Apache Software Foundation)的一款消息中间件(middle-ware),消息中间件主要完成的是消息的接收、存储和转发

主要实现的模式是生产消费模式、订阅发布模式

其主要区别是:
生产消费模式中,生产完消息,消息一经消费,便不再存在。
发布订阅模式中,一条消息可以有多个订阅者,即一条消息的消费者可以有多个。

消息中间件的主要作用:流量削峰、异步处理、应用解耦、日志处理。

合理利用消息中间件,可以大大提升网站的并发量,增强网站的稳定性。类似于ActiveMQ这样的消息中间件产品还有很多:RocketMQ、RabbitMQ和Kafka。其中Kafka是构建微服务系统首选的产品。

消息形式:
1、点对点(queue)
2、一对多(topic)

二、ActiveMQ安装、服务端开启

2.1 安装

安装非常简单,官网下载,选择适合的版本,解压,安装启动即可。
需开启8161和61616端口,8161是用于后台管理的端口,61616是Java连接使用。

后台登录地址http://ip:8161/admin,用户名密码都是admin。

2.2 服务端启动

在本地下载好ActiveMQ,进入bin目录,执行./activemq start即可。这一步需要本机上有可用的JRE环境。
可以通过ActiveMQ可视化管理界面进行队列创建和消息管理,同时也可以验证我们的ActiveMQ是否正常工作。(访问地址:localhost:8161,初始用户名和密码都为admin)。
ActiveMQ管理端界面如图所示。

在这里插入图片描述
上面的启动是一次性的,如果想一直开启服务,可以将Activemq安装为系统服务:
win10下将Activemq安装为系统服务

三、SpringBoot 整合实现ActiveMQ

3.1添加依赖

<!--activemq依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息队列连接池:提升效率的连接池 queue方式可以不添加-->
<!--<dependency>-->
<!--   <groupId>org.apache.activemq</groupId>-->
<!--    <artifactId>activemq-pool</artifactId>-->
<!--</dependency>-->
<!-- 如果配置线程池则加入 -->
<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
</dependency>

3.2添加配置文件(yml文件)

server:
  #Spring boot项目访问端口
  port: 8080

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    # 如果是true,则是Topic;如果是false或者默认,则是queue。
    jms:
      pub-sub-domain: false
    pool:
      enabled: false #连接池启动  默认false
#      max-connections: 10 # 最大连接数 默认1
# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true

ActiveMQ,有两种形式,分别为Queue(生产消费),Topic(发布订阅)。

Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息。
 
Topic为一对多形式,当订阅者订阅后,发布者发布消息所有订阅者都会接受到消息。

3.3 Queue

3.3.1 Queue配置

package com.example.activemqjava.common.activemq.config;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.Queue;

/**
 * Queue配置
 * Queue为点对点模式,即有一个消息,才能有一个消费,多个消费者不会重复对应一个消息
 * @author qzz
 */
@Configuration
public class QueueConfig {

    /**
     * 定义存放消息的队列
     * @return
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("my-test");
    }

}

3.3.2 创建生产者

package com.example.activemqjava.common.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.jms.Queue;

/**
 * 生产者
 * @author qzz
 */
@Component
public class Producer {

    @Autowired
    private Queue queue;
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 发送消息
     * @param msg
     */
    public void sendMessage(String msg){
        //方法一:添加消息对消息队列
        jmsMessagingTemplate.convertAndSend(queue,msg);
        //方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列
//        jmsMessagingTemplate.convertAndSend("test",msg);
    }

    /**
     * 发送消息
     * @param msg
     */
    public void sendMessage(String destination,String msg){
        //方法二:这种方式不需要手动创建queue,系统会自动创建名为test的队列
        if(destination!=null){
            jmsMessagingTemplate.convertAndSend(destination,msg);
        }
    }
}

3.3.3 创建消费者

package com.example.activemqjava.common.activemq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 * @author qzz
 */

@Component
public class Consumer {

    /**
     * 消费消息
     * 使用JmsListener配置消费者监听的队列,其中message是接收到的消息
     *
     * @SendTo("Squeue"):SendTo会将此方法返回的数据,写入到OutQueue中去
     * @param message
     */
    @JmsListener(destination = "my-test")
    public void receiveQueue(String message){
        System.out.println("Consumer接收的消息是:"+message);
    }
}

3.3.4 启动类添加@EnableJms 注解,启动消息队列


/**
 * @author qzz
 * @EnableJms 会启动 jms 的注解扫描即发现 @JmsListener 注释的方法创建消息监听容器,相当于 <jms:annotation-d riven/>
 */
@SpringBootApplication
@EnableJms //启动消息队列
public class ActivemqJavaApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivemqJavaApplication.class, args);
    }

}

3.3.5 测试

package com.example.activemqjava.controller;

import com.example.activemqjava.common.activemq.Producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 测试  activemq
 * @author qzz
 */
@RestController
@RequestMapping("/test")
public class JmsTestController {

    @Autowired
    private Producer producer;

    /**
     * 发送消息
     */
    @RequestMapping("/sendMessages")
    public void sendMessages(){
        for (int i=0;i<5;i++){
            producer.sendMessage("this is a queue test"+i);
        }
    }
}

运行之后登陆ActiveMQ后台管理界面如下:
在这里插入图片描述
调用 http://localhost:8080/test/sendMessages接口,查看效果:
在这里插入图片描述
刷新ActiveMQ后台管理界面:
在这里插入图片描述

Number Of Pending Messages:消息队列中待处理的消息
Number Of Consumers:消费者的数量
Messages Enqueued:累计进入过消息队列的总量
Messages Dequeued:累计消费过的消息总量

3.4 Topic


server:
  #Spring boot项目访问端口
  port: 8083

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616
    user: admin
    password: admin
    # 如果是true,则是Topic;如果是false或者默认,则是queue。
    jms:
      pub-sub-domain: true
    pool:
      enabled: true #连接池启动  默认false
      max-connections: 10 # 最大连接数 默认1
# 使用queue(点对点)方式是,pool.enable要设置为false,默认使用的是queue方式,使用topic(订阅)方式是设置为true,同时要添加spring.jms.pub-sub-domain=true

3.4.1 Topic配置

package com.example.activemqjava1.activemq.config;

import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.Topic;

/**
 * Topic配置
 *  Topic:发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费
 * @author qzz
 */
@Configuration
public class TopicConfig {

    /**
     * 定义存放消息的topic
     * @return
     */
    @Bean
    public Topic topic() {

        return new ActiveMQTopic("my-topic");
    }

}

3.4.2 发布者

package com.example.activemqjava1.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Topic;

/**
 * 发布者
 * @author qzz
 */
@Service
public class TopicProducer {

    @Autowired
    private Topic topic;
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
     * 发送消息
     * @param msg
     */
    public void sendTopic(String msg){
        this.jmsMessagingTemplate.convertAndSend(this.topic,msg);
    }
}

3.4.3 订阅者

package com.example.activemqjava1.activemq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 订阅者
 * @author qzz
 */

@Component
public class TopicConsumer {

    /**
     * 消费消息
     * 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
     * @param message
     */
    @JmsListener(destination = "my-topic")
    public void receiveTopic(String message){
        System.out.println("TopicConsumer接收的消息是:"+message);
    }
}

3.4.4 测试

package com.example.activemqjava1.controller;

import com.example.activemqjava1.activemq.TopicProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 测试  activemq
 * @author qzz
 */
@RestController
@RequestMapping("/test")
public class JmsTestController {

    @Autowired
    private TopicProducer topicProducer;

    /**
     * 发送消息
     */
    @RequestMapping("/sendMessages")
    public void sendMessages(){
        for (int i=0;i<5;i++){
            topicProducer.sendTopic("this is a topic test"+i);
        }
    }
}

运行之后登陆ActiveMQ后台管理界面如下:
在这里插入图片描述
调用 http://localhost:8083/test/sendMessages接口,查看效果:
在这里插入图片描述
在这里插入图片描述
发现,发送消息成功,但是消息并没有被 监听消费。

原因是:JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory</font>

3.4.5 创建一个配置类,在配置类中提供监听工厂配置

package com.example.activemqjava1.activemq.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

/**
 * axtiveMQ配置类:创建监听工厂配置
 * @author qzz
 */
@Configuration
public class ActiveMQConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Bean
    public ConnectionFactory connectionFactory(){
        return new ActiveMQConnectionFactory(userName,password,brokerUrl);
    }

    @Bean(name="topicListener")
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        //是否支持 发布订阅
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}

3.4.6 修改订阅者中的@JmsListener注解的属性

package com.example.activemqjava1.activemq;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * 订阅者
 * @author qzz
 */

@Component
public class TopicConsumer {

    /**
     * 消费消息
     * 使用JmsListener配置消费者监听的topic,其中message是接收到的消息
     * @param message
     */
    @JmsListener(destination = "my-topic",containerFactory = "topicListener")
    public void receiveTopic(String message){
        System.out.println("TopicConsumer接收的消息是:"+message);
    }
}

3.4.7 重启项目,看效果

登录ActiveMQ后台管理界面,删除对应topic,
在这里插入图片描述
然后重启项目,进行测试:

调用 http://localhost:8083/test/sendMessages接口,控制台结果如下:
在这里插入图片描述
在这里插入图片描述

说明 检查消费成功!

四、代码源码

可以点击此处进行下载!

参考资料:https://blog.csdn.net/mycsdn6/article/details/106322223/
https://blog.csdn.net/qq_38403590/article/details/119773671
https://www.cnblogs.com/wuyoucao/p/10947940.html