zl程序教程

您现在的位置是:首页 >  IT要闻

当前栏目

rabbitMQ学习-rabbitMQ消息持久化

2023-04-18 16:17:01 时间

Rabbit消息持久化

消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。

队列如何持久化

两个持久化操作都是在生产者中进行的。
我们需要将durable参数设置为持久化

//让队列持久化
boolean durable = true;
channel.queueDeclare(队列名,durable,false,false,null)

但是需要注意的是,就是如果之前声明的队列不是持久化的,需要把原先队列先删除,然后重新创建一个持久化队列,不然就会出现错误。
如下:
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
在这里插入图片描述
在这里插入图片描述
注意:

持久化后的rabbitMQ重启之后队列消息还是会存在的,未持久化的,那么对不起,他没了。

消息持久化

将消息标记为持久化并不能保证不会丢失消息,尽管他会告诉rabbitMQ将消息保存到磁盘中,但是这里仍然存在当消息刚刚存储池在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点,此时并没有真正写入磁盘,持久性保证并不强,但是对于我们简单任务队列而言,这已经就绰绰有余了。

MessageProperties.PERSISTENT_TEXT_PLAIN

public class Task {
    //老样子,队列名称
    public static  final  String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = GetConnection.getChannel();

        //声名队列
        //让队列持久化
        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME,durable,false,false,null);


        //从控制台中获取
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()){
            String message = sc.next();

            //添加持久化
            //设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
            //MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
           //未持久化时候的是
            // channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生产者发送消息: "+ message);
        }
    }

}

不公平分发:

在某些情况下轮训分发并不好用,具体例子:有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另一个消费者2处理的速度却很慢,这个时候我们还采用轮训分发就会导致这个处理速度快的很大一部分时间处于空闲状态,而处理慢的那个消费者就一直在干活,这种分配方式就不是很好。

能者多劳,多劳多得。

为了避免这种情况,我们有采用不公平分发操作

设置参数channel.basicQos(1);

能者多劳是在消费者中设置的

//能者多劳
int prefetchCount = 1;
channel.basicQos(prefetchCount);

一般我们生活中采用的就是不公平分发。

预取值:

预先分配任务,比如生产者生成7条数据,通过队列分发,通过预取值,预取值是多少,就给属于那条信道的消费者分配多少消息。和消费者处理消息的快慢无关。

注意:

预取值消耗完毕后,之后的值就按照那个消费者快给他分配的信息就多,谁慢,谁分配的信息就少。

预取值是开发人员限制缓冲区大小,避免缓冲区里边无限制的未确定消息问题。

需要注意的是,预取值并不是你直接输入多少条数据他就可以直接堆满的我们设置的预取值的,他可能由于消费费者处理速度影响,比如,你输入10条数据,但是设置预取值为5的最后值只会产生4条数据也说不准,因为另一个太快了,其他的数据都被他消耗完毕了。

  //设置非公平分发
        //int i = 1;
        //预取值
        int i= 5; //设置分发5个消息,也就是预取值为5
        channel.basicQos(i);

//设置
        //int i = 1; //不公平分发
        int i = 2;//此时设置就是预取值了
        channel.basicQos(i);