可能要用心学高并发核心编程,限流原理与实战,分布式令牌桶限流
2023-06-13 09:14:10 时间
实战:分布式令牌桶限流
本节介绍的分布式令牌桶限流通过Lua+Java结合完成,首先在Lua脚本中完成限流的计算,然后在Java代码中进行组织和调用。
分布式令牌桶限流Lua脚本
分布式令牌桶限流Lua脚本的核心逻辑和Java令牌桶的执行逻辑类似,只是限流计算相关的统计和时间数据存放于Redis中。
这里将限流的脚本命名为rate_limiter.lua,该脚本既使用Redis存储令牌桶信息,自身又执行于Redis中,所以笔者将该脚本放置于base-redis基础模块中,它的代码如下:
---此脚本的环境:redis内部,不是运行在Nginx内部---方法:申请令牌----1:failed---1:success---@param key:key限流关键字---@param apply:申请的令牌数量local function acquire(key, apply)
local times = redis.call('TIME'); --times[1] 秒数 --times[2] 微秒数
local curr_mill_second = times[] * + times[];
curr_mill_second = curr_mill_second / ; local cacheInfo = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate") ---局部变量:上次申请的时间
local last_mill_second = cacheInfo[]; ---局部变量:之前的令牌数
local curr_permits = tonumber(cacheInfo[]); ---局部变量:桶的容量
local max_permits = tonumber(cacheInfo[]); ---局部变量:令牌的发放速率
local rate = cacheInfo[]; ---局部变量:本次的令牌数
local local_curr_permits = max_permits; if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then
--计算时间段内的令牌数
local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / ) *rate); --令牌总数
local expect_curr_permits = reverse_permits + curr_permits; --可以申请的令牌总数
local_curr_permits = math.min(expect_curr_permits, max_permits); else
--第一次获取令牌
redis.pcall("HSET", key, "last_mill_second", curr_mill_second) end
local result = -1; --有足够的令牌可以申请
if (local_curr_permits - apply >= ) then
--保存剩余的令牌
redis.pcall("HSET", key, "curr_permits", local_curr_permits - apply); --保存时间,下次令牌获取时使用
redis.pcall("HSET", key, "last_mill_second", curr_mill_second) --返回令牌获取成功
result = ; else
--保存令牌总数
redis.pcall("HSET", key, "curr_permits", local_curr_permits); --返回令牌获取失败
result = -1; end
return resultend---方法:初始化限流器---1 success---@param key key---@param max_permits 桶的容量---@param rate 令牌的发放速率local function init(key, max_permits, rate)
local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate") local org_max_permits = tonumber(rate_limit_info[]) local org_rate = rate_limit_info[] if (org_max_permits == nil) or (rate ~= org_rate or max_permits ~= org_max_permits) then
redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits) end
return ;end---方法:删除限流Keylocal function delete(key)
redis.pcall("DEL", key) return ;endlocal key = KEYS[]local method = ARGV[]if method == 'acquire' then
return acquire(key, ARGV[], ARGV[])elseif method == 'init' then
return init(key, ARGV[], ARGV[])elseif method == 'delete' then
return delete(key)else
--ignoreend
该脚本有3个方法,其中两个方法比较重要,分别说明如下:
(1)限流器初始化方法init(key,max_permits,rate),此方法在限流开始时被调用。
(2)限流检测的方法acquire(key,apply),此方法在请求到来时被调用。
Java分布式令牌桶限流
rate_limiter.lua脚本既可以在Java中调用,又可以在Nginx中调用。本小节先介绍其在Java中的使用,第10章再介绍其在Nginx中的使用。
Java分布式令牌桶限流器的实现就是通过Java代码向Redis加载rate_limiter.lua脚本,然后封装其令牌桶初始化方法init(...)和限流监测方法acquire(...),以供外部调用。它的代码如下:
package com.crazymaker.springcloud.standard.ratelimit;
.../**
*实现:令牌桶限流服务
*create by尼恩 @ 疯狂创客圈
**/@Slf4jpublic class RedisRateLimitImpl implements RateLimitService, InitializingBean
{ /**
*限流器的redis key前缀
*/
private static final String RATE_LIMITER_KEY_PREFIX = "rate_limiter:";//private ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
private RedisRateLimitProperties redisRateLimitProperties; private RedisTemplate redisTemplate; //lua脚本的实例
private static RedisScript<Long> rateLimiterScript = null; //lua脚本的类路径
private static String rateLimitLua = "script/rate_limiter.lua"; static
{ //从类路径文件中加载令牌桶lua脚本
String script = IOUtil.loadJarFile(RedisRateLimitImpl.class.getClassLoader(), rateLimitLua); if (StringUtils.isEmpty(script))
{
log.error("lua script load failed:" + rateLimitLua);
} else
{ //创建Lua脚本实例
rateLimiterScript = new DefaultRedisScript<>(script, Long.class);
}
} public RedisRateLimitImpl(
RedisRateLimitProperties redisRateLimitProperties,
RedisTemplate redisTemplate)
{ this.redisRateLimitProperties = redisRateLimitProperties; this.redisTemplate = redisTemplate;
} private Map<String, LimiterInfo> limiterInfoMap = new HashMap<>(); /**
*限流器的信息
*/
@Builder
@Data
public static class LimiterInfo
{ /**
*限流器的key,如秒杀的id
*/ private String key; /**
*限流器的类型,如seckill
*/
private String type = "default"; /**
*限流器的最大桶容量
*/
private Integer maxPermits; /**
*限流器的速率
*/
private Integer rate; /**
*限流器的redis key
*/
public String fullKey()
{ return RATE_LIMITER_KEY_PREFIX + type + ":" + key;
} /**
*限流器在map中的缓存key
*/
public String cashKey()
{ return type + ":" + key;
}
} /**
*限流检测:是否超过redis令牌桶限速器的限制
*
*@param cacheKey计数器的key
*@return true or false
*/
@Override
public Boolean tryAcquire(String cacheKey)
{ if (cacheKey == null)
{ return true;
} if (cacheKey.indexOf(":") <= )
{
cacheKey = "default:" + cacheKey;
}
LimiterInfo limiterInfo = limiterInfoMap.get(cacheKey); if (limiterInfo == null)
{ return true;
}
Long acquire = (Long) redisTemplate.execute(rateLimiterScript,
ImmutableList.of(limiterInfo.fullKey()), "acquire", "1"); if (acquire == )
{ return false;
} return true;
} /**
*重载方法:限流器初始化
*
*@param limiterInfo限流的类型
*/
public void initLimitKey(LimiterInfo limiterInfo)
{ if (null == rateLimiterScript)
{ return;
} String maxPermits = limiterInfo.getMaxPermits().toString(); String rate = limiterInfo.getRate().toString(); //执行redis脚本
Long result = (Long) redisTemplate.execute(rateLimiterScript,
ImmutableList.of(limiterInfo.fullKey()), "init",
maxPermits,
rate); limiterInfoMap.put(limiterInfo.cashKey(), limiterInfo);
} /**
*限流器初始化
*
*@param type类型
*@param key id
*@param maxPermits上限
*@param rate 速度
*/
public void initLimitKey(String type, String key,
Integer maxPermits, Integer rate)
{
LimiterInfo limiterInfo = LimiterInfo.builder()
.type(type)
.key(key)
.maxPermits(maxPermits)
.rate(rate)
.build();
initLimitKey(limiterInfo);
} /**
*获取redis lua脚本的sha1编码,并缓存到redis
*/
public String cacheSha1()
{ String sha1 = rateLimiterScript.getSha1();
redisTemplate.opsForValue().set("lua:sha1:rate_limiter", sha1); return sha1;
}
}
Java分布式令牌桶限流的自验证
自验证的工作:首先初始化分布式令牌桶限流器,然后使用两条
线程不断进行限流的检测。自验证的代码如下:
package com.crazymaker.springcloud.ratelimit;
...@Slf4j@RunWith(SpringRunner.class)
//指定启动类
@SpringBootTest(classes = {DemoCloudApplication.class})
/**
*redis分布式令牌桶测试类
*/public class RedisRateLimitTest{ @Resource(name = "redisRateLimitImpl")
RedisRateLimitImpl limitService; //线程池,用于多线程模拟测试
private ExecutorService pool = Executors.newFixedThreadPool(); @Test
public void testRedisRateLimit()
{ //初始化分布式令牌桶限流器
limitService.initLimitKey( "seckill", //redis key中的类型
"10000", //redis key中的业务key,比如商品id
, //桶容量
); //每秒令牌数
AtomicInteger count = new AtomicInteger(); long start = System.currentTimeMillis(); //线程数
final int threads = ; //每条线程的执行轮数
final int turns = ; //同步器
CountDownLatch countDownLatch = new CountDownLatch(threads); for (int i = ; i < threads; i++)
{
pool.submit(() ->
{ try
{ //每个用户访问turns次
for (int j = ; j < turns; j++)
{ boolean limited = limitService.tryAcquire
("seckill:10000"); if (limited)
{
count.getAndIncrement();
}
Thread.sleep();
}
} catch (Exception e)
{ e.printStackTrace();
}
countDownLatch.countDown();
});
} try
{
countDownLatch.await();
} catch (InterruptedException e)
{
e.printStackTrace();
} float time = (System.currentTimeMillis() - start) / 1000F; //输出统计结果
log.info("限制的次数为:" + count.get() + " 时长为:" + time);
log.info("限制的次数为:" + count.get() + ",通过的次数为:" + (threads *turns - count.get()));
log.info("限制的比例为:" +
(float) count.get() / (float) (threads *turns));
log.info("运行的时长为:" + time); try
{
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
两条线程各运行20次,每一次运行休眠200毫秒,总计耗时4秒,
运行40次,部分输出结果如下:
[main] INFO c.c.s.r.RedisRateLimitTest - 限制的次数为:32 时长为:4.015[main] INFO c.c.s.r.RedisRateLimitTest - 限制的次数为:32,通过的次数为:8[main] INFO c.c.s.r.RedisRateLimitTest - 限制的比例为:0.8[main] INFO c.c.s.r.RedisRateLimitTest - 运行的时长为:4.015
大家可以自行调整参数,运行以上自验证程序并观察实验结果,体验一下分布式令牌桶限流的效果。
本文给大家讲解的内容是高并发核心编程,限流原理与实战,实战:分布式令牌桶限流
- 下篇文章给大家讲解的是高并发核心编程,Spring Cloud+Nginx秒杀实战;
- 觉得文章不错的朋友可以转发此文关注小编;
- 感谢大家的支持!
本文就是愿天堂没有BUG给大家分享的内容,大家有收获的话可以分享下,想学习更多的话可以到微信公众号里找我,我等你哦。
相关文章
- Java并发编程之AQS以及源码解析
- Go语言中有没有结构化并发?
- 【java并发编程】底层原理——用户态和内核态的区别
- 【并发编程】生产者-消费者模式
- 【并发编程】Executor框架
- juc并发编程03——Lock与Condition接口
- Juc并发编程06——深入剖析队列同步器AQS源码
- 高并发核心编程Spring Cloud+Nginx秒杀实战,秒杀业务的参考实现
- 涨薪5K必学高并发核心编程,限流原理与实战,分布式计数器限流
- 长这么大才读懂高并发核心编程,限流原理与实战,Nginx漏桶限流
- 阿里大牛评:入门到大成!GitHub新上线并发编程深度解析实战PDF
- 单机高并发模型设计
- 瞅一眼就会使用GO的并发编程分享
- 【日拱一卒进击大厂系列】面试官:为什么单线程的Redis可以实现高并发访问
- 甲骨文Java语言架构师:虚拟线程将会深刻影响大规模Java应用的并发机制
- 【Java 并发编程】线程池机制 ( 线程池状态分析 | 线程池状态转换 | RUNNING | SHUTDOWN | STOP | TIDYING | TERMINATED )
- 【浅尝高并发编程】接私活差点翻车
- Go-并发编程-无缓冲和有缓冲 channel 的区别(一)
- Java并发编程之读写锁详解编程语言
- Java并发编程之synchronized详解编程语言
- Linux高性能编程:提升效率的最佳实践(linux高并发编程)
- Redis实现高性能分布式客户端(redis 高并发客户端)
- 解决方案Redis集群的并发性能低下(redis集群并发变低)
- Redis自增自减操作的高并发实现(redis自增和自减并发)
- Java并发编程示例(九):本地线程变量的使用