rabbitMQ学习-rabbitMQ消息持久化
Rabbit消息持久化
消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。
队列如何持久化
两个持久化操作都是在生产者中进行的。
我们需要将durable参数设置为持久化
//让队列持久化
boolean durable = true;
channel.queueDeclare(队列名,durable,false,false,null)
但是需要注意的是,就是如果之前声明的队列不是持久化的,需要把原先队列先删除,然后重新创建一个持久化队列,不然就会出现错误。
如下:
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'ack_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
注意:
持久化后的rabbitMQ重启之后队列消息还是会存在的,未持久化的,那么对不起,他没了。
消息持久化
将消息标记为持久化并不能保证不会丢失消息,尽管他会告诉rabbitMQ将消息保存到磁盘中,但是这里仍然存在当消息刚刚存储池在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点,此时并没有真正写入磁盘,持久性保证并不强,但是对于我们简单任务队列而言,这已经就绰绰有余了。
MessageProperties.PERSISTENT_TEXT_PLAIN
public class Task {
//老样子,队列名称
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//声名队列
//让队列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);
//从控制台中获取
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
//添加持久化
//设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
//MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//未持久化时候的是
// channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息: "+ message);
}
}
}
不公平分发:
在某些情况下轮训分发并不好用,具体例子:有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另一个消费者2处理的速度却很慢,这个时候我们还采用轮训分发就会导致这个处理速度快的很大一部分时间处于空闲状态,而处理慢的那个消费者就一直在干活,这种分配方式就不是很好。
能者多劳,多劳多得。
为了避免这种情况,我们有采用不公平分发操作
设置参数channel.basicQos(1);
能者多劳是在消费者中设置的
//能者多劳
int prefetchCount = 1;
channel.basicQos(prefetchCount);
一般我们生活中采用的就是不公平分发。
预取值:
预先分配任务,比如生产者生成7条数据,通过队列分发,通过预取值,预取值是多少,就给属于那条信道的消费者分配多少消息。和消费者处理消息的快慢无关。
注意:
预取值消耗完毕后,之后的值就按照那个消费者快给他分配的信息就多,谁慢,谁分配的信息就少。
预取值是开发人员限制缓冲区大小,避免缓冲区里边无限制的未确定消息问题。
需要注意的是,预取值并不是你直接输入多少条数据他就可以直接堆满的我们设置的预取值的,他可能由于消费费者处理速度影响,比如,你输入10条数据,但是设置预取值为5的最后值只会产生4条数据也说不准,因为另一个太快了,其他的数据都被他消耗完毕了。
//设置非公平分发
//int i = 1;
//预取值
int i= 5; //设置分发5个消息,也就是预取值为5
channel.basicQos(i);
//设置
//int i = 1; //不公平分发
int i = 2;//此时设置就是预取值了
channel.basicQos(i);
相关文章
- EasyCVR对接华为iVS订阅摄像机和用户变更请求接口介绍
- 精选 | 腾讯云CDN内容加速场景有哪些?
- 模块化网络防止基于模型的多任务强化学习中的灾难性干扰
- 用搜索和注意力学习稳健的调度方法
- 用于多变量时间序列异常检测的学习图神经网络
- 助力政企自动化自然生长,华为WeAutomate RPA是怎么做到的?
- 使用腾讯轻量云搭建Fiora聊天室
- TSRC安全测试规范
- 云计算“功守道”
- 助力成本优化,腾讯全场景在离线混部系统Caelus正式开源
- Flink 利器:开源平台 StreamX 简介
- 腾讯云实践 | 一图揭秘腾讯碳中和?解决方案
- 深度学习中的轻量级网络架构总结与代码实现
- 信息系统项目管理师(高项复习笔记三)
- Adobe国际认证让科技赋能时尚
- c++该怎么学习(面试吃土记)
- 面试官问发布订阅模式是在问什么?
- 面试官:请实现一个通用函数把 callback 转成 promise
- 空中悬停、翻滚转身、成功着陆,我用强化学习「回收」了SpaceX的火箭
- 中山大学林倞解读视觉语义理解新趋势:从表达学习到知识及因果融合