写在前面
前段时间做了一些限流的事情,现在总结一下各种主流的限流算法,并且用go语言实现一遍。代码都在github上:https://github.com/CocaineCong/BiliBili-Code
固定窗口
固定窗口限流算法,我们也称为计数器算法。这个算法很简单,在一段时间内只能接受特定数量的请求。 比如 1s 内只能接受100个请求,第101个开始就进行抛弃。为了并发安全,我们要使用原子操作,核型代码逻辑如下:
func (f *FixedWindowCounter) Allow() bool {
f.mutex.Lock()
defer f.mutex.Unlock()
now := time.Now()
// 如果超过这段时间范围,重置计数器
if now.Sub(f.lastTime) >= f.window {
f.counter = 0
f.lastTime = now
}
// 检查是否超过限制
if f.counter >= f.limit {
return false
}
f.counter++
return true
}
所以我们可以看到这个算法比较明显的缺点:在临界点上,如果突然增加了流量,会导致请求到下游的流量翻倍。 举个例子:
-
0:59 这时请求100个 -
到 1:00 进行重置计数器 -
到 1:01 又请求了100个 -
那么就会有短时间翻倍的请求打到下游
所以我们引入滑动窗口,来规避这种临界问题。
滑动窗口
其实早在计算机网络的TCP中,我们就深入理解过TCP的流量控制方法,滑动窗口。原理也不难,滑动 说明:
-
这个窗口 有别了上面的固定窗口,在时间轴上的位置不固定,随着时间滑动 -
这个窗口 可能固定,也可能随时间变大变小,也可能随请求量变大变小
-
将时间轴变成多段时间,比如1s一段。 -
再定义一个时间窗口,比如 0.5s 作为一个窗口。 -
当时间窗口移动时,需要把 上一个时间片段中的请求数减掉。比如本来窗口有100个,过来0.1,0.2s后将清除0.1和0.2的50个请求。 -
当有新的请求进来的时候,检查窗口内的计数是否已满。 -
如果计数未满,请求被允许执行,并增加当前窗口的请求数。 -
如果计数已满,请求被拒绝或进入等待队列。
核心代码逻辑:
func (s *SlidingWindowCounter) Allow() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
now := time.Now()
currentWindow := now.Truncate(s.precision).Unix()
s.cleanExpiredWindows(now) // 清理过期的窗口数据
totalRequests := s.countRequestsInWindow(now) // 计算当前窗口内的总请求数
if totalRequests >= s.limit { // 检查是否超过限制
return false
}
s.requests[currentWindow]++ // 增加当前窗口的计数
return true
}
大家看到滑动窗口可以很平滑地控制流量,至少可以应对突发的峰值流量。 但如果这个窗口如果一开始就被打满了,那么剩下的时间都不可访问了,这似乎有点不讲道理。然后我们就引入了漏桶算法。
漏桶算法
根据常识我们知道,如果把水放入一个固定容量的桶并且桶底有一个固定大小的洞时,水会以固定速率从洞中流出。如果水的流入速度超过了洞口的流出速率,那么多余的水会溢出被丢弃。
如果大家把上面这张图逆时针旋转90度来看,是不是很像我们的MQ消息队列。雨表示消息队列的大量生产者,桶表示消息队列的存储,水滴表示消息队列的消费者。
所以漏桶算法也具备削峰填谷的作用,经过漏桶后请求就能匀速平滑的流出。
-
启动一个协程定时漏水 -
如果桶满了,不再给予请求通过,如果没满,则放入桶中等待处理请求 -
获取桶中的水滴后,对桶内的容量进行扣除,以让下一个请求进行处理
核心代码逻辑如下:
-
start 启动漏桶的定时漏水协程
func (lb *LeakyBucket) start() {
lb.mutex.Lock()
if lb.isRunning {
lb.mutex.Unlock()
return
}
lb.isRunning = true
lb.mutex.Unlock()
gofunc() {
ticker := time.NewTicker(lb.leakRate)
defer ticker.Stop()
for {
select {
case <-ticker.C:
lb.leak()
case <-lb.stopCh:
return
}
}
}()
}

-
Allow 尝试向桶中添加一个请求,如果桶未满,返回 true;如果桶满了,返回 false
func (lb *LeakyBucket) Allow() bool {
lb.mutex.Lock()
defer lb.mutex.Unlock()
if lb.tokens < lb.capacity {
lb.tokens++
return true
}
return false
}
-
leak 执行漏桶操作,让请求处理完后进行漏桶。
func (lb *LeakyBucket) leak() {
lb.mutex.Lock()
defer lb.mutex.Unlock()
if lb.tokens > 0 {
lb.tokens--
lb.lastLeak = time.Now()
}
}
虽然漏桶算法可以顺滑处理请求,但是无法应对突发流量的来袭,并且处理请求会有延迟。
那什么情况会用漏桶算法呢?我们上面说了,漏桶算法很像消息队列,我们可以想一下我们什么场景下会用消息队列,是不是为了应付一些高流量的场景,避免高流量把下游打满,这是不是对下游的一种保护。所以其实漏桶算法就是为了保护下游,避免下游打挂,下游不能承受过重的QPS,只能一点一点地喂给下游
我们消息队列是为了那些不是很实时的业务,所做的高时延的动作。但对于http请求我们必须要求低时延,不能说我一个请求要等很久,并且漏桶算法也没法处理突发流量,所以就引入了令牌桶算法。
令牌桶算法
-
令牌桶也是一个桶,但会往桶里添加令牌,填满了就丢弃令牌。 -
请求是否被处理要看桶中令牌是否足够, 桶中有多少令牌就处理多少请求。 -
如果令牌为0那么拒绝处理请求。
我们可以看到令牌桶是可以处理突发流量,无论桶里的令牌够不够,都能马上给出响应。令牌够,请求,令牌不够,请求够的,不够的直接拒绝,干脆利索。
核心代码逻辑:
-
start 启动令牌补充协程
func (tb *TokenBucket) start() {
tb.mutex.Lock()
if tb.isRunning {
tb.mutex.Unlock()
return
}
tb.isRunning = true
tb.mutex.Unlock()
gofunc() {
ticker := time.NewTicker(tb.refillPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
tb.refill()
case <-tb.stopCh:
return
}
}
}()
}
-
函数 AllowN尝试获取 n 个令牌
func (tb *TokenBucket) AllowN(n int) bool {
tb.mutex.Lock()
defer tb.mutex.Unlock()
if tb.tokens >= n {
tb.tokens -= n
return true
}
return false
}
与漏桶算法对比,其实令牌桶算法可以应对一些突发的流量,并且支持低时延的响应。