和漏桶算法算法类似,令牌桶算法的核心是固定“进口”速率,可以应对突发流量 ,是非常非常常用的算法。
桶里装的是令牌
在被处理之前需要拿到一个令牌,请求处理完毕之后将这个令牌丢弃(删除)
拿不到令牌就被「流量干预」
根据限流大小,按照一定的速率往桶里添加令牌
通常在实现中,令牌的增加都是基于时间的延迟计算和预消费的思想。
缺点 如果在流量突增时有大量累积可用的令牌则会导致短暂的限流失效假象。
实际项目中可以基于当前服务能力动态调整令牌的发放速率,此外我们还需要为桶设置大小上限(或者为令牌设置生命周期),以防止大量令牌累积导致的“伪限流失效”现象。
伪代码实现
控制令牌生成的速率,并放入桶中。单独一个线程在不断的生成令牌。
控制桶中待领取的令牌水位不超过最大水位。这个和「漏桶」一样,就是一个全局计数器,进行加加减减。
通常在实现中并不会使用该方法,这里只是提供一个简单的实现思路。
1 2 3 4 5 6 7 8 9 10 11 if(tokenCount == 0){ return; //不继续处理请求。 } tokenCount--; // do something... // 另外线程 if(tokenCount < maxCount) tokenCount++;
Redis 实现 这里令牌桶算法的实现用的也是 Guava 项目中 RateLimiter 所使用的算法,同样采用延迟计算、提前消费的方式,将时间和 Token 数目联系起来:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 local key = KEYS[1 ]local max_permits = tonumber (KEYS[2 ])local permits_per_second = tonumber (KEYS[3 ])local required_permits = tonumber (ARGV[1 ])local next_free_ticket_micros = tonumber (redis.call('hget' , key, 'next_free_ticket_micros' ) or 0 )local time = redis.call('time' )local now_micros = tonumber (time [1 ]) * 1000000 + tonumber (time [2 ])if (ARGV[2 ] ~= nil ) then local timeout_micros = tonumber (ARGV[2 ]) local micros_to_wait = next_free_ticket_micros - now_micros if (micros_to_wait> timeout_micros) then return micros_to_wait end end local stored_permits = tonumber (redis.call('hget' , key, 'stored_permits' ) or 0 )local stable_interval_micros = 1000000 / permits_per_secondif (now_micros> next_free_ticket_micros) then local new_permits = (now_micros - next_free_ticket_micros) / stable_interval_micros stored_permits = math .min (max_permits, stored_permits + new_permits) next_free_ticket_micros = now_micros end local moment_available = next_free_ticket_microslocal stored_permits_to_spend = math .min (required_permits, stored_permits)local fresh_permits = required_permits - stored_permits_to_spend;local wait_micros = fresh_permits * stable_interval_microsredis.replicate_commands() redis.call('hset' , key, 'stored_permits' , stored_permits - stored_permits_to_spend) redis.call('hset' , key, 'next_free_ticket_micros' , next_free_ticket_micros + wait_micros) redis.call('expire' , key, 10 ) return moment_available - now_micros
上述代码传入参数为:
key
:限流 key
max_permits
:最大存储的令牌数
permits_per_second
:每秒钟产生的令牌数
required_permits
:请求的令牌数
timeout_micros
:获取令牌的超时时间(非必须)
变量说明:
next_free_ticket_micros
:下次请求可以获取令牌的起始时间(已产生令牌的截止时间)
每秒都会产生一定数量的令牌
该值是上次请求令牌时,预消费后需要等待的时间
当前时间超过该时间才能产生新的令牌
如果本次请求令牌数超过已产生令牌,则需要预消费一定的令牌数
产生预消费的令牌数所需要的时间,累加回写到 next_free_ticket_micros
上,让下次请求进行等待偿还
返回值:
moment_available - now_micros
:本次请求需要等待的时间
moment_available
:是上次申请令牌时预消费让本次请求需要等待到的时间,根据当前时间判断
如果当前时间大于 next_free_ticket_micros
,则该值为当前时间 now_micros
,当前时间已经超过要等待时间,本次无需等待
如果当前时间小于 next_free_ticket_micros
,则值为 next_free_ticket_micros
,还没到达能产生新令牌的时间,向下次预支,同时需要等一会,偿还上次预支的时间(上次预支到 next_free_ticket_micros
)
now_micros
:是当前时间
如果需要预支令牌,预支的时间体现在下次请求的 next_free_ticket_micros
,当前返回值不体现
运行流程如下:
预消费 算法的思想主要还是:本次请求需要为上次请求的预消费行为埋单 。
当请求的令牌数大于已产生令牌数时,需要预消费下一次的令牌
由于每秒都产生一定数量的令牌,由此可以算出预支的令牌需要等待的时间
当方法执行完成时 next_free_ticket_micros
存储的是本次预消费掉令牌使下次获取令牌时需要等待到的时间(本次预支,下次等待 )
由于上次请求令牌预支了令牌,本次请求令牌结束后需要等待一定时间(上次预支,本次等待 )
Golang 实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 package main import ( "fmt" "math" "sync" "time" ) type TokenBucket struct { LastTime time.Time Capacity float64 Rate float64 Tokens float64 Lck *sync.Mutex } func NewTokenBucket (rate int , cap int ) *TokenBucket { return &TokenBucket{ LastTime: time.Now(), Capacity: float64 (cap ), Rate: float64 (rate), Tokens: float64 (cap ), Lck: new (sync.Mutex), } } func (r *TokenBucket) getToken() bool { now := time.Now() r.Lck.Lock() defer r.Lck.Unlock() tokens := math.Min(r.Capacity, r.Tokens+now.Sub(r.LastTime).Seconds()*r.Rate) r.Tokens = tokens if tokens < 1 { return false } else { r.Tokens -= 1 r.LastTime = now return true } } func main () { r := NewTokenBucket(2 , 5 ) for i := 0 ; i < 20 ; i++ { ok := r.getToken() if ok { fmt.Println("pass " , i) } else { fmt.Println("limit " , i) } time.Sleep(100 * time.Millisecond) } }
Java 实现 Java 实现可以参考 Guava 包中的 RateLimiter
,该类中实现基于延迟计算和预消费。上面 Redis Lua 其实也是基于 RateLimiter
的算法实现的。
以下主要看一下核心关键代码,主体代码可以参考上面 Redis 的实现。
1 2 3 4 5 6 7 8 9 10 11 void resync (long nowMicros) { if (nowMicros > nextFreeTicketMicros) { double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); storedPermits = min(maxPermits, storedPermits + newPermits); nextFreeTicketMicros = nowMicros; } }
如上 resync
函数,该函数会在每次获取令牌之前调用。其实现思路为:
若当前时间晚于 nextFreeTicketMicros
,则计算该段时间内可以生成多少令牌
计算该段时间内可以生成多少令牌,将生成的令牌加入令牌桶中并更新数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 final long reserveEarliestAvailable (int requiredPermits, long nowMicros) { resync(nowMicros); long returnValue = nextFreeTicketMicros; double storedPermitsToSpend = min(requiredPermits, this .storedPermits); double freshPermits = requiredPermits - storedPermitsToSpend; long waitMicros = storedPermitsToWaitTime(this .storedPermits, storedPermitsToSpend) + (long ) (freshPermits * stableIntervalMicros); this .nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); this .storedPermits -= storedPermitsToSpend; return returnValue; }
该函数用于获取 requiredPermits
个令牌,并返回需要等待到的时间点
storedPermitsToSpend
为桶中可以消费的令牌数
freshPermits
为还需要的(需要补充的)令牌数,根据该值计算需要等待的时间,追加并更新到 nextFreeTicketMicros
该函数的返回是更新前的(上次请求计算的)nextFreeTicketMicros
,而不是本次更新的 nextFreeTicketMicros
。
本次请求需要为上次请求的预消费行为埋单 :这次还需要的令牌数 freshPermits
被累加到下次 nextFreeTicketMicros
中了。下次再获取新的令牌时需要多等一会,为上一次埋单。
引用
分布式限流:基于 Redis 实现
分布式系统关注点——限流该怎么做?
限流技术中的常用算法及其优缺点
分布式系统限流算法分析与实现
Guava RateLimiter 源码解析
接口限流算法:漏桶算法&令牌桶算法