限流实现2

简介: 剩下的几种本来打算能立即写完,没想到一下三个月过去了,很是尴尬。本次主要实现如下两种算法- 令牌桶算法- 漏斗算法

简介

上一篇文章 限流实现1 已经介绍三种限流方案

  • 随机拒流
  • 计数器方式
  • 基于滑动时间窗口限流

剩下的几种本来打算能立即写完,没想到一下三个月过去了,很是尴尬。本次主要实现如下两种算法

  • 令牌桶算法
  • 漏斗算法

算法的具体实现可以在github上查看 https://github.com/shidawuhen/asap/tree/master/controller/limit

下面先来介绍一下这两种算法

令牌桶算法

算法说明

令牌桶算法(Token Bucket):是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。令牌桶算法示意图如下所示:

一般的流程为:

a. 按特定的速率向令牌桶投放令牌

b. 根据预设的匹配规则先对报文进行分类,不符合匹配规则的报文不需要经过令牌桶的处理,直接发送;

c. 符合匹配规则的报文,则需要令牌桶进行处理。当桶中有足够的令牌则报文可以被继续发送下去,同时令牌桶中的令牌量按报文的长度做相应的减少;

d. 当令牌桶中的令牌不足时,报文将不能被发送,只有等到桶中生成了新的令牌,报文才可以发送。这就可以限制报文的流量只能是小于等于令牌生成的速度,达到限制流量的目的。

大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。

令牌桶能允许突发,是因为令牌桶一般有两个值,一个是桶容量,一个是单位时间投放令牌量。如果桶容量大于令牌单位时间投放量,而且单位时间消耗量比投放量少,则令牌数量最终会达到桶容量最大值。此时如果大量请求到达,会将所有令牌消耗,实现了允许突发的效果。

算法实现

该算法有几个核心点

  1. 因为要更新令牌数量,所以需要加锁
  2. 定时向桶中放入令牌,有两种方式,一种是起goroutine,定时投放,另一种是在判断是否还有足够令牌的时候,根据投放情况进行投放。这次实现使用的是第二种方法,整个架构会更简单一些。
package limit

import (
   "github.com/gin-gonic/gin"
   "net/http"
   "sync"
   "time"
)

// @Tags limit
// @Summary 令牌桶拒流
// @Produce  json
// @Success 200 {string} string "成功会返回ok"
// @Failure 502 "失败返回reject"
// @Router /limit/tokenreject [get]
type TokenBucket struct {
   rate         int64 //固定的token放入速率, r/s
   capacity     int64 //桶的容量
   tokens       int64 //桶中当前token数量
   lastTokenSec int64 //桶上次放token的时间戳 s

   lock sync.Mutex
}

func (l *TokenBucket) Allow() bool {
   l.lock.Lock()
   defer l.lock.Unlock()

   now := time.Now().Unix()
   l.tokens = l.tokens + (now-l.lastTokenSec)*l.rate // 先添加令牌
   if l.tokens > l.capacity {
      l.tokens = l.capacity
   }
   l.lastTokenSec = now
   if l.tokens > 0 {
      // 还有令牌,领取令牌
      l.tokens--
      return true
   } else {
      // 没有令牌,则拒绝
      return false
   }
}

func (l *TokenBucket) Set(r, c int64) {
   l.rate = r
   l.capacity = c
   l.tokens = 0
   l.lastTokenSec = time.Now().Unix()
}

func CreateTokenBucket()*TokenBucket{
   t := &TokenBucket{}
   t.Set(1,5)
   return t
}

var tokenBucket *TokenBucket = CreateTokenBucket()

func TokenReject(c *gin.Context) {
   //fmt.Println(tokenBucket.tokens)
   if !tokenBucket.Allow() {
      c.String(http.StatusBadGateway, "reject")
      return
   }
   c.String(http.StatusOK, "ok")
}

漏桶算法

算法说明

漏桶作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:

  • 一个固定容量的漏桶,按照常量固定速率流出水滴;
  • 如果桶是空的,则不需流出水滴;
  • 可以以任意速率流入水滴到漏桶;
  • 如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。

漏桶法限流很好理解,假设我们有一个水桶按固定的速率向下方滴落一滴水,无论有多少请求,请求的速率有多大,都按照固定的速率流出,对应到系统中就是按照固定的速率处理请求。

示意图如下:

算法实现

查阅了一下相关资料,主要的算法实现有三种。学习完这几种实现后,我怀疑我是不是理解错了,感觉还没有计数拒流方便好用。如果我理解的有误,大家也可以告诉我。

这三种方式为:

令牌桶算法变种

桶的大小就是单位时间内能流出的最大量,这种就不写了。

计数拒流变种

该方法在指定时间将可用空间置为初始值。

package limit

import (
    "fmt"
    "github.com/gin-gonic/gin"
    "net/http"
    "sync"
    "time"
)

type LeakyBucket struct {
    // 容量
    capacity  int64
    // 剩余大小
    remaining int64
    // 下一次的重置容量时间
    reset     time.Time
    // 重置容量时间间隔
    rate      time.Duration
    mutex     sync.Mutex
}
func (b *LeakyBucket) Allow() bool {
    b.mutex.Lock()
    defer b.mutex.Unlock()
    if time.Now().After(b.reset) { // 需要重置
        b.reset = time.Now().Add(b.rate) // 更新时间
        b.remaining = b.capacity // 重置剩余容量
    }
    fmt.Println(b.remaining)
    if b.remaining > 0 { // 判断是否能过
        b.remaining--
        return true
    }
    return false
}

func (b *LeakyBucket) Set(r time.Duration, c int64) {
    b.rate = r
    b.capacity = c
    b.remaining = c
    b.reset = time.Now().Add(b.rate)
}

func CreateLeakyBucket(r time.Duration,c int64) *LeakyBucket {
    t := &LeakyBucket{}
    t.Set(r, c)
    return t
}

var leakyBucket *LeakyBucket = CreateLeakyBucket(time.Second*2,10)
func LeakyReject(c *gin.Context) {
    if !leakyBucket.Allow() {
        c.String(http.StatusBadGateway, "reject")
        return
    }
    c.String(http.StatusOK, "ok")
}

真固定速率

这个算法的的实现,根据uber团队开源的 github.com/uber-go/ratelimit 而来。

该实现保证如果有大量请求,每一个请求会按照规定的时间间隔执行。如设置1s处理100个请求,则每10ms会处理一个。

如果长时间没有请求,仍会产生请求在短时间内被处理完毕的情况。当然对于这种情况可以很容易修复,大家可以思考一下如何修改。

package limit

import (
   "fmt"
   "github.com/andres-erbsen/clock"
   "github.com/gin-gonic/gin"
   "net/http"
   "sync"
   "time"
)

//真固定速率
type Clock interface {
   Now() time.Time
   Sleep(time.Duration)
}

type limiter struct {
   sync.Mutex               // 锁
   last       time.Time     // 上一次的时刻
   sleepFor   time.Duration // 需要等待的时间
   perRequest time.Duration // 每次的时间间隔
   maxSlack   time.Duration // 最大的富余量
   clock      Clock         // 时钟
}

// Take 会阻塞确保两次请求之间的时间走完
// Take 调用平均数为 time.Second/rate.
func (t *limiter) Take() time.Time {
   t.Lock()
   defer t.Unlock()

   now := t.clock.Now()

   // 如果是第一次请求就直接放行
   if t.last.IsZero() {
      t.last = now
      return t.last
   }

   // sleepFor 根据 perRequest 和上一次请求的时刻计算应该sleep的时间
   // 由于每次请求间隔的时间可能会超过perRequest, 所以这个数字可能为负数,并在多个请求之间累加
   t.sleepFor += t.perRequest - now.Sub(t.last)
   fmt.Println(t.sleepFor)
   // 我们不应该让sleepFor负的太多,因为这意味着一个服务在短时间内慢了很多随后会得到更高的RPS。
   if t.sleepFor < t.maxSlack {
      t.sleepFor = t.maxSlack
   }

   // 如果 sleepFor 是正值那么就 sleep
   if t.sleepFor > 0 {
      t.clock.Sleep(t.sleepFor)
      t.last = now.Add(t.sleepFor)
      t.sleepFor = 0
   } else {
      t.last = now
   }
   return t.last
}

func NewLimiter(rate int) *limiter {
   l := &limiter{
      perRequest: time.Second / time.Duration(rate),
      maxSlack:   -10 * time.Second / time.Duration(rate),
   }

   if l.clock == nil {
      l.clock = clock.New()
   }
   return l
}

var rl = NewLimiter(100) // per second,每秒100个请求
func LeakyRejectFixedRate(c *gin.Context) {
   prev := time.Now()
   for i := 0; i < 10; i++ {
      now := rl.Take()
      fmt.Println(i, now.Sub(prev))
      prev = now
   }
   c.String(http.StatusOK, "ok")
}

演示结果如下:

总结

学习了令牌桶和漏桶算法,结合这几年工作中的具体场景,感觉令牌桶算法的实用价值更大一些。

下面是令牌桶和漏桶对比:

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;
  • 漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;
  • 令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌),并允许一定程度突发流量;
  • 漏桶限制的是常量流出速率(即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2),从而平滑突发流入速率;
  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流入速率;
  • 两个算法实现可以一样,但是方向是相反的,对于相同的参数得到的限流效果是一样的。

最后,给大家展示一下各种限流算法的比较

资料

  1. 限频方案比较
  2. 高并发系统限流-漏桶算法和令牌桶算法
  3. 令牌桶与漏桶算法
  4. 漏桶、令牌桶限流的Go语言实现

最后

大家如果喜欢我的文章,可以关注我的公众号(程序员麻辣烫)

我的个人博客为:https://shidawuhen.github.io/

往期文章回顾:

技术

  1. 秒杀系统
  2. 分布式系统与一致性协议
  3. 微服务之服务框架和注册中心
  4. Beego框架使用
  5. 浅谈微服务
  6. TCP性能优化
  7. 限流实现1
  8. Redis实现分布式锁
  9. Golang源码BUG追查
  10. 事务原子性、一致性、持久性的实现原理
  11. CDN请求过程详解
  12. 常用缓存技巧
  13. 如何高效对接第三方支付
  14. Gin框架简洁版
  15. InnoDB锁与事务简析
  16. 算法总结

读书笔记

  1. 敏捷革命
  2. 如何锻炼自己的记忆力
  3. 简单的逻辑学-读后感
  4. 热风-读后感
  5. 论语-读后感
  6. 孙子兵法-读后感

思考

  1. 项目流程管理
  2. 对项目管理的一些看法
  3. 对产品经理的一些思考
  4. 关于程序员职业发展的思考
  5. 关于代码review的思考
  6. Markdown编辑器推荐-typora
相关文章
|
2月前
SaaS自助建站平台有哪些?自助建站哪个好
网站建设途径丰富多样,选择时需综合考量自身的技术能力、预算情况、时间规划以及功能需求。
180 4
|
机器学习/深度学习 人工智能 算法
一文让你了解AI产品的测试 评价人工智能算法模型的几个重要指标
一文让你了解AI产品的测试 评价人工智能算法模型的几个重要指标
1909 0
一文让你了解AI产品的测试 评价人工智能算法模型的几个重要指标
|
存储 缓存 NoSQL
Spring-Boot实战|分布式缓存-JPA的二级缓存-Redis
以JPA为ORM框架的微服务,默认是二级缓存是关闭的。因为在分布式集群架构下,本地的二级缓存必然会带来多个微服务实例缓存不一致问题。将二级缓存移交给第三方中间件可以很好的解决缓存不一致问题。而Redis一款高性能的K-V存储中间件,在保证缓存一致性的同时,还能提供高性能,高可用的特性。本篇文章就是基于开源框架Hibernate-Redis,将Redis作为为JPA的二级缓存实现
5367 0
Spring-Boot实战|分布式缓存-JPA的二级缓存-Redis
|
4月前
|
传感器 人工智能 安全
法思诺创新洞察:TRIZ方法如何助力企业卡脖子实现关键技术自主可控?实战案例解析!
法思诺创新学院运用TRIZ系统化创新方法,助力企业破解“卡脖子”技术难题,如高精度传感器温漂与工业泵振动等,实现关键技术自主可控,提升产品竞争力与市场突破能力。
277 0
|
机器学习/深度学习 算法 安全
随机性、熵与随机数生成器:解析伪随机数生成器(PRNG)和真随机数生成器(TRNG)
随机性在密码学、仿真和机器学习等领域中至关重要,本文探讨了随机性、熵的概念以及伪随机数生成器(PRNG)和真随机数生成器(TRNG)的原理和应用。PRNG通过算法生成看似随机的序列,适用于高效需求;TRNG利用物理过程生成真正随机数,适用于高安全需求。文章还讨论了两者的协同应用及其面临的挑战。
782 5
随机性、熵与随机数生成器:解析伪随机数生成器(PRNG)和真随机数生成器(TRNG)
|
9月前
|
弹性计算 Linux 云计算
阿里云操作系统控制台——ECS操作及云计算应用实践
本文详细介绍了云服务器ECS的使用流程,包括开通服务、系统配置、权限管理、组件安装及内存全景诊断等关键步骤。通过开通阿里云操作系统服务、授予RAM用户权限和安装必要组件,可实现对服务器的有效管理与维护。在内存诊断部分,展示了如何发起诊断并解析结果,帮助精准定位内存问题。此外,文章还讲解了利用ECS训练模型的操作方法,从上传文件到终端命令执行,直至完成模型训练。最后总结指出,掌握这些技能不仅提升了对云服务器架构的理解,还为实际业务提供了高效解决方案,展现了ECS在数据处理与分析中的重要价值。
484 8
阿里云操作系统控制台——ECS操作及云计算应用实践
|
11月前
|
存储 IDE 开发工具
ModelScope魔搭25年1月版本发布月报
随着2025年帷幕的缓缓拉开,ModelScope团队怀着新年新气象的美好期许,为广大开发者带来了1月份的重磅更新。
504 13
|
存储 Java
|
存储 监控 安全
建立有效的反馈机制
建立有效的反馈机制
1225 6