zl程序教程

您现在的位置是:首页 >  工具

当前栏目

基于Docker安装RabbitMQ及基本使用

Docker安装RabbitMQ 基于 基本 使用
2023-09-11 14:21:10 时间

Rabbit 默认的端口5672

默认启动方式如下(可登录容器后使用简单的命令进行管理)

docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3

rabbit cli 工具

使用带管理面板的方式启动(推荐),默认账号密码guest / guest,访问地址http://ip:port

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

常用命令

# 创建一个消息队列,durable=true 
rabbitmqadmin declare queue name=队列2 durable=true

# 查看消息队列列表
rabbitmqadmin list queues

# 向“队列2”中发布一条消息
rabbitmqadmin publish routing_key=队列2 payload="{name:mmc}"

# 获取指定消息队列的队头数据,默认不消费
rabbitmqadmin get queue=队列2
# 获取指定消息队列的队头数据,ackmode=ack_requeue_false 消费数据
rabbitmqadmin get queue=队列2 ackmode=ack_requeue_false

Java Client 测试

参考文档 https://www.rabbitmq.com/api-guide.html#license

基础的功能为新增队列、删除队列、发布 Exchange、发布消息、获取指定队列的队头数据等,具体的操作代码如下所示:

import com.rabbitmq.client.*;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;

public class RabbitMQ {
    /**
     * 新增队列
     */
    @Test
    public void addQueue() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        // 连接消息队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:5672");
        Connection connection = connectionFactory.newConnection();
        // 使用channel操作队列
        Channel channel = connection.createChannel();
        // 使用queueDeclare发布新的队列
        System.out.println(channel.queueDeclare("测试队列新增2", true, false, false, null));
        // 关闭资源
        channel.close();
        connection.close();
    }

    /**
     * 删除队列
     */
    @Test
    public void deleteQueue() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        // 连接消息队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:5672");
        Connection connection = connectionFactory.newConnection();
        // 使用channel操作队列
        Channel channel = connection.createChannel();
        // 使用queueDelete删除队列
        System.out.println(channel.queueDelete("测试队列新增2"));
        // 关闭资源
        channel.close();
        connection.close();
    }

    /**
     * 发布 Exchange
     */
    @Test
    public void declareExchange() throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException, IOException, TimeoutException {
        // 连接消息队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:5672");
        Connection connection = connectionFactory.newConnection();
        // 使用channel操作队列
        Channel channel = connection.createChannel();
        System.out.println(channel.exchangeDeclare("新增Exchange2", "direct"));
        // 关闭资源
        channel.close();
        connection.close();
    }

    /**
     * 发布消息
     */
    @Test
    public void publishMessage() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        // 连接消息队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:5672");
        Connection connection = connectionFactory.newConnection();
        // 使用channel操作队列
        Channel channel = connection.createChannel();
        byte[] messageBodyBytes = "Hello Word".getBytes();
        // MessageProperties.PERSISTENT_TEXT_PLAIN 指定消息持久化
        // channel.basicPublish("", "q1", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
        // basicPublish("ExchangeName", "RoutingKey", 消息参数配置, 消息内容 byte[])
        channel.basicPublish("", "测试队列新增2", null, messageBodyBytes);
        // 关闭资源
        channel.close();
        connection.close();
    }

    /**
     * 获取指定队列的队头数据
     */
    @Test
    public void receiveMessage() throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
        // 连接消息队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUri("amqp://guest:guest@localhost:5672");
        Connection connection = connectionFactory.newConnection();
        // 使用channel操作队列
        Channel channel = connection.createChannel();
        boolean autoAck = false; // false 只获取队头元素,不删除;true 获取队头元素并删除
        GetResponse response = channel.basicGet("测试队列新增2", autoAck); // 第一个参数为队列名称
        if (response != null) {
            byte[] body = response.getBody();
            System.out.println(new String(body));
        }
        // 关闭资源
        channel.close();
        connection.close();
    }
}

SpringBoot 整合 rabbitMQ

Demo1 :使用默认交换机

pom.xml 中导入spring-rabbit依赖

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

文件application.properties 配置,rabbitMQ默认配置如下,若未修改可无需添加如下配置

spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.port=5672

发送消息,使用下面的单元测试来发送单条数据到消息队列中

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringbootRabbitMQ {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void sendMessage() {
        amqpTemplate.convertAndSend("", "q1", "Hello World");
    }
}

接受消息,监听队列中的数据,当队列不为空的时候自动获取队列中的数据

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {
    @RabbitListener(queues = "q1")
    void listen(String in) {
        System.out.println(in);
    }
}

Demo2:使用交换机并通过对象来存储数据

自定义交换机、队列的名字 Constant.java

public class Constant {
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE = "topic.queue";
}

编写RabbitConfig.java 配置文件,配置用到的交换机、队列,并将交换机与队列进行绑定

import cn.getcharzp.common.Constant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    @Bean()
    public TopicExchange topicExchange() {
        return new TopicExchange(Constant.TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue() {
        return new Queue(Constant.TOPIC_QUEUE);
    }

    @Bean
    public Binding exchangeBindTopic(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with(Constant.TOPIC_QUEUE);
    }
}

编辑发送、接受消息的实体类,我这里用的是User.java

import java.io.Serializable;

public class User implements Serializable {
    private String name;
    private Integer age;
    private String password;

    public User(String name, Integer age, String password) {
        this.name = name;
        this.age = age;
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", password='" + password + '\'' +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

编辑发送消息的单元测试,实际场景根据实际业务来书写,主要使用AmqpTemplate来进行消息的发送

import cn.getcharzp.common.Constant;
import cn.getcharzp.pojo.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class SpringbootRabbitMQ {
    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void sendMessage() {
        try {
            amqpTemplate.convertAndSend(Constant.TOPIC_EXCHANGE, Constant.TOPIC_QUEUE, new User("GetcharZp", 20, "abcdef"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

通过@RabbitListener实现消息的接受

import cn.getcharzp.common.Constant;
import cn.getcharzp.pojo.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQListener {
    @RabbitListener(queues = Constant.TOPIC_QUEUE)
    void listen(User user) {
        try {
            System.out.println(user.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

GoClient 测试

参考文档 : https://www.rabbitmq.com/tutorials/tutorial-one-go.html

基础功能为声明队列、向队列中发布数据、从队列中取数据

package test

import (
   "github.com/streadway/amqp"
   "log"
   "testing"
)

var conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/")
var ch, _ = conn.Channel()

func failOnError(err error, msg string) {
   if err != nil {
      log.Fatalf("%s: %s", msg, err)
   }
}

// 测试连接 RabbitMQ
func TestConnect(t *testing.T) {
   failOnError(err, "Failed to connect to RabbitMQ")
   defer conn.Close()
}

// 向 Hello 队列中发送 Hello World ! 数据
func TestSend(t *testing.T) {
   q, err := ch.QueueDeclare("Hello", false, false, false, false, nil)
   failOnError(err, "Failed to declare a queue")
   body := "Hello World !"
   err = ch.Publish(
      "",     // exchange
      q.Name, // routing key
      false,  // mandatory
      false,  // immediate
      amqp.Publishing{
         ContentType: "text/plain",
         Body:        []byte(body),
      })
   failOnError(err, "Failed to publish a message")
}

// 从 Hello 队列中取数据
func TestReceive(t *testing.T)  {
   q, err := ch.QueueDeclare(
      "Hello", // name
      false,   // durable
      false,   // delete when unused
      false,   // exclusive
      false,   // no-wait
      nil,     // arguments
   )
   failOnError(err, "Failed to declare a queue")
   msgs, err := ch.Consume(
      q.Name, // queue
      "",     // consumer
      true,   // auto-ack
      false,  // exclusive
      false,  // no-local
      false,  // no-wait
      nil,    // args
   )
   failOnError(err, "Failed to register a consumer")

   forever := make(chan bool)

   go func() {
      for d := range msgs {
         log.Printf("Received a message: %s", d.Body)
      }
   }()

   log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
   <-forever
}