限流之令牌桶算法实现

令牌桶算法示意

和漏桶算法算法类似,令牌桶算法的核心是固定“进口”速率,可以应对突发流量,是非常非常常用的算法。

  • 桶里装的是令牌
  • 在被处理之前需要拿到一个令牌,请求处理完毕之后将这个令牌丢弃(删除)
  • 拿不到令牌就被「流量干预」
  • 根据限流大小,按照一定的速率往桶里添加令牌

通常在实现中,令牌的增加都是基于时间的延迟计算和预消费的思想。

缺点

如果在流量突增时有大量累积可用的令牌则会导致短暂的限流失效假象。

实际项目中可以基于当前服务能力动态调整令牌的发放速率,此外我们还需要为桶设置大小上限(或者为令牌设置生命周期),以防止大量令牌累积导致的“伪限流失效”现象。

伪代码实现

  • 控制令牌生成的速率,并放入桶中。单独一个线程在不断的生成令牌。
  • 控制桶中待领取的令牌水位不超过最大水位。这个和「漏桶」一样,就是一个全局计数器,进行加加减减。

通常在实现中并不会使用该方法,这里只是提供一个简单的实现思路。

1
2
3
4
5
6
7
8
9
10
11
if(tokenCount == 0){
return; //不继续处理请求。
}

tokenCount--;

// do something...

// 另外线程
if(tokenCount < maxCount)
tokenCount++;

Redis 实现

这里令牌桶算法的实现用的也是 Guava 项目中 RateLimiter 所使用的算法,同样采用延迟计算、提前消费的方式,将时间和 Token 数目联系起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
-- key
local key = KEYS[1]
-- 最大存储的令牌数
local max_permits = tonumber(KEYS[2])
-- 每秒钟产生的令牌数
local permits_per_second = tonumber(KEYS[3])
-- 请求的令牌数
local required_permits = tonumber(ARGV[1])

-- 下次请求可以获取令牌的起始时间
local next_free_ticket_micros = tonumber(redis.call('hget', key, 'next_free_ticket_micros') or 0)

-- 当前时间
local time = redis.call('time')
-- time[1] 返回的为秒,time[2] 为 ms
local now_micros = tonumber(time[1]) * 1000000 + tonumber(time[2])

-- 查询获取令牌是否超时(传入参数,单位为 微秒)
if (ARGV[2] ~= nil) then
-- 获取令牌的超时时间
local timeout_micros = tonumber(ARGV[2])
local micros_to_wait = next_free_ticket_micros - now_micros
if (micros_to_wait> timeout_micros) then
return micros_to_wait
end
end

-- 当前存储的令牌数
local stored_permits = tonumber(redis.call('hget', key, 'stored_permits') or 0)
-- 添加令牌的时间间隔(1000000ms 为 1s)
-- 计算生产 1 个令牌需要多少微秒
local stable_interval_micros = 1000000 / permits_per_second

-- 补充令牌,如果当前时间超过上次预支令牌的时间,则可补充令牌
if (now_micros> next_free_ticket_micros) then
-- 根据时间差,算出这个时间已经产生多少个令牌
local new_permits = (now_micros - next_free_ticket_micros) / stable_interval_micros

-- 已有令牌 + 这个时间新产生令牌
stored_permits = math.min(max_permits, stored_permits + new_permits)

-- 补充后,更新下次可以获取令牌的时间为当前时间(已生成令牌的截止时间)
next_free_ticket_micros = now_micros
end

-- 消耗令牌
local moment_available = next_free_ticket_micros

-- 两种情况:required_permits<=stored_permits 或者 required_permits>stored_permits

-- 计算本次可以消耗的量
local stored_permits_to_spend = math.min(required_permits, stored_permits)

-- 需求量 - 本次可以消耗的量 = 还差的量
local fresh_permits = required_permits - stored_permits_to_spend;

-- 如果 fresh_permits>0,说明令牌桶的剩余数目不够了,需要等待一段时间
local wait_micros = fresh_permits * stable_interval_micros

-- Redis 提供了 redis.replicate_commands() 函数来实现这一功能,把发生数据变更的命令以事务的方式做持久化和主从复制,从而允许在 Lua 脚本内进行随机写入
redis.replicate_commands()

-- 存储剩余的令牌数:桶中剩余的数目 - 本次申请的数目
redis.call('hset', key, 'stored_permits', stored_permits - stored_permits_to_spend)

redis.call('hset', key, 'next_free_ticket_micros', next_free_ticket_micros + wait_micros)

redis.call('expire', key, 10)

-- 返回需要等待的时间长度
-- 返回为 0(moment_available==now_micros)表示桶中剩余的令牌足够,不需要等待
return moment_available - now_micros

上述代码传入参数为:

  • key:限流 key
  • max_permits:最大存储的令牌数
  • permits_per_second:每秒钟产生的令牌数
  • required_permits:请求的令牌数
  • timeout_micros:获取令牌的超时时间(非必须)

变量说明:

  • next_free_ticket_micros:下次请求可以获取令牌的起始时间(已产生令牌的截止时间)
    • 每秒都会产生一定数量的令牌
    • 该值是上次请求令牌时,预消费后需要等待的时间
    • 当前时间超过该时间才能产生新的令牌
    • 如果本次请求令牌数超过已产生令牌,则需要预消费一定的令牌数
    • 产生预消费的令牌数所需要的时间,累加回写到 next_free_ticket_micros 上,让下次请求进行等待偿还

返回值:

  • moment_available - now_micros:本次请求需要等待的时间
    • moment_available:是上次申请令牌时预消费让本次请求需要等待到的时间,根据当前时间判断
      • 如果当前时间大于 next_free_ticket_micros,则该值为当前时间 now_micros当前时间已经超过要等待时间,本次无需等待
      • 如果当前时间小于 next_free_ticket_micros,则值为 next_free_ticket_micros,还没到达能产生新令牌的时间,向下次预支,同时需要等一会,偿还上次预支的时间(上次预支到 next_free_ticket_micros
    • now_micros:是当前时间
    • 如果需要预支令牌,预支的时间体现在下次请求的 next_free_ticket_micros当前返回值不体现

运行流程如下:

令牌桶运行流程

预消费

算法的思想主要还是:本次请求需要为上次请求的预消费行为埋单

  • 当请求的令牌数大于已产生令牌数时,需要预消费下一次的令牌
  • 由于每秒都产生一定数量的令牌,由此可以算出预支的令牌需要等待的时间
  • 当方法执行完成时 next_free_ticket_micros 存储的是本次预消费掉令牌使下次获取令牌时需要等待到的时间(本次预支,下次等待
  • 由于上次请求令牌预支了令牌,本次请求令牌结束后需要等待一定时间(上次预支,本次等待

Golang 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package main

import (
"fmt"
"math"
"sync"
"time"
)

// TokenBucket 定义令牌桶结构
type TokenBucket struct {
LastTime time.Time // 当前请求时间
Capacity float64 // 桶的容量(存放令牌的最大量)
Rate float64 // 令牌放入速度
Tokens float64 // 当前令牌总量
Lck *sync.Mutex
}

// NewTokenBucket 初始化TokenBucket
func NewTokenBucket(rate int, cap int) *TokenBucket {
return &TokenBucket{
LastTime: time.Now(),
Capacity: float64(cap),
Rate: float64(rate),
Tokens: float64(cap),
Lck: new(sync.Mutex),
}
}

// getToken 判断是否获取令牌(若能获取,则处理请求)
func (r *TokenBucket) getToken() bool {
now := time.Now()
r.Lck.Lock()
defer r.Lck.Unlock()
// 先添加令牌
tokens := math.Min(r.Capacity, r.Tokens+now.Sub(r.LastTime).Seconds()*r.Rate)
r.Tokens = tokens
if tokens < 1 {
// 若桶中一个令牌都没有了,则拒绝
return false
} else {
// 桶中还有令牌,领取令牌
r.Tokens -= 1
r.LastTime = now
return true
}
}

func main() {
// 初始化 限制每秒2个请求 令牌桶容量为5
r := NewTokenBucket(2, 5)
for i := 0; i < 20; i++ {
ok := r.getToken()
if ok {
fmt.Println("pass ", i)
} else {
fmt.Println("limit ", i)
}
time.Sleep(100 * time.Millisecond)
}
}

Java 实现

Java 实现可以参考 Guava 包中的 RateLimiter,该类中实现基于延迟计算和预消费。上面 Redis Lua 其实也是基于 RateLimiter 的算法实现的。

以下主要看一下核心关键代码,主体代码可以参考上面 Redis 的实现。

1
2
3
4
5
6
7
8
9
10
11
/**
* Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
*/
void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}

如上 resync 函数,该函数会在每次获取令牌之前调用。其实现思路为:

  1. 若当前时间晚于 nextFreeTicketMicros ,则计算该段时间内可以生成多少令牌
  2. 计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros);
long returnValue = nextFreeTicketMicros; // 返回的是上次计算的nextFreeTicketMicros
double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 可以消费的令牌数
double freshPermits = requiredPermits - storedPermitsToSpend; // 还需要的令牌数
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros); // 根据freshPermits计算需要等待的时间

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); // 本次计算的nextFreeTicketMicros不返回
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}

该函数用于获取 requiredPermits 个令牌,并返回需要等待到的时间点

  1. storedPermitsToSpend 为桶中可以消费的令牌数
  2. freshPermits 为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到 nextFreeTicketMicros

该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros,而不是本次更新的 nextFreeTicketMicros

本次请求需要为上次请求的预消费行为埋单:这次还需要的令牌数 freshPermits 被累加到下次 nextFreeTicketMicros 中了。下次再获取新的令牌时需要多等一会,为上一次埋单。

引用

  1. 分布式限流:基于 Redis 实现
  2. 分布式系统关注点——限流该怎么做?
  3. 限流技术中的常用算法及其优缺点
  4. 分布式系统限流算法分析与实现
  5. Guava RateLimiter 源码解析
  6. 接口限流算法:漏桶算法&令牌桶算法
作者

Jakes Lee

发布于

2021-11-20

更新于

2021-11-24

许可协议

评论