Redis延迟队列和分布式延迟队列的简答实现
最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redis延迟队列和分布式延迟队列的简单实现。
在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列。我们本文的梗概如下,同学们可以选择性阅读。
1. 实现一个简单的延迟队列。我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图。DelayQueue实现了Delay、BlockingQueue接口。也就是DelayQueue是一种阻塞队列。
我们在看一下Delay的类图。Delayed接口也实现了Comparable接口,也就是我们使用Delayed的时候需要实现CompareTo方法。因为队列中的数据需要排一下先后,根据我们自己的实现。Delayed接口里边有一个方法就是getDelay方法,用于获取延迟时间,判断是否时间已经到了延迟的时间,如果到了延迟的时间就可以从队列里边获取了。
我们创建一个Message类,实现了Delayed接口,我们主要把getDelay和compareTo进行实现。在Message的构造方法的地方传入延迟的时间,单位是毫秒,计算好触发时间fireTime。同时按照延迟时间的升序进行排序。我重写了里边的toString方法,用于将Message按照我写的方法进行输出。
package com.hqs.delayQueue.bean; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; * @author huangqingshi * @Date 2020-04-18 public class Message implements Delayed { private String body; private long fireTime; public String getBody() { return body; public long getFireTime() { return fireTime; public Message(String body, long delayTime) { this.body = body; this.fireTime = delayTime + System.currentTimeMillis(); public long getDelay(TimeUnit unit) { return unit.convert(this.fireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); @Override public String toString() { return System.currentTimeMillis() + ":" + body; public static void main(String[] args) throws InterruptedException { System.out.println(System.currentTimeMillis() + ":start"); BlockingQueue Message queue = new DelayQueue (); Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); queue.put(message1); queue.put(message2); while (queue.size() 0) { System.out.println(queue.take()); }
里边的main方法里边声明了两个Message,一个延迟5秒,一个延迟7秒,时间到了之后会将接取出并且打印。输出的结果如下,正是我们所期望的。
1587218430786:start
1587218435789:hello
1587218437793:world
这个方法实现起来真的非常简单。但是缺点也是很明显的,就是数据在内存里边,数据比较容易丢失。那么我们需要采用Redis实现分布式的任务处理。
2. 使用Redis的list实现分布式延迟队列。本地需要安装一个Redis,我自己是使用Docker构建一个Redis,非常快速,命令也没多少。我们直接启动Redis并且暴露6379端口。进入之后直接使用客户端命令即可查看和调试数据。
docker pull redis docker run -itd --name redisLocal -p 6379:6379 redis docker exec -it redisLocal /bin/bash redis-cli
我本地采用spring-boot的方式连接redis,pom文件列一下,供大家参考。
xml version="1.0" encoding="UTF-8" project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" modelVersion 4.0.0 /modelVersion parent groupId org.springframework.boot /groupId artifactId spring-boot-starter-parent /artifactId version 2.2.6.RELEASE /version relativePath/ !-- lookup parent from repository -- /parent groupId com.hqs /groupId artifactId delayQueue /artifactId version 0.0.1-SNAPSHOT /version name delayQueue /name description Demo project for Spring Boot /description properties java.version 1.8 /java.version /properties dependencies dependency groupId org.springframework.boot /groupId artifactId spring-boot-starter /artifactId /dependency dependency groupId org.springframework.boot /groupId artifactId spring-boot-starter-test /artifactId scope test /scope exclusions exclusion groupId org.junit.vintage /groupId artifactId junit-vintage-engine /artifactId /exclusion /exclusions /dependency dependency groupId org.springframework.boot /groupId artifactId spring-boot-starter-data-redis /artifactId /dependency dependency groupId org.springframework.boot /groupId artifactId spring-boot-starter-web /artifactId /dependency dependency groupId redis.clients /groupId artifactId jedis /artifactId version 2.9.0 /version /dependency dependency groupId org.springframework.boot /groupId artifactId spring-boot-devtools /artifactId scope runtime /scope /dependency dependency groupId org.projectlombok /groupId artifactId lombok /artifactId optional true /optional /dependency /dependencies build plugins plugin groupId org.springframework.boot /groupId artifactId spring-boot-maven-plugin /artifactId /plugin /plugins /build /project
加上Redis的配置放到application.properties里边即可实现Redis连接,非常的方便。
# redis
redis.host=127.0.0.1
redis.port=6379
redis.password=
redis.maxIdle=100
redis.maxTotal=300
redis.maxWait=10000
redis.testOnBorrow=true
redis.timeout=100000
接下来实现一个基于Redis的list数据类型进行实现的一个类。我们使用RedisTemplate操作Redis,这个里边封装好我们所需要的Redis的一些方法,用起来非常方便。这个类允许延迟任务做多有10W个,也是避免数据量过大对Redis造成影响。如果在线上使用的时候也需要考虑延迟任务的多少。太多几百万几千万的时候可能数据量非常大,我们需要计算Redis的空间是否够。这个代码也是非常的简单,一个用于存放需要延迟的消息,采用offer的方法。另外一个是启动一个线程, 如果消息时间到了,那么就将数据lpush到Redis里边。
package com.hqs.delayQueue.cache; import com.hqs.delayQueue.bean.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import java.util.concurrent.BlockingQueue; * @author huangqingshi * @Date 2020-04-18 @Slf4j public class RedisListDelayedQueue{ private static final int MAX_SIZE_OF_QUEUE = 100000; private RedisTemplate String, String redisTemplate; private String queueName; private BlockingQueue Message delayedQueue; public RedisListDelayedQueue(RedisTemplate String, String redisTemplate, String queueName, BlockingQueue Message delayedQueue) { this.redisTemplate = redisTemplate; this.queueName = queueName; this.delayedQueue = delayedQueue; init(); public void offerMessage(Message message) { if(delayedQueue.size() MAX_SIZE_OF_QUEUE) { throw new IllegalStateException("超过队列要求最大值,请检查"); try { log.info("offerMessage:" + message); delayedQueue.offer(message); } catch (Exception e) { log.error("offMessage异常", e); public void init() { new Thread(() - { while(true) { try { Message message = delayedQueue.take(); redisTemplate.opsForList().leftPush(queueName, message.toString()); } catch (InterruptedException e) { log.error("取消息错误", e); }).start(); }
接下来我们看一下,我们写一个测试的controller。大家看一下这个请求/redis/listDelayedQueue的代码位置。我们也是生成了两个消息,然后把消息放到队列里边,另外我们在启动一个线程任务,用于将数据从Redis的list中获取。方法也非常简单。
package com.hqs.delayQueue.controller; import com.hqs.delayQueue.bean.Message; import com.hqs.delayQueue.cache.RedisListDelayedQueue; import com.hqs.delayQueue.cache.RedisZSetDelayedQueue; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.Set; import java.util.concurrent.*; * @author huangqingshi * @Date 2020-04-18 @Slf4j @Controller public class DelayQueueController { private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); //注意RedisTemplate用的String,String,后续所有用到的key和value都是String的 @Autowired RedisTemplate String, String redisTemplate; private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque ()); @GetMapping("/redisTest") @ResponseBody public String redisTest() { redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS); System.out.println(redisTemplate.opsForValue().get("a")); return "s"; @GetMapping("/redis/listDelayedQueue") @ResponseBody public String listDelayedQueue() { Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); String queueName = "list_queue"; BlockingQueue Message delayedQueue = new DelayQueue (); RedisListDelayedQueue redisListDelayedQueue = new RedisListDelayedQueue(redisTemplate, queueName, delayedQueue); redisListDelayedQueue.offerMessage(message1); redisListDelayedQueue.offerMessage(message2); asyncListTask(queueName); return "success"; @GetMapping("/redis/zSetDelayedQueue") @ResponseBody public String zSetDelayedQueue() { Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); String queueName = "zset_queue"; BlockingQueue Message delayedQueue = new DelayQueue (); RedisZSetDelayedQueue redisZSetDelayedQueue = new RedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue); redisZSetDelayedQueue.offerMessage(message1); redisZSetDelayedQueue.offerMessage(message2); asyncZSetTask(queueName); return "success"; public void asyncListTask(String queueName) { taskExecPool.execute(() - { for(;;) { String message = redisTemplate.opsForList().rightPop(queueName); if(message != null) { log.info(message); public void asyncZSetTask(String queueName) { taskExecPool.execute(() - { for(;;) { Long nowTimeInMs = System.currentTimeMillis(); System.out.println("nowTimeInMs:" + nowTimeInMs); Set String messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs); if(messages != null messages.size() != 0) { redisTemplate.opsForZSet().removeRangeByScore(queueName, 0, nowTimeInMs); for (String message : messages) { log.info("asyncZSetTask:" + message + " " + nowTimeInMs); log.info(redisTemplate.opsForZSet().zCard(queueName).toString()); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
我就不把运行结果写出来了,感兴趣的同学自己自行试验。当然这个方法也是从内存中拿出数据,到时间之后放到Redis里边,还是会存在程序启动的时候,任务进行丢失。我们继续看另外一种方法更好的进行这个问题的处理。
3.使用Redis的zSet实现分布式延迟队列。我们需要再写一个ZSet的队列处理。下边的offerMessage主要是把消息直接放入缓存中。采用Redis的ZSET的zadd方法。zadd(key, value, score) 即将key=value的数据赋予一个score, 放入缓存中。score就是计算出来延迟的毫秒数。
package com.hqs.delayQueue.cache; import com.hqs.delayQueue.bean.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import java.util.concurrent.BlockingQueue; * @author huangqingshi * @Date 2020-04-18 @Slf4j public class RedisZSetDelayedQueue { private static final int MAX_SIZE_OF_QUEUE = 100000; private RedisTemplate String, String redisTemplate; private String queueName; private BlockingQueue Message delayedQueue; public RedisZSetDelayedQueue(RedisTemplate String, String redisTemplate, String queueName, BlockingQueue Message delayedQueue) { this.redisTemplate = redisTemplate; this.queueName = queueName; this.delayedQueue = delayedQueue; public void offerMessage(Message message) { if(delayedQueue.size() MAX_SIZE_OF_QUEUE) { throw new IllegalStateException("超过队列要求最大值,请检查"); long delayTime = message.getFireTime() - System.currentTimeMillis(); log.info("zset offerMessage" + message + delayTime); redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime()); }
上边的Controller方法已经写好了测试的方法。/redis/zSetDelayedQueue,里边主要使用ZSet的zRangeByScore(key, min, max)。主要是从score从0,当前时间的毫秒数获取。取出数据后再采用removeRangeByScore,将数据删除。这样数据可以直接写到Redis里边,然后取出数据后直接处理。这种方法比前边的方法稍微好一些,但是实际上还存在一些问题,因为依赖Redis,如果Redis内存不足或者连不上的时候,系统将变得不可用。
4. 总结一下,另外还有哪些可以延迟队列。上面的方法其实还是存在问题的,比如系统重启的时候还是会造成任务的丢失。所以我们在生产上使用的时候,我们还需要将任务保存起来,比如放到数据库和文件存储系统将数据存储起来,这样做到double-check,双重检查,最终达到任务的99.999%能够处理。
其实还有很多东西可以实现延迟队列。
1) RabbitMQ就可以实现此功能。这个消息队列可以把数据保存起来并且进行处理。
2)Kafka也可以实现这个功能。
3)Netty的HashedWheelTimer也可以实现这个功能。
最后放上我的代码: https://github.com/stonehqs/delayQueue
到此这篇关于Redis延迟队列和分布式延迟队列的简答实现的文章就介绍到这了,更多相关Redis延迟队列和分布式延迟队列内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
我想要获取技术服务或软件
服务范围:MySQL、ORACLE、SQLSERVER、MongoDB、PostgreSQL 、程序问题
服务方式:远程服务、电话支持、现场服务,沟通指定方式服务
技术标签:数据恢复、安装配置、数据迁移、集群容灾、异常处理、其它问题
本站部分文章参考或来源于网络,如有侵权请联系站长。
数据库远程运维 Redis延迟队列和分布式延迟队列的简答实现
相关文章
- 锁使用Redis实现Java安全可靠的过期锁(redisjava过期)
- 监控Redis队列,更好地优化系统性能(redis队列监控)
- Redis队列:高效操作数据的必备技能(redis队列操作)
- Redis:高效实用的消息队列实现方法(redis如何做消息队列)
- 探究Redis缓存队列长度变化的研究(缓存队列长度 redis)
- 利用Redis队列发动秒杀式抢购(秒杀用redis队列)
- 监控Redis队列助力生产率(监控 redis队列)
- 监控Redis队列,有序处理任务(监控redis处理队列)
- 瞬息万变用Redis构建时间轮(时间轮 redis)
- 时序数据实时存储与管理Redis的应用(时序数据 redis)
- 互联网技术中利用Redis获得更好的体验(互联网redis应用场景)
- 利用Redis实现延时消息队列的高效实现(使用redis做延时队列)
- 多线程安全操作Redis存储数据(多个线程操作redis)
- 公务行设立Redis缓存带来更多便利(公务行设置redis)
- Redis集群镜像建立高效可靠的分布式系统(redis 集群镜像)
- Redis队列阻塞如何处理(redis 队列阻塞)
- 体会Redis队列带来的必要性(redis队列的必要性)
- Redis队列独占方式让消息发送无阻碍(redis队列独占)
- 使用Redis队列有效解决消费问题(redis队列 消费)
- 提升运行效率Redis队列的多线程消费(redis队列多线程消费)
- 的处理处理Redis队列中取出来的数据(redis队列取出来后)
- Redis构建的队列任务机制(redis 队列任务)
- Redis实现超时入队列(redis超时入队列)