zl程序教程

您现在的位置是:首页 >  后端

当前栏目

限流算法如何选择?时间敏感,QPS,集群

算法集群 如何 时间 选择 限流 敏感 QPS
2023-09-11 14:15:39 时间

限流是指在系统面临高并发、大流量请求的情况下,限制新的流量对系统的访问,从而保证系统服务的安全性。常用的限流算法有计数器固定窗口算法、滑动窗口算法、漏斗算法和令牌桶算法,下面将对这几种算法进行分别介绍,并给出具体的实现。本文目录如下,略长,读者可以全文阅读,同样也可以只看感兴趣的部分。

计数器固定窗口算法

原理

计数器固定窗口算法是最基础也是最简单的一种限流算法。原理就是对一段固定时间窗口内的请求进行计数,如果请求数超过了阈值,则舍弃该请求;如果没有达到设定的阈值,则接受该请求,且计数加1。当时间窗口结束时,重置计数器为0。

img

代码实现及测试

实现起来也比较简单,如下

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Project: AllForJava
 * Title:
 * Description:
 * Date: 2020-09-07 15:56
 * Copyright: Copyright (c) 2020
 *
* @version 1.0
 **/

public class CounterLimiter {

    private int windowSize; //窗口大小,毫秒为单位
    private int limit;//窗口内限流大小
    private AtomicInteger count;//当前窗口的计数器

    private CounterLimiter(){}

    public CounterLimiter(int windowSize,int limit){
        this.limit = limit;
        this.windowSize = windowSize;
        count = new AtomicInteger(0);

        //开启一个线程,达到窗口结束时清空count
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    count.set(0);
                    try {
                        Thread.sleep(windowSize);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    //请求到达后先调用本方法,若返回true,则请求通过,否则限流
    public boolean tryAcquire(){
        int newCount = count.addAndGet(1);
        if(newCount > limit){
            return false;
        }else{
            return true;
        }
    }

    //测试
    public static void main(String[] args) throws InterruptedException {
        //每秒20个请求
        CounterLimiter counterLimiter = new CounterLimiter(1000,20);
        int count = 0;
        //模拟50次请求,看多少能通过
        for(int i = 0;i < 50;i ++){
            if(counterLimiter.tryAcquire()){
                count ++;
            }
        }
        System.out.println("第一拨50次请求中通过:" + count + ",限流:" + (50 - count));
        //过一秒再请求
        Thread.sleep(1000);
        //模拟50次请求,看多少能通过
        count = 0;
        for(int i = 0;i < 50;i ++){
            if(counterLimiter.tryAcquire()){
                count ++;
            }
        }
        System.out.println("第二拨50次请求中通过:" + count + ",限流:" + (50 - count));
    }

}

测试结果如下:

计数器固定窗口算法测试结果

可以看到50个请求只有20个通过了,30个被限流,达到了预期的限流效果。

特点分析

优点:实现简单,容易理解。

缺点:流量曲线可能不够平滑,有“突刺现象”,如下图所示。这样会有两个问题:

计数器固定窗口算法限流曲线

  1. 一段时间内(不超过时间窗口)系统服务不可用。比如窗口大小为1s,限流大小为100,然后恰好在某个窗口的第1ms来了100个请求,然后第2ms-999ms的请求就都会被拒绝,这段时间用户会感觉系统服务不可用。

  2. 窗口切换时可能会产生两倍于阈值流量的请求。比如窗口大小为1s,限流大小为100,然后恰好在某个窗口的第999ms来了100个请求,窗口前期没有请求,所以这100个请求都会通过。再恰好,下一个窗口的第1ms有来了100个请求,也全部通过了,那也就是在2ms之内通过了200个请求,而我们设定的阈值是100,通过的请求达到了阈值的两倍。

    计数器固定窗口限流算法产生两倍于阈值流量的请求

计数器滑动窗口算法

原理

计数器滑动窗口算法是计数器固定窗口算法的改进,解决了固定窗口切换时可能会产生两倍于阈值流量请求的缺点。

**滑动窗口算法在固定窗口的基础上,将一个计时窗口分成了若干个小窗口,然后每个小窗口维护一个独立的计数器。**当请求的时间大于当前窗口的最大时间时,则将计时窗口向前平移一个小窗口。平移时,将第一个小窗口的数据丢弃,然后将第二个小窗口设置为第一个小窗口,同时在最后面新增一个小窗口,将新的请求放在新增的小窗口中。同时要保证整个窗口中所有小窗口的请求数目之后不能超过设定的阈值。

计数器滑动窗口算法原理图

从图中不难看出,滑动窗口算法就是固定窗口的升级版。将计时窗口划分成一个小窗口,滑动窗口算法就退化成了固定窗口算法。而滑动窗口算法其实就是对请求数进行了更细粒度的限流,窗口划分的越多,则限流越精准。

代码实现及测试

package project.limiter;

/**
 * Project: AllForJava
 * Title:
 * Description:
 * Date: 2020-09-07 18:38
 * Copyright: Copyright (c) 2020
 *




* @version 1.0
 **/

public class CounterSildeWindowLimiter {

    private int windowSize; //窗口大小,毫秒为单位
    private int limit;//窗口内限流大小
    private int splitNum;//切分小窗口的数目大小
    private int[] counters;//每个小窗口的计数数组
    private int index;//当前小窗口计数器的索引
    private long startTime;//窗口开始时间

    private CounterSildeWindowLimiter(){}

    public CounterSildeWindowLimiter(int windowSize, int limit, int splitNum){
        this.limit = limit;
        this.windowSize = windowSize;
        this.splitNum = splitNum;
        counters = new int[splitNum];
        index = 0;
        startTime = System.currentTimeMillis();
    }

    //请求到达后先调用本方法,若返回true,则请求通过,否则限流
    public synchronized boolean tryAcquire(){
        long curTime = System.currentTimeMillis();
        long windowsNum = Math.max(curTime - windowSize - startTime,0) / (windowSize / splitNum);//计算滑动小窗口的数量
        slideWindow(windowsNum);//滑动窗口
        int count = 0;
        for(int i = 0;i < splitNum;i ++){
            count += counters[i];
        }
        if(count >= limit){
            return false;
        }else{
            counters[index] ++;
            return true;
        }
    }

    private synchronized void slideWindow(long windowsNum){
        if(windowsNum == 0)
            return;
        long slideNum = Math.min(windowsNum,splitNum);
        for(int i = 0;i < slideNum;i ++){
            index = (index + 1) % splitNum;
            counters[index] = 0;
        }
        startTime = startTime + windowsNum * (windowSize / splitNum);//更新滑动窗口时间
    }

    //测试
    public static void main(String[] args) throws InterruptedException {
        //每秒20个请求
        int limit = 20;
        CounterSildeWindowLimiter counterSildeWindowLimiter = new CounterSildeWindowLimiter(1000,limit,10);
        int count = 0;

        Thread.sleep(3000);
        //计数器滑动窗口算法模拟100组间隔30ms的50次请求
        System.out.println("计数器滑动窗口算法测试开始");
        System.out.println("开始模拟100组间隔150ms的50次请求");
        int faliCount = 0;
        for(int j = 0;j < 100;j ++){
            count = 0;
            for(int i = 0;i < 50;i ++){
                if(counterSildeWindowLimiter.tryAcquire()){
                    count ++;
                }
            }
            Thread.sleep(150);
            //模拟50次请求,看多少能通过
            for(int i = 0;i < 50;i ++){
                if(counterSildeWindowLimiter.tryAcquire()){
                    count ++;
                }
            }
            if(count > limit){
                System.out.println("时间窗口内放过的请求超过阈值,放过的请求数" + count + ",限流:" + limit);
                faliCount ++;
            }
            Thread.sleep((int)(Math.random() * 100));
        }
        System.out.println("计数器滑动窗口算法测试结束,100组间隔150ms的50次请求模拟完成,限流失败组数:" + faliCount);
        System.out.println("===========================================================================================");


        //计数器固定窗口算法模拟100组间隔30ms的50次请求
        System.out.println("计数器固定窗口算法测试开始");
        //模拟100组间隔30ms的50次请求
        CounterLimiter counterLimiter = new CounterLimiter(1000,limit);
        System.out.println("开始模拟100组间隔150ms的50次请求");
        faliCount = 0;
        for(int j = 0;j < 100;j ++){
            count = 0;
            for(int i = 0;i < 50;i ++){
                if(counterLimiter.tryAcquire()){
                    count ++;
                }
            }
            Thread.sleep(150);
            //模拟50次请求,看多少能通过
            for(int i = 0;i < 50;i ++){
                if(counterLimiter.tryAcquire()){
                    count ++;
                }
            }
            if(count > limit){
                System.out.println("时间窗口内放过的请求超过阈值,放过的请求数" + count + ",限流:" + limit);
                faliCount ++;
            }
            Thread.sleep((int)(Math.random() * 100));
        }
        System.out.println("计数器滑动窗口算法测试结束,100组间隔150ms的50次请求模拟完成,限流失败组数:" + faliCount);
    }
}


测试时,取滑动窗口大小为1000/10=100ms,然后模拟100组间隔150ms的50次请求,计数器滑动窗口算法与计数器固定窗口算法进行对别,可以看到如下结果:

计数器滑动窗口算法测试结果

固定窗口算法在窗口切换时产生了两倍于阈值流量请求的问题,而滑动窗口算法避免了这个问题。

特点分析

  1. 避免了计数器固定窗口算法固定窗口切换时可能会产生两倍于阈值流量请求的问题
  2. 和漏斗算法相比,新来的请求也能够被处理到,避免了漏斗算法的饥饿问题

对比:固定时间窗 & 滑动时间窗

img

固定时间窗

【设计原理】

  1. 统计固定时间周期内的请求量,超 过阈值则限流;

  2. 存在临界点问题,如图中的红蓝两 点对应的时间范围。

滑动时间窗

【设计原理】

  1. 统计滑动时间周期内的请求量,超 过阈值则限流;

  2. 判断比较准确,但实现稍微复杂。

漏斗算法

原理

漏斗算法的原理也很容易理解。请求来了之后会首先进到漏斗里,然后漏斗以恒定的速率将请求流出进行处理,从而起到平滑流量的作用。当请求的流量过大时,漏斗达到最大容量时会溢出,此时请求被丢弃。

从系统的角度来看,我们不知道什么时候会有请求来,也不知道请求会以多大的速率来,这就给系统的安全性埋下了隐患。

但是如果加了一层漏斗算法限流之后,就能够保证请求以恒定的速率流出。在系统看来,请求永远是以平滑的传输速率过来,从而起到了保护系统的作用。

漏斗算法原理图

代码实现及测试

package project.limiter;

import java.util.Date;
import java.util.LinkedList;

/**
* Project: AllForJava
* Title: 
* Description:
* Date: 2020-09-08 16:45
* Copyright: Copyright (c) 2020
*




* @version 1.0
**/
public class LeakyBucketLimiter {

    private int capaticy;//漏斗容量
    private int rate;//漏斗速率
    private int left;//剩余容量
    private LinkedList<Request> requestList;

    private LeakyBucketLimiter() {}

    public LeakyBucketLimiter(int capaticy, int rate) {
        this.capaticy = capaticy;
        this.rate = rate;
        this.left = capaticy;
        requestList = new LinkedList<>();

        //开启一个定时线程,以固定的速率将漏斗中的请求流出,进行处理
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    if(!requestList.isEmpty()){
                        Request request = requestList.removeFirst();
                        handleRequest(request);
                    }
                    try {
                        Thread.sleep(1000 / rate); //睡眠
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    /**
     * 处理请求
     * @param request
     */
    private void handleRequest(Request request){
        request.setHandleTime(new Date());
        System.out.println(request.getCode() + "号请求被处理,请求发起时间:"
                + request.getLaunchTime() + ",请求处理时间:" + request.getHandleTime() + ",处理耗时:"
                + (request.getHandleTime().getTime()  - request.getLaunchTime().getTime()) + "ms");
    }

    public synchronized boolean tryAcquire(Request request){
        if(left <= 0){
            return false;
        }else{
            left --;
            requestList.addLast(request);
            return true;
        }
    }


    /**
     * 请求类,属性包含编号字符串、请求达到时间和请求处理时间
     */
    static class Request{
        private int code;
        private Date launchTime;
        private Date handleTime;

        private Request() { }

        public Request(int code,Date launchTime) {
            this.launchTime = launchTime;
            this.code = code;
        }

        public int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public Date getLaunchTime() {
            return launchTime;
        }

        public void setLaunchTime(Date launchTime) {
            this.launchTime = launchTime;
        }

        public Date getHandleTime() {
            return handleTime;
        }

        public void setHandleTime(Date handleTime) {
            this.handleTime = handleTime;
        }
    }

    public static void main(String[] args) {
        LeakyBucketLimiter leakyBucketLimiter = new LeakyBucketLimiter(5,2);
        for(int i = 1;i <= 10;i ++){
            Request request = new Request(i,new Date());
            if(leakyBucketLimiter.tryAcquire(request)){
                System.out.println(i + "号请求被接受");
            }else{
                System.out.println(i + "号请求被拒绝");
            }
        }
    }
}


测试时,取漏斗限流算法的容量是5,漏斗速率为2个/秒,然后模拟了连续的10个请求,编号从1-10,结果如下:

漏斗算法测试结果

可以看到1-5号请求被接受,而6-10号请求被拒绝,说明此时漏斗已经溢出了,符合我们的预期。

我们再关注下被接受的这5个请求的处理情况,可以看到这5个请求虽然被接受了,但是处理是一个一个被处理的(不一定是顺序的,取决于具体实现),大约每500ms处理一个。这就体现了漏斗算法的特点了,即虽然请求流量是瞬时产生的,但是请求以固定速率流出被处理。因为我们设定的漏斗速率为2个/秒,所以每500ms漏斗会漏出一个请求然后进行处理。

img

请求放入“桶”(消息队列等),业务处理单元(线程/进程/ 服务)从桶里拿请求处理,桶满则丢弃新的请求。

特点分析

  1. 漏桶的漏出速率是固定的,可以起到整流的作用。即虽然请求的流量可能具有随机性,忽大忽小,但是经过漏斗算法之后,变成了有固定速率的稳定流量,从而对下游的系统起到保护作用。
  2. 不能解决流量突发的问题。还是拿刚刚测试的例子,我们设定的漏斗速率是2个/秒,然后突然来了10个请求,受限于漏斗的容量,只有5个请求被接受,另外5个被拒绝。你可能会说,漏斗速率是2个/秒,然后瞬间接受了5个请求,这不就解决了流量突发的问题吗?不,这5个请求只是被接受了,但是没有马上被处理,处理的速度仍然是我们设定的2个/秒,所以没有解决流量突发的问题。而接下来我们要谈的令牌桶算法能够在一定程度上解决流量突发的问题,读者可以对比一下。

技术本质

总量控制,桶大小是设计关键。

优缺点

  1. 桶大小动态调整比较困难,例如 Java BlockingQueue;

  2. 无法控制流出速度(处理速度);

  3. 突发流量时丢弃的请求较少。

应用场景

瞬时高并发流量,例如0点签到,整点秒杀。

漏桶算法变种 - 写缓冲(Buffer)

img

基本原理

如果漏桶的容量无限(例如用 Kafka 消息队列),则漏桶可以用来做写缓冲。

技术本质

同步改异步,缓冲所有请求,慢慢处理。

应用场景

高并发写入请求,例如热门微博评论。

令牌桶算法

原理

令牌桶算法是对漏斗算法的一种改进,除了能够起到限流的作用外,还允许一定程度的流量突发。

在令牌桶算法中,存在一个令牌桶,算法中存在一种机制以恒定的速率向令牌桶中放入令牌。令牌桶也有一定的容量,如果满了令牌就无法放进去了。当请求来时,会首先到令牌桶中去拿令牌,如果拿到了令牌,则该请求会被处理,并消耗掉拿到的令牌;如果令牌桶为空,则该请求会被丢弃。

令牌桶算法原理图

令牌桶其实和漏桶的原理类似,只不过漏桶是定速地流出,而令牌桶是定速地往桶里塞入令牌,然后请求只有拿到了令牌才能通过,之后再被服务器处理。

当然令牌桶的大小也是有限制的,假设桶里的令牌满了之后,定速生成的令牌会丢弃。

规则:

  • 定速的往桶内放入令牌
  • 令牌数量超过桶的限制,丢弃
  • 请求来了先向桶内索要令牌,索要成功则通过被处理,反之拒绝

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QmmV3awc-1645706310250)(https://segmentfault.com/img/remote/1460000023552190)]

看到这又想到啥?Semaphore 信号量啊,信号量可控制某个资源被同时访问的个数,其实和咱们拿令牌思想一样,一个是拿信号量,一个是拿令牌。只不过信号量用完了返还,而咱们令牌用了不归还,因为令牌会定时再填充。

代码实现及测试

package project.limiter;

import java.util.Date;

/**
* Project: AllForJava
* Title: 
* Description:
* Date: 2020-09-08 19:22
* Copyright: Copyright (c) 2020
* 




* @version 1.0
**/
public class TokenBucketLimiter {

    private int capaticy;//令牌桶容量
    private int rate;//令牌产生速率
    private int tokenAmount;//令牌数量

    public TokenBucketLimiter(int capaticy, int rate) {
        this.capaticy = capaticy;
        this.rate = rate;
        tokenAmount = capaticy;
        new Thread(new Runnable() {
            @Override
            public void run() {
                //以恒定速率放令牌
                while (true){
                    synchronized (this){
                        tokenAmount ++;
                        if(tokenAmount > capaticy){
                            tokenAmount = capaticy;
                        }
                    }
                    try {
                        Thread.sleep(1000 / rate);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    public synchronized boolean tryAcquire(Request request){
        if(tokenAmount > 0){
            tokenAmount --;
            handleRequest(request);
            return true;
        }else{
            return false;
        }

    }

    /**
     * 处理请求
     * @param request
     */
    private void handleRequest(Request request){
        request.setHandleTime(new Date());
        System.out.println(request.getCode() + "号请求被处理,请求发起时间:"
                + request.getLaunchTime() + ",请求处理时间:" + request.getHandleTime() + ",处理耗时:"
                + (request.getHandleTime().getTime()  - request.getLaunchTime().getTime()) + "ms");
    }

    /**
     * 请求类,属性只包含一个名字字符串
     */
    static class Request{
        private int code;
        private Date launchTime;
        private Date handleTime;

        private Request() { }

        public Request(int code,Date launchTime) {
            this.launchTime = launchTime;
            this.code = code;
        }

        public int getCode() {
            return code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public Date getLaunchTime() {
            return launchTime;
        }

        public void setLaunchTime(Date launchTime) {
            this.launchTime = launchTime;
        }

        public Date getHandleTime() {
            return handleTime;
        }

        public void setHandleTime(Date handleTime) {
            this.handleTime = handleTime;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        TokenBucketLimiter tokenBucketLimiter = new TokenBucketLimiter(5,2);
        for(int i = 1;i <= 10;i ++){
            Request request = new Request(i,new Date());
            if(tokenBucketLimiter.tryAcquire(request)){
                System.out.println(i + "号请求被接受");
            }else{
                System.out.println(i + "号请求被拒绝");
            }
        }
    }
}

测试时,为了与漏斗限流算法进行对别,同样取令牌桶算法的容量是5,产生令牌的速度为2个/秒,然后模拟了连续的10个请求,编号从1-10,结果如下:

令牌桶算法测试结果

可以看到,对于10个请求,令牌桶算法和漏斗算法一样,都是接受了5个请求,拒绝了5个请求。与漏斗算法不同的是,令牌桶算法马上处理了这5个请求,处理速度可以认为是5个/秒,超过了我们设定的2个/秒的速率,即允许一定程度的流量突发。这一点也是和漏斗算法的主要区别,可以认真体会一下。

img

基本原理

某个处理单元按照指定速率将令牌放入“桶”(消息队列等),业

务处理单元收到请求后需要获取令牌,获取不到就丢弃请求。

特点分析

令牌桶算法是对漏桶算法的一种改进,除了能够在限制调用的平均速率的同时还允许一定程度的流量突发。

技术本质

速率控制,令牌产生的速度是设计关键。

优缺点

优点
  1. 可以动态调整处理速度;
  2. 放过的流量比较均匀,有利于保护系统。
  3. 存量令牌能应对突发流量,很多时候,我们希望能放过脉冲流量。而对于持续的高流量,后面又能均匀地放过不超过限流值的请求数。
缺点
  1. 突发流量的时候可能丢弃很多请求;
  2. 实现相对复杂。

典型应用场景

  1. 控制访问第三方服务的速度,防止把下游压垮;

  2. 控制自己的处理速度,防止过载

令牌桶算法实现( Redis + Lua )

我们采用 Redis + Lua 脚本的方式来实现令牌桶算法,在 Redis 中使用 Lua 脚本有诸多好处,例如:

  • 减少网络开销:本来多次网络请求的操作,可以用一个请求完成,原先多次请求的逻辑放在 Redis 服务器上完成。使用脚本,减少了网络往返时延。
  • 原子操作:Redis 会将整个脚本作为一个整体执行,中间不会被其他进程或者进程的命令插入。
  • 复用:客户端发送的脚本会永久存储在 Redis 中,意味着其他客户端可以复用这一脚本而不需要使用代码完成同样的逻辑。
  • 复用:客户端发送的脚本会永久存储在 Redis 中,意味着其他客户端可以复用这一脚本而不需要使用代码完成同样的逻辑。

这其中最重要的方法就是原子操作。将 Redis 的多条命令写成一个 Lua 脚本,然后调用脚本执行操作,相当于只有一条执行脚本的命令,所以整个 Lua 脚本中的操作都是原子性的。

在 Redis 中使用 Lua 脚本主要涉及 Script LoadEvalEvalsha 三个命令:

Eval ${lua_script} 可以直接执行 Lua 脚本。

Script Load ${lua_script} 命令是将脚本载入 Redis,载入成功后会返回一个脚本的 sha1 值,一旦载入则永久存储在 Redis 中,后续可以通过 Evalsha ${sha1} 来直接调用此脚本。我们采用先 Load 脚本得到 Sha1 值,再调用这个 sha1 值来执行脚本的方式可以减少像eval ${lua_script} 命令这样每次都向 Redis 中发送一长串 Lua 脚本带来的网络开销。

使用 Redis 中的 Hash 数据结构来存储限流配置,每个 Hash 表的 Key 为限流的粒度,可以是接口 Uri、客户端 IP、应用 uuid 或者他们的组合形式。每个 Hash 表为一个令牌桶,Hash 表中包含如下字段:

  • last_time 最近一次请求的时间戳,毫秒级别。
  • curr_permits 当前桶内剩余令牌数量,单位为:个。
  • bucket_cap 桶的容量,即桶内可容纳最大令牌数量,代表限流时间周期内允许通过的最大请求数。
  • period 限流的时间周期,单位为:秒。
  • rate 令牌产生的速率,单位:个/秒,rate = bucket_cap / period

在上面的令牌桶算法描述中生产令牌的方式是按照一定的速率生产令牌并放入令牌桶中,这种方式需要一个线程不停地按照一定的速率生产令牌并更新相应的桶,如果被限流的接口(每个桶)令牌生产的速率都不一样,那么就需要开多个线程,很浪费资源。

为了提高系统的性能,减少限流层的资源消耗,我们将令牌的生产方式改为:每次请求进来时一次性生产上一次请求到本次请求这一段时间内的令牌。随意每次请求生成的令牌数就是 (curr_time -last_time) / 1000 * rate,注意:这里两次时间戳的差值单位是毫秒,而令牌产生速率的单位是 个/秒,所以要除以 1000,把时间戳的差值的单位也换算成秒。

令牌桶算法的实现逻辑为:

img

假如我们的限流策略是一分钟内最多能通过 600 个请求,那么相应的令牌产生速率为 600 / 60 = 10 (个/秒) 。那么当限流策略刚刚配置好这一时刻就有突发的 10 个请求进来,此时令牌桶内还没来的及生产令牌,所以请求拿不到令牌就会被拒绝,这显然不符合我们要求。

为了解决这一问题,我们在限流策略刚刚配置好后的第一个请求来临时将当前可用令牌的值设置为桶的最大容量 600,将最近一次请求时间设置为本次请求来临时一分钟后的时间戳,减去出本次请求需要的令牌后更新桶。这样,在这一分钟以内,有下一次请求进来时,从 Hash 表内取出配置计算当前时间就会小于最近一次请求的时间,随后计算生成的令牌就会是一个小于 0 的负数。所以在更新桶这一步,要根据生成的令牌是否为负数来决定是否更新最后一次请求时间的值。

用 Lua 脚本实现上述逻辑:

local key = KEYS[1] -- 要进行限流的Key,可以是 uri
local consume_permits = tonumber(ARGV[1]) -- 请求消耗的令牌数,每个请求消耗一个
local curr_time = tonumber(ARGV[2]) -- 当前时间

local limiter_info = redis.pcall("HMGET", key, "last_time", "curr_permits", "bucket_cap", "rate", "period")
if not limiter_info[3] then
    return -1
end
local last_time = tonumber(limiter_info[1]) or 0
local curr_permits = tonumber(limiter_info[2]) or 0
local bucket_cap = tonumber(limiter_info[3]) or 0
local rate = tonumber(limiter_info[4]) or 0
local period = tonumber(limiter_info[5]) or 0

local total_permits = bucket_cap
local is_update_time = true
if last_time > 0 then
    local new_permits = math.floor((curr_time-last_time)/1000 * rate)
    if new_permits <= 0 then
        new_permits = 0
        is_update_time = false
    end

    total_permits = new_permits + curr_permits
    if total_permits > bucket_cap then
        total_permits = bucket_cap
    end
else
    last_time = curr_time + period * 1000
end

local res = 1
if total_permits >= consume_permits then
    total_permits = total_permits - consume_permits
else
    res = 0
end

if is_update_time then
    redis.pcall("HMSET", key, "curr_permits", total_permits, "last_time", curr_time)
else
    redis.pcall("HSET", key, "curr_permits", total_permits)
end
return res

上述脚本在调用时接收三个参数,分别为:限流的 key、请求消耗的令牌数、 当前时间戳(毫秒级别)。

在我们的业务代码中,先调用 Redis 的 SCRIPT LOAD 命令将上述脚本 Load 到 Redis 中并将该命令返回的脚本 sha1 值保存。

在后续的请求进来时,调用 Redis 的EVALSHA 命令执行限流逻辑,根据返回值判断是否对本次请求触发限流行为。假如限流的 key 为每次请求的 uri,每次请求消耗 1 个令牌,那么执行 Evalsha 命令进行限流判断的具体操作为:EVALSHA ${sha1} 1 ${uri} 1 ${当前时间戳}(第一个数字 1 代表脚本可接收的参数中有 1 个 Key,第二个数字 1 代表本次请求消耗一个令牌);执行完这条命令后如果返回值是 1 代表桶中令牌够用,请求通过;如果返回值为 0 代表桶中令牌不够,触发限流;如果返回值为 -1 代表本次请求的 uri 未配置限流策略,可根据自己的实际业务场景判断是通过还是拒绝。

限流算法对比

我们对上述四种限流算法进行一下简单的总结。

计数器固定窗口算法实现简单,容易理解。和漏斗算法相比,新来的请求也能够被马上处理到。但是流量曲线可能不够平滑,有“突刺现象”,在窗口切换时可能会产生两倍于阈值流量的请求。

计数器滑动窗口算法作为计数器固定窗口算法的一种改进,有效解决了窗口切换时可能会产生两倍于阈值流量请求的问题

漏斗算法能够对流量起到整流的作用,让随机不稳定的流量以固定的速率流出,但是不能解决流量突发的问题。

令牌桶算法作为漏斗算法的一种改进,除了能够起到平滑流量的作用,还允许一定程度的流量突发。

以上四种限流算法都有自身的特点,具体使用时还是要结合自身的场景进行选取,没有最好的算法,只有最合适的算法

所以漏桶和令牌桶其实比较适合阻塞式限流场景,即没令牌我就等着,这就不会误杀了,而漏桶本就是等着。比较适合后台任务类的限流

而基于时间窗口的限流比较适合对时间敏感的场景,请求过不了您就快点儿告诉我。

比如令牌桶算法一般用于保护自身的系统,对调用者进行限流,保护自身的系统不被突发的流量打垮。如果自身的系统实际的处理能力强于配置的流量限制时,可以允许一定程度的流量突发,使得实际的处理速率高于配置的速率,充分利用系统资源。

漏斗算法一般用于保护第三方的系统,比如自身的系统需要调用第三方的接口,为了保护第三方的系统不被自身的调用打垮,便可以通过漏斗算法进行限流,保证自身的流量平稳的打到第三方的接口上。

算法是死的,而算法中的思想精髓才是值得我们学习的。实际的场景中完全可以灵活运用,还是那句话,没有最好的算法,只有最合适的算法

如何选择限流算法

您可以会根据您的实际情况选择限流算法,建议如下:

  • 当单机 QPS < 100 时,建议使用令牌桶算法。
  • 当单机 QPS > 100 时,可以选择时间窗口限流算法和令牌桶算法。
  • 若您不能容忍单个周期放过的请求数超过限流值时,请选择时间窗口限流算法。

单机限流和分布式限流

本质上单机限流和分布式限流的区别其实就在于 “阈值” 存放的位置。

单机限流就上面所说的算法直接在单台服务器上实现就好了,而往往我们的服务是集群部署的。因此需要多台机器协同提供限流功能。

像上述的计数器或者时间窗口的算法,可以将计数器存放至 Tair 或 Redis 等分布式 K-V 存储中。

例如滑动窗口的每个请求的时间记录可以利用 Redis 的 zset 存储,利用ZREMRANGEBYSCORE删除时间窗口之外的数据,再用 ZCARD计数。

像令牌桶也可以将令牌数量放到 Redis 中。

不过这样的方式等于每一个请求我们都需要去Redis判断一下能不能通过,在性能上有一定的损耗,所以有个优化点就是 「批量」。例如每次取令牌不是一个一取,而是取一批,不够了再去取一批。这样可以减少对 Redis 的请求。

不过要注意一点,批量获取会导致一定范围内的限流误差。比如你取了 10 个此时不用,等下一秒再用,那同一时刻集群机器总处理量可能会超过阈值。

其实「批量」这个优化点太常见了,不论是 MySQL 的批量刷盘,还是 Kafka 消息的批量发送还是分布式 ID 的高性能发号,都包含了「批量」的思想。

当然分布式限流还有一种思想是平分,假设之前单机限流 500,现在集群部署了 5 台,那就让每台继续限流 500 呗,即在总的入口做总的限流限制,然后每台机子再自己实现限流。

限流的难点

可以看到每个限流都有个阈值,这个阈值如何定是个难点。

定大了服务器可能顶不住,定小了就“误杀”了,没有资源利用最大化,对用户体验不好。

我能想到的就是限流上线之后先预估个大概的阈值,然后不执行真正的限流操作,而是采取日志记录方式,对日志进行分析查看限流的效果,然后调整阈值,推算出集群总的处理能力,和每台机子的处理能力(方便扩缩容)。

然后将线上的流量进行重放,测试真正的限流效果,最终阈值确定,然后上线。

我之前还看过一篇耗子叔的文章,讲述了在自动化伸缩的情况下,我们要动态地调整限流的阈值很难,于是基于TCP拥塞控制的思想,根据请求响应在一个时间段的响应时间P90或者P99值来确定此时服务器的健康状况,来进行动态限流。在他的Ease Gateway产品中实现了这套算法,有兴趣的同学可以自行搜索。

其实真实的业务场景很复杂,需要限流的条件和资源很多,每个资源限流要求还不一样。

限流组件

一般而言我们不需要自己实现限流算法来达到限流的目的,不管是接入层限流还是细粒度的接口限流其实都有现成的轮子使用,其实现也是用了上述我们所说的限流算法。

比如Google Guava 提供的限流工具类 RateLimiter,是基于令牌桶实现的,并且扩展了算法,支持预热功能。

阿里开源的限流框架Sentinel 中的匀速排队限流策略,就采用了漏桶算法。

Nginx 中的限流模块 limit_req_zone,采用了漏桶算法,还有 OpenResty 中的 resty.limit.req库等等。

参考链接:

https://juejin.cn/post/6870396751178629127

https://segmentfault.com/a/1190000023552181

https://www.infoq.cn/article/ipxnuqwu3lgwxc8j7tzw

https://help.aliyun.com/document_detail/149952.html