### 令牌桶算法(Token Bucket)
令牌桶和漏桶的原理类似,不过漏桶是**定速地流出**,而令牌桶是**定速地往桶里塞入令牌**,然后请求只有拿到了令牌才能通过,之后再被服务器处理。当然令牌桶的大小也是有限制的,假设桶里的令牌满了之后,定速生成的令牌会丢弃。规则:
- 定速的往桶内放入令牌
- 令牌数量超过桶的限制,丢弃
- 请求来了先向桶内索要令牌,索要成功则通过被处理,反之拒绝
可以看出令牌桶在应对突发流量的时候,桶内假如有 100 个令牌,那么这 100 个令牌可以马上被取走,而不像漏桶那样匀速的消费。所以在**应对突发流量的时候令牌桶表现的更佳**。
令牌桶算法伪代码实现如下:
```java
/**
* 每秒处理数(放入令牌数量)
*/
private long putTokenRate;
/**
* 最后刷新时间
*/
private long refreshTime;
/**
* 令牌桶容量
*/
private long capacity;
/**
* 当前桶内令牌数
*/
private long currentToken = 0L;
/**
* 漏桶算法
* @return
*/
boolean tokenBucketTryAcquire() {
long currentTime = System.currentTimeMillis(); //获取系统当前时间
long generateToken = (currentTime - refreshTime) / 1000 * putTokenRate; //生成的令牌 =(当前时间-上次刷新时间)* 放入令牌速率
currentToken = Math.min(capacity, generateToken + currentToken); // 当前令牌数量 = 之前的桶内令牌数量+放入的令牌数量
refreshTime = currentTime; // 刷新时间
//桶里面还有令牌,请求正常处理
if (currentToken > 0) {
currentToken--; //令牌数量-1
return true;
}
return false;
}
```
### 分布式限流
计数器限流的核心是 `INCRBY` 和 `EXPIRE` 指令,测试用例在此,通常,计数器算法容易出现不平滑的情况,瞬间的 qps 有可能超过系统的承载。
```lua
-- 获取调用脚本时传入的第一个 key 值(用作限流的 key)
local key = KEYS[1]
-- 获取调用脚本时传入的第一个参数值(限流大小)
local limit = tonumber(ARGV[1])
-- 获取计数器的限速区间 TTL
local ttl = tonumber(ARGV[2])
-- 获取当前流量大小
local curentLimit = tonumber(redis.call('get', key) or "0")
-- 是否超出限流
if curentLimit + 1 > limit then
-- 返回 (拒绝)
return 0
else
-- 没有超出 value + 1
redis.call('INCRBY', key, 1)
-- 如果 key 中保存的并发计数为 0,说明当前是一个新的时间窗口,它的过期时间设置为窗口的过期时间
if (current_permits == 0) then
redis.call('EXPIRE', key, ttl)
end
-- 返回 (放行)
return 1
end
```
此段 Lua 脚本的逻辑很直观:
- 通过 `KEYS[1]` 获取传入的 key 参数,为某个限流指标的 key
- 通过 `ARGV[1]` 获取传入的 limit 参数,为限流值
- 通过 `ARGV[2]` 获取限流区间 ttl
- 通过 `redis.call`,拿到 key 对应的值(默认为 0),接着与 limit 判断,如果超出表示该被限流;否则,使用 `INCRBY` 增加 1,未限流(需要处理初始化的情况,设置 `TTL`)
不过上面代码是有问题的,如果 key 之前存在且未设置 `TTL`,那么限速逻辑就会永远生效了(触发 limit 值之后),使用时需要注意。
令牌桶算法也是 Guava 中使用的算法,同样采用计算的方式,将时间和 Token 数目联系起来:
```lua
-- key
local key = KEYS[1]
-- 最大存储的令牌数
local max_permits = tonumber(KEYS[2])
-- 每秒钟产生的令牌数
local permits_per_second = tonumber(KEYS[3])
-- 请求的令牌数
local required_permits = tonumber(ARGV[1])
-- 下次请求可以获取令牌的起始时间
local next_free_ticket_micros = tonumber(redis.call('hget', key, 'next_free_ticket_micros') or 0)
-- 当前时间
local time = redis.call('time')
-- time[1] 返回的为秒,time[2] 为 ms
local now_micros = tonumber(time[1]) * 1000000 + tonumber(time[2])
-- 查询获取令牌是否超时(传入参数,单位为 微秒)
if (ARGV[2] ~= nil) then
-- 获取令牌的超时时间
local timeout_micros = tonumber(ARGV[2])
local micros_to_wait = next_free_ticket_micros - now_micros
if (micros_to_wait> timeout_micros) then
return micros_to_wait
end
end
-- 当前存储的令牌数
local stored_permits = tonumber(redis.call('hget', key, 'stored_permits') or 0)
-- 添加令牌的时间间隔(1000000ms 为 1s)
-- 计算生产 1 个令牌需要多少微秒
local stable_interval_micros = 1000000 / permits_per_second
-- 补充令牌
if (now_micros> next_free_ticket_micros) then
local new_permits = (now_micros - next_free_ticket_micros) / stable_interval_micros
stored_permits = math.min(max_permits, stored_permits + new_permits)
-- 补充后,更新下次可以获取令牌的时间
next_free_ticket_micros = now_micros
end
-- 消耗令牌
local moment_available = next_free_ticket_micros
-- 两种情况:required_permits<=stored_permits 或者 required_permits>stored_permits
local stored_permits_to_spend = math.min(required_permits, stored_permits)
local fresh_permits = required_permits - stored_permits_to_spend;
-- 如果 fresh_permits>0,说明令牌桶的剩余数目不够了,需要等待一段时间
local wait_micros = fresh_permits * stable_interval_micros
-- Redis 提供了 redis.replicate_commands() 函数来实现这一功能,把发生数据变更的命令以事务的方式做持久化和主从复制,从而允许在 Lua 脚本内进行随机写入
redis.replicate_commands()
-- 存储剩余的令牌数:桶中剩余的数目 - 本次申请的数目
redis.call('hset', key, 'stored_permits', stored_permits - stored_permits_to_spend)
redis.call('hset', key, 'next_free_ticket_micros', next_free_ticket_micros + wait_micros)
redis.call('expire', key, 10)
-- 返回需要等待的时间长度
-- 返回为 0(moment_available==now_micros)表示桶中剩余的令牌足够,不需要等待
return moment_available - now_micros
```