zl程序教程

您现在的位置是:首页 >  后端

当前栏目

02_RabbitMQ【work模式】

模式RabbitMQ 02 work
2023-09-14 09:14:08 时间

RabbitMQ Tutorials — RabbitMQicon-default.png?t=M666https://rabbitmq.com/getstarted.html

1.​​​​​​​ work模式

 同一个消息只能被一个客户端所获取

2. 生产者

发送100条消息到队列,并且每次发送消息后sleep i*10 毫秒;

public class Send {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道s
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, new Product(i, "哈哈").toString().getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 3);
        }
        channel.close();
        connection.close();
    }
}
public class Product implements Serializable {

    private Integer id;
    private String productName;
    private Integer status;
    private Double price;
    private String productDesc;
    private String caption;

    public Product() {
    }

    public Product(Integer id, String productName) {
        this.id = id;
        this.productName = productName;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getProductName() {
        return productName;
    }

    public void setProductName(String productName) {
        this.productName = productName;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    public String getProductDesc() {
        return productDesc;
    }

    public void setProductDesc(String productDesc) {
        this.productDesc = productDesc;
    }

    public String getCaption() {
        return caption;
    }

    public void setCaption(String caption) {
        this.caption = caption;
    }

    @Override
    public String toString() {
        return "Product{" +
                "id=" + id +
                "---productName=" + productName +
                '}';
    }
}

3. ​​​​​​​消费者1

从队列中获取消息,每次获取到消息后,休眠10毫秒,这个消费者消费消息比较快。 

public class Recv1 {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        //channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        //   channel.basicConsume(QUEUE_NAME, false, consumer);
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(10);
            // 返回确认状态
            //    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

4. ​​​​​​​消费者2

从队列中获取消息,每次获取到消息后,休眠1000毫秒,这个消费者消费消息比较慢。

public class Recv2 {
    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);
        //channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        //  channel.basicConsume(QUEUE_NAME, false, consumer);
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            //休眠
            Thread.sleep(50);
            // 返回确认状态
            //     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }
    }
}

5. 测试结果

结果:消费者1和消费者2获取到的消息是一样多,各50个。  这样是否合理?  --  不合理的。

6​​​​​​​. work模式中的能者多劳

实际这种情况更加的合理。

7.​​​​​​​​​​​​​​ 自动和手动反馈消息消费状态

自动:

从服务端获取到消息后,就认为该消息已经成功消息,无论客户端是否出异常。

手动:

从服务端获取消息后,服务端要标记为该不可用状态,等待客户端的反馈,如果客户端一直没有反馈,该消息一直被标记为不可用,如果接收到客户端的反馈,服务端就将该消息删除。 

如何选择手动和自动?  --  根据不同的需求做出不同选择。