zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

图解 LeakyBucket限流器的实现原理

2023-02-26 09:48:43 时间

leaky bucket 也叫漏桶,就是将请求先放到一个桶中进行排队,然后按固定的速率来处理请求,即所谓的漏出。

桶具有一定的容量,即最多能容纳多少个请求排队,当桶满的时候,再进来的请求就直接过滤掉,不再被处理。

算法的实现有很多种,本文要介绍的算法是基于计数的原理实现的。计数原理的本质就是基于一个始终记录桶中最后一个请求被处理时间的字段、当前时间和速率来计算当前请求能够被处理的时间。如下图所示:

该算法主要的属性字段有以下三个:

  • 处理请求的速率 rate。该值代表多久处理一个请求。实际上就是指处理完该请求后,要等待多久才能处理下一个请求。比如我们初始化时指定服务每 100ms 处理一个请求,也就是每处理 1 个请求,需要等待 100ms 才能处理下一个请求。
  • 桶的最大容量 capacity。该值代表我们最多允许多少个请求排队,超过该值,就直接返回,不用等待了。这个在生活中有很多类似场景:有一次我们去公园排队坐游船,排了很长的队伍。管理员过来告诉我们,只有前 20 个人能排上号,20 号后面的就可以不用排了。
  • 桶中最后一个排队请求被处理的时间 last。该值有两个作用:
    • 第一个作用是当有新请求进来的时候,就可以计算出新请求需要被处理的时间:last+rate
    • 第二个作用是根据 last、当前时间 t 以及速率 rate 计算当前还有多少个请求等待被处理: golang waitRequests = (last - t) / rate

根据上面几个关键属性字段,我们就可以对 LeakBucket 定义了,如下:

type LeakyBucket struct {
  rate int64 //处理请求的速率
  capacity int64 //桶的最大容量
  last time.Time //桶中最后一个排队请求被处理的时间
}
然后,LeakyBucket 还有一个函数 Limit 函数:

func (t *LeakyBucket) Limit() (time.Duration, error) {

}


该函数的主要作用就是计算流入请求能够被处理的等待时间。针对该函数有以下两点说明:
  • 接收到的每个请求都需要调用该函数,每个调用一次就相当于有一个请求流入桶中。
  • 该函数返回值代表调用者要处理该请求需要等待的时长,调用者需要进行 time.Sleep 这么长时间才能进行处理,也就是通过 Sleep 控制了消耗的速度。

因此,我们可知请求流入和流出的流程如下:

如何计算请求被处理的时间

假设现在 LeakyBucket 是一个空桶,按 100ms 处理一个请求的速率漏出,容量大小为 5。现在同时有 5 个请求流入桶中,我们看看每个请求经过 Limit 是如何计算各自的预计处理时间以及等待时间的。

  • 第一个请求进来,不用等待,直接就会被处理。处理的时间就是 last=当前时间 t
  • 第二个请求,因为第一个请求刚刚排队并被处理了,那么就需要按处理速率等待,那么被处理的时间就是第一个请求被处理的时间 +rate,即 last=t+100ms
  • 第三个请求,因为第二个请求仍然在排队,所有应该在处理了第二个后,再等待 100ms 才能被处理,即 last=第二个的处理时间 +100ms=t+300ms
  • 第四、五个请求依次类推,如下图:

我们将上图转换在时间轴上看着会更直观一些:

以上情况是我们假设在空桶的状态下,同时流入了 n 个请求,从第 2 个请求开始,处理时间都在前一个请求的处理时间上 +rate。

但如果新的请求流入是在最后一个请求流入后的 50ms 流入的,即 last+50ms 的时间点。那么新流入的请求被处理的时间就不应该是 last+rate 了,这样该请求的处理时间距离上次处理请求的时间就是:

(last+rate) - (last+50ms)= rate+50ms

这样计算的话就比 rate 多了 50ms。那该如何计算呢? 实际的被处理时间应该是先计算当前时间距离上次被处理时间的间隔,然后再跟 rate 进行比较,看看比 rate 差多少,然后补全该差值即可,即:

delta = 当前时间t - last

last = 当前时间t + (rate - delta)

如下图所示:

如何计算桶满了

当新请求流入后,发现桶已经满了,就不再排队了,而是直接被丢弃掉。那如何计算桶是否已经满了呢?其实就是根据当前时间 now,和桶中排队的最后一个请求被处理的时间 last 之间的差值,再除以速率 rate,就能计算出正在等待被处理的请求有多少个了,然后和 capacity 进行比较即可:

wait =(last - now) / rate
if wait > capacity {
  // 表示桶已经满了
} else {
  // 未满,还可以继续排队
}

这里桶的容量实际上是代表业务能够接受请求被处理的最大等待时间。比如一个 web 用户访问一个页面,最多愿意等 10 秒,那如果请求等待处理的时间是 11 秒,那即使等到被处理了,用户也已经流失了,也就失去了等待的意义。

源码实现

好了,几个关键点介绍完了,下面我们直接贴上部分实现的代码,完整的代码可参考https://github.com/uber-go/ratelimit 和https://github.com/mennanov/limiters/blob/master/leakybucket.go

type LeakyBucket struct {
    rate int64 //处理请求的速率
    capacity int64 //桶的最大容量
    last time.Time //桶中最后一个排队请求被处理的时间
    mu sync.Mutex
}

func (t *LeakyBucket) Limit(ctx context.Context) (time.Duration, error) {
    //这里进行加锁,保证每个请求按顺序依次处理
    t.mu.Lock()
    defer t.mu.Unlock()

    now := time.Now().UnixNano() //当前时间的纳秒数
    if now < t.last {
        // 说明已经有请求在排队了,那么新请求进来排队后被处理的时间就是rate后
        t.last += t.rate
    } else {
        // 说明为桶为空,也许是初始状态,也许是所有的请求都被处理完了.

        var offset int64 //代表等待处理该请求的时间需要等待多久
        delta := now - state.Last // 代表当前时间距离上次处理请求的时间过了多久
        if delta < t.rate {
            //说明还没有到下次处理请求的时间,则需要等待offset后才能到
            offset = t.rate - delta
        }
        //如果delta > t.rate 说明当前时间距离上次处理请求的时间已经超过了rate,offset为0,新的请求就应该被马上处理
        t.last = now + offset //更新该请求应该被处理的时间
    }

    wait := t.last - now //计算桶是否已经满了
    if wait/t.rate > t.capacity {
      //桶满了,返回error,调用者根据需要是直接丢弃还是等待wait长的时间。一般是直接丢弃。

      t.last = now - offset //因为这里要丢弃该请求,所有要保持新请求排队前的状态
        return time.Duration(wait), ErrLimitExhausted
    }

    //排队成功,返回要等待的时间给调用者,让调用者sleep进行阻塞就能实现按rate的速率处理请求了
    return time.Duration(wait), nil
}
总结

LeakyBucket 的核心思想就是按固定的速率处理请求,所以不支持突增的流量。因为即使有再多的流量,也是按固定的速率被处理。他与TokenBucket的区别是TokenBucket是按固定速率产生Token,请求进来的时候只要有Token就能立即被处理,不用等待。只有在无Token时才会等待。


欢迎关注「Go学堂」,让学习成为一种习惯