zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Redis学习笔记之延时队列详解数据库

2023-06-13 09:20:12 时间
1.2 实现方式

这些情况都可以使用延时队列来做,实现延时队列比较场景的有使用消息队列MQ来实现,比如RocketMQ等等,也可以使用Redis来实现,本博客主要介绍一下Redis实现延时队列

二、Redis延时队列 2.1 Redis列表实现

Redis实现延时队列可以通过其数据结构列表(list)来实现,顺便复习一下Redis的列表,实现列表,Redis可以通过队列和栈来实现:

/* 队列:First in first out */ 

//加两个value 

 rpush keynames key1 key2 

//计算 

 llen keynames 

 lpop keynames 

key1 

 lpop keynames 

key2 

//rpush会自动过期的 

 rpop keynames 

NULL 

/* 栈:First in last out */ 

//同样,加两个元素 

 rpush keynames key1 key2 

 rpop keynames 

key2 

 rpop keynames 

key1 

对于Redis的基本数据结构,可以参考我之前的博客:https://blog.csdn.net/u014427391/article/details/82860694

然后怎么实现延时?Thread睡眠或者线程join?这种方法是可以实现,不过假如用户一多?10个请求就要延时10N了,这种情况系统性能不好的话就会出现线程阻塞了的情况。

队列空了的情况?就会出现pop 的死循环,这种情况很可怕,很吃系统CPU,虽然可以通过线程睡眠方法来缓解,但不是最好的方法

这时候就要介绍一下Redis的blpop/brpop来替换lpop/rpop,blpop/brpop阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立刻醒过来。消息的延迟几乎为零

2.2 Redis集合实现

Redis的有序集合(zset)也可以用于实现延时队列,消息作为value,时间作为score,这里顺便复习一下Redis的有序集合

//9.0是score也就是权重 

 zadd keyname 9.0 math 

 zadd keyname 9.2 history 

//顺序 

 zrange keyname 0 -1 

1) history 

2) math 

//逆序 

 zrevrange keyname 0 -1 

1) math 

2) history 

//相当于count() 

 zcard keyname 

获取指定key的score 

 zscore keyname math 

然后多个线程的环境怎么保证任务不被多个线程抢了?这里可以使用Redis的zrem命令来实现

Redis Zrem 命令用于移除有序集中的一个或多个成员,不存在的成员将被忽略。

当 key 存在但不是有序集类型时,返回一个错误。

注意: 在 Redis 2.4 版本以前, ZREM 每次只能删除一个元素。

下面给出来自《Redis 深度历险:核心原理与应用实践》小册的例子:例子就是用有序集合和zrem来实现的

import java.lang.reflect.Type; 

import java.util.Set; 

import java.util.UUID; 

import com.alibaba.fastjson.JSON; 

import com.alibaba.fastjson.TypeReference; 

import redis.clients.jedis.Jedis; 

public class RedisDelayingQueue T { 

 static class TaskItem T { 

 public String id; 

 public T msg; 

 // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference 

 private Type TaskType = new TypeReference TaskItem T () { 

 }.getType(); 

 private Jedis jedis; 

 private String queueKey; 

 public RedisDelayingQueue(Jedis jedis, String queueKey) { 

 this.jedis = jedis; 

 this.queueKey = queueKey; 

 public void delay(T msg) { 

 TaskItem T task = new TaskItem T 

 task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid 

 task.msg = msg; 

 String s = JSON.toJSONString(task); // fastjson 序列化 

 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试 

 public void loop() { 

 while (!Thread.interrupted()) { 

 // 只取一条 

 Set String values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); 

 if (values.isEmpty()) { 

 try { 

 Thread.sleep(500); // 歇会继续 

 } catch (InterruptedException e) { 

 break; 

 continue; 

 String s = values.iterator().next(); 

 if (jedis.zrem(queueKey, s) 0) { // 抢到了 

 TaskItem T task = JSON.parseObject(s, TaskType); // fastjson 反序列化 

 this.handleMsg(task.msg); 

 public void handleMsg(T msg) { 

 System.out.println(msg); 

 public static void main(String[] args) { 

 Jedis jedis = new Jedis(); 

 RedisDelayingQueue String queue = new RedisDelayingQueue (jedis, "q-demo"); 

 Thread producer = new Thread() { 

 public void run() { 

 for (int i = 0; i i++) { 

 queue.delay("codehole" + i); 

 Thread consumer = new Thread() { 

 public void run() { 

 queue.loop(); 

 producer.start(); 

 consumer.start(); 

 try { 

 producer.join(); 

 Thread.sleep(6000); 

 consumer.interrupt(); 

 consumer.join(); 

 } catch (InterruptedException e) { 

}

不过在多线程环境,是很难做控制的,上面例子也有缺陷,下面引用小册的说法:

上面的算法中同一个任务可能会被多个进程取到之后再使用 zrem 进行争抢,那些没抢到的进程都是白取了一次任务,这是浪费。可以考虑使用 lua scripting 来优化一下这个逻辑,将 zrangebyscore 和 zrem 一同挪到服务器端进行原子化操作,这样多个进程之间争抢任务时就不会出现这种浪费了。

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/5079.html