zl程序教程

您现在的位置是:首页 >  其他

当前栏目

RabbitMQ笔记

2023-03-07 09:39:51 时间

RabbitMQ服务

RabbitMQ官网地址: https://www.rabbitmq.com/  下载页: https://www.rabbitmq.com/download.html

用Docker启动RabbitMQ服务

根据下载页上的说明

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

 这样会启动一个带命令行日志, 以及web管理界面的RabbitMQ服务.

访问 http://宿主IP:15672 就登录管理后台, 用户是 guest / guest

添加用户, VirtualHost和授权

点击页首的Admin标签, 点击Users可以在界面上添加用户, 其中各种Tags表示这些用户在后台的各种权限, 如果留空, 则用户仅能通过接口进行消息队列的操作

点击Virtual Hosts, 在界面上可以添加新VirtualHost

关于授权, 这个费了我一些时间, 界面上并没有给任何提示, 应该在Users界面, 点击用户表格里的用户名, 然后在用户的管理界面里添加.

Spring Boot 中使用RabbitMQ

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

只需要添加这个就可以了

配置

spring:
  ...
  rabbitmq:
    addresses: 192.168.1.2:5672
    username: someone
    password: somepawd
    virtualHost: /somevirtual

 在spring下添加rabbitmq节点, 如果是rabbitmq是单例, 可以使用 host + port, 如果是集群, 可以用addresses, 把地址端口写一起, 然后用逗号分开

代码

在启动类中添加以下的Bean. 以下的代码, 会在/somevirtual下面, 创建 ex.someex这个exchange, 和 que.somequeue这个queue, 如果环境需要用现有的不允许新建, 可以将Queue, TopicExchange, Binding这三个Bean删掉.

public class ApplicationBoot {
    public static final String topicExchangeName = "ex.someex";
    public static final String queueName = "que.somequeue";

    public static void main(String[] args) {
        SpringApplication.run(ApplicationBoot.class, args);
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, true, false, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(topicExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("foo.bar.#");
    }

    @Bean
    SimpleMessageListenerContainer container(
            ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

对应的消息处理

@Component
public class Receiver {
    private static Logger logger = LoggerFactory.getLogger(Receiver.class);
    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        logger.info("Received: {}", message);
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }

}

 对应的消息发送

@Component
public class Runner implements CommandLineRunner {
    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.receiver = receiver;
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("Sending message...");
        rabbitTemplate.convertAndSend(ApplicationBoot.topicExchangeName, "foo.bar.baz", "Hello Message");
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
    }

}