限流之滑动窗口计数器实现

限流之滑动窗口计数器实现

滑动窗口其实就是对固定窗口做了进一步的细分,将原先的粒度切的更细,相当于固定窗口计数器算法的升级版。

如果固定窗口的「固定周期」已经很小了,那么使用滑动窗口的意义也就没有了。

本文整理了滑动窗口计数器的伪代码、Redis、Go 的实现方法。

比如 1 分钟的固定窗口切分为 60 个 1 秒的滑动窗口。统计的时间范围随着时间的推移同步后移。

当滑动窗口的格子划分的越多,滑动窗口的滚动就越平滑,限流的统计就会越精确。

实现的不同

滑动窗口目前主要有两种实现思路:

  • 窗口滑动,请求统计以整个时间窗口范围进行,请求在每个小窗口中不均匀
  • 窗口滑动,请求平均分散在每个滑动的格子中

缺点

滑动窗口也无法解决短时间之内集中流量的冲击。例如每秒限制 1000 个请求,但是有可能存在前 5 毫秒的时候,阀值就被打满的情况。

计数器限流原理简单,实现比较容易,但是也有一个痛点问题就是它的突刺现象

10s 限制 1000 请求,到第 2s 时已达请求上限,那么在第 3-10s 内的请求将会持续拒绝,在服务器资源空闲的状态下会造成极大的浪费。

伪代码实现

1
2
3
4
5
6
7
8
9
10
11
12
全局数组 链表[]  counterList = new 链表[切分的滑动窗口数量];
// 有一个定时器,在每一次统计时间段起点需要变化的时候就将索引0位置的元素移除,并在末端追加一个新元素。

int sum = counterList.Sum();
if(sum > 限流阈值) {
return; //不继续处理请求。
}

int 当前索引 = 当前时间的秒数 % 切分的滑动窗口数量;
counterList[当前索引]++;

// do something...

Redis 实现

主要是利用 redis 的有序集合(zset)来实现,计算前一秒内已访问的次数主要使用 zcount 命令来计算在当前集合中指定区间分数内的成员数。

只有成员数没达到限流阈值才用 zadd 将当前请求加入 zset。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
local key = KEYS[1]
-- 一个时间窗口限制数量
local limitCount = tonumber(ARGV[1])
-- 获取当前时间,单位毫秒
local currentTime = tonumber(ARGV[2])
-- 当前时间戳的key值,唯一
local timeKey = ARGV[3]
-- 获取时间窗口范围,传参单位秒,默认窗口一秒
local timeRange
if ARGV[4] == nil then
timeRange = 1000
else
timeRange = tonumber(ARGV[4]) * 1000
end
-- 获取集合key过期时间,当key过期时会存在瞬时并发的情况,因此过期时间不能太短或者改用定时清除,传参单位秒,默认一小时
local expiration
if ARGV[5] == nil then
expiration = 3600
else
timeRange = tonumber(ARGV[5])
end
-- 前一秒内已访问的次数
local beforeCount = 0
local existKey = redis.call('exists', key)
if (existKey == 1) then
-- 计算前一秒访问次数
beforeCount = redis.call('zcount', key, currentTime - timeRange, currentTime)
end

local result = 0
if (limitCount > beforeCount) then
result = 1
-- 记录当前访问时间
redis.call('zadd', key, currentTime, timeKey)
end
-- 设置过期时间
redis.call('expire', key, expiration)
return result

参数:

  • KEYS[1]: 限流的 Key
  • ARGV[1]:一个时间窗口限制数量
  • ARGV[2]:当前时间,单位毫秒
  • ARGV[3]:时间戳的 key 值,唯一
  • ARGV[4]:时间窗口范围,传参单位秒,默认窗口一秒
  • ARGV[5]:限流的 Key 的超时时间,单位秒,默认 1 小时

Golang 实现

以下实现为每窗口平均计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"fmt"
"sync"
"time"
)

// SlidingWindowLimiter 滑动窗口计数器限流
type SlidingWindowLimiter struct {
Interval int64 // 总计数时间
LastTime time.Time // 上一个窗口时间
Lck *sync.Mutex // 锁
WinCount []int64 // 窗口中请求当前数量
TicketSize int64 // 窗口最大容量
TicketCount int64 // 窗口个数
CurIndex int64 // 目前使用的窗口下标
}

// NewSlidingWindowLimiter 初始化滑动窗口计数器限流
func NewSlidingWindowLimiter(interval int64, ticketCount int64, ticketSize int64) *SlidingWindowLimiter {
return &SlidingWindowLimiter{
Interval: interval,
LastTime: time.Now(),
TicketSize: ticketSize,
TicketCount: ticketCount,
WinCount: make([]int64, ticketSize, ticketSize),
CurIndex: 0,
Lck: new(sync.Mutex),
}
}

// slidingCounterLimit 滑动窗口计数器限流实现
func (r *SlidingWindowLimiter) slidingCounterLimit() bool {
r.Lck.Lock()
defer r.Lck.Unlock()
eachTicketTime := r.Interval / r.TicketCount
now := time.Now()
// 如果间隔时间超过一个窗口的时间 当前窗口置0 指向下一个窗口
if now.Unix()-r.LastTime.Unix() > eachTicketTime {
r.WinCount[r.CurIndex] = 0
r.CurIndex = (r.CurIndex + 1) % r.TicketCount
r.LastTime = now
}
fmt.Println("当前窗口:", r.CurIndex)
// 当前窗口未满则正常计数
if r.WinCount[r.CurIndex] < r.TicketSize {
r.WinCount[r.CurIndex]++
return true
}
return false
}

func main() {
// 定义1秒10个时间窗口 每个窗口大小为1 即1秒10个请求
r := NewSlidingWindowLimiter(1, 10, 1)
for i := 0; i < 20; i++ {
ok := r.slidingCounterLimit()
if ok {
fmt.Println("pass ", i)
} else {
fmt.Println("limit ", i)
}
time.Sleep(100 * time.Millisecond)
}
}

引用

  1. 分布式限流:基于 Redis 实现
  2. 分布式系统关注点——限流该怎么做?
  3. 分布式系统限流算法分析与实现
  4. redis+lua实现滑动窗口限流
作者

Jakes Lee

发布于

2021-10-20

更新于

2021-11-24

许可协议

评论