前言
我有一个朋友, 最近用到了限流方式, 主要是限制前台用户请求接口次数, 那么就让我们来聊一聊最常见的限流方式吧
正文
假设限制10s最多请求二十次, redis_key为user_id:api
为了方便下面就直接用python实现了
固定窗口:
解释
即固定死10s时间段, 在这期间只接受二十次请求, 请求打满了就只能登下一段10s, 可以用incr实现,接受请求调用incr key, 如果value=1, 则证明为第一次请求, 使用exipre加上10s过期时间,如果大于20, 则拒绝该请求
缺点
有可能会遇到突刺情况, 即在9.9s和10.1s各发出20个请求, 这就相当于10s内发了40个了
实现
``` import redis client = redis.Redis(host="127.0.0.1", port=6379) num = client.incr("user_id:api") if num == 1: client.expire(num, 10) elif num > 20: print("请求超过次数") ```
滑动窗口:
解释
即时间是流动的, 一直保持者当前时间往后推10s的一个滑动窗口, 可以用zset实现接受请求调用zadd添加窗口数据, score为当前时间, value为一个唯一值(一般用毫秒时间戳), 然后删除(zremrangebyscore)当前时间10s前的数据, 在获取(zcard)当前窗口内的请求次数,进行判断, 记得也需要加上一个过期时间, 避免空间占用问题
实现:
``` import redis, time client = redis.Redis(host="127.0.0.1", port=6379) def is_action_allowed(key, period, max_count): now_ts = int(time.time() * 1000) # 毫秒时间戳 with client.pipeline() as pipe: # client 是 StrictRedis 实例 # 记录行为 pipe.zadd(key, now_ts, now_ts) # value 和 score 都使用毫秒时间戳 # 移除时间窗口之前的行为记录,剩下的都是时间窗口内的 pipe.zremrangebyscore(key, 0, now_ts - period * 1000) # 获取窗口内的行为数量 pipe.zcard(key) # 设置 zset 过期时间,避免冷用户持续占用内存 # 过期时间应该等于时间窗口的长度,再多宽限 1s pipe.expire(key, period + 1) # 批量执行 _, _, current_count, _ = pipe.execute() # 比较数量是否超标 return current_count <= max_count is_action_allowed("user_id:api", 10, 20) ```
漏桶算法:
解释
即一个水桶, 进水(接受请求)的速率不限, 出水(处理请求)的速率是一定的, 而且水桶大小也是有限制的, 也有可能造成水桶溢出
缺点
因为速率是一样的, 所以假如有突发大量请求的话就不是很合适了
实现
``` import time class Funnel(object): def __init__(self, capacity, leaking_rate): self.capacity = capacity # 漏斗容量 self.leaking_rate = leaking_rate # 漏嘴流水速率 self.left_quota = capacity # 漏斗剩余空间 self.leaking_ts = time.time() # 上一次漏水时间 def make_space(self): now_ts = time.time() delta_ts = now_ts - self.leaking_ts # 距离上一次漏水过去了多久 delta_quota = delta_ts * self.leaking_rate # 又可以腾出不少空间了 if delta_quota < 1: # 腾的空间太少,那就等下次吧 return self.left_quota += delta_quota # 增加剩余空间 self.leaking_ts = now_ts # 记录漏水时间 if self.left_quota > self.capacity: # 剩余空间不得高于容量 self.left_quota = self.capacity def watering(self, quota): self.make_space() if self.left_quota >= quota: # 判断剩余空间是否足够 self.left_quota -= quota return True return False funnels = {} # 所有的漏斗 # capacity 漏斗容量 # leaking_rate 漏嘴流水速率 quota/s def is_action_allowed(key, capacity, leaking_rate): funnel = funnels.get(key) if not funnel: funnel = Funnel(capacity, leaking_rate) funnels[key] = funnel return funnel.watering(1) print(is_action_allowed("user_id:api", 15, 0.5)) ```
令牌桶算法:
解释
即也是一个桶, 按照设定的速率往桶里放令牌, 10s二十次即1s放两个令牌(允许处理两次请求), 然后请求来之后必须从桶里取出来令牌才可以进行处理, 没有令牌则选择拒绝或等待
实现
利用redis-cell可以实现, 这块其实有一个问题, 都说的是redis-cell是用了漏桶算法实现的, 但是我用的时候其实是按照令牌桶的方式来用的(即我拿到令牌才可以去处理请求)…这块就比较尴尬了, 有大神知道的话可以帮我纠正一下
redis-cell 是redis的一个插件, 这里我用于测试直接用docker装一个
docker search redis-cell docker pull hsz1273327/redis-cell docker run -d -p 6379:6379 --name redis hsz1273327/redis-cell:latest
其实redis-cell就一个命令 cl.throttle , 这里还是用上面的例子举例
user_id:api 15 20 10 1 15 是桶的容量 -- 即同时能存在多少个令牌 20 就是速率限制了 10 单位时间(s) 1 一次取出几个令牌, 默认是一
命令返回值
本地:0>cl.throttle user_id:api 15 20 10 1 1) "0" // 0是允许, 1是拒绝 2) "16" // 桶容量 3) "15" // 剩余令牌 4) "-1" // 拒绝的话需要等待多长时间再试(这就很贴心了) 5) "0" // 多长时间令牌放满