zl程序教程

您现在的位置是:首页 >  Java

当前栏目

RabbitMQ的基本使用

2023-02-18 16:39:32 时间

RabbitMQ的基本使用

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

定义消息接收一

/*
    接收MQ消息
*/
@Slf4j
@Component
public class MqReceiver {

    @RabbitListener(queues = "spoon-queues")
    public void process(String message) {
        log.info("MqReceiver: {}", message);
    }

}

定义消息接收二(自动创建队列)

/*
    接收MQ消息
*/
@Slf4j
@Component
public class MqReceiver {

    @RabbitListener(queuesToDeclare = @Queue("spoon-queues"))
    public void process(String message) {
        log.info("MqReceiver: {}", message);
    }

}

定义消息接收三(自动创建队列+Exchange和Queues绑定)

/*
    接收MQ消息
*/
@Slf4j
@Component
public class MqReceiver {

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue("spoon-queues"),
            exchange = @Exchange("spoon-exchange")
    ))
    public void process(String message) {
        log.info("MqReceiver: {}", message);
    }

}

定义消息发送

/*
    发送MQ消息
 */
@SpringBootTest
class RabbitMqTestApplicationTests {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    void contextLoads() {
        amqpTemplate.convertAndSend("spoon-queues", "now : " + new Date());
    }

}

定义消息接收三演示

消息接收方

/*
    数码供应商 接收消息
 */
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange("spoon-order"),
        key = "computer",
        value = @Queue("queues-computer-order")
))
public void processComputer(String message) {
    log.info("Computer MqReceiver: {}", message);
}

/*
    水果供应商 接收消息
 */
@RabbitListener(bindings = @QueueBinding(
        exchange = @Exchange("spoon-order"),
        key = "fruit",
        value = @Queue("queues-fruit-order")
))
public void processFruit(String message) {
    log.info("Fruit MqReceiver: {}", message);
}

消息发送方

amqpTemplate.convertAndSend("spoon-order", "computer", "now : " + new Date());

错误提示

  1. Failed to declare queue(s):[spoon-queues]: 消息队列未创建
  2. SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: 传输对象需序列化