一份golang令牌桶使用攻略(juju/ratelimit)
使用场景
令牌桶的一个主要使用场景是限流。
程序以一定的速率生产令牌加入到令牌桶中。
每个请求到达时都会尝试从令牌桶中获取一块令牌, 如果获取令牌失败(令牌桶为空)则不处理该请求, 以此达到限流的目的。
juju/ratelimit使用
juju/ratelimit是开源的、golang语言实现的高效令牌桶,代码简洁,在本文写作时该项目有2.4k的start。
项目地址是 github.com/juju/ratelimit
api介绍
创建令牌锁
先看三个创建令牌锁的方法及它们的区别:
- NewBucket
创建一个令牌桶, 设置填充频率(fillInterval)和初始容量(capacity), 每填充频率的时间会向令牌桶中加入1块令牌。
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
//令牌桶容量为1,每10ms填充一块令牌bucket := ratelimit.NewBucket(10 * time.Millisecond, 1)
- NewBucketWithQuantum
创建一个令牌桶, 设置填充频率(fillInterval)、初始容量(capacity)、每秒填充的令牌数(quantum), 每填充频率的时间会向令牌桶中加入quantum块令牌。
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
//令牌桶容量为3, 每10ms填充3块令牌bucket := ratelimit.NewBucketWithQuantum(10 * time.Millisecond, 3, 3)
- NewBucketWithRate
创建一个令牌桶, 设置每秒速率(rate)、初始容量(capacity)。
func NewBucketWithRate(rate float64, capacity int64) *Bucket
//令牌桶容量为1, 每秒限速100次bucket := ratelimit.NewBucketWithRate(100, 1)
获取令牌
Take 非阻塞, 返回需等待的时间
func (tb *Bucket) Take(count int64) time.Duration
TakeAvailable 非阻塞, 令牌数不满足需求时, 返回可用的令牌数
func (tb *Bucket) TakeAvailable(count int64) int64
TakeMaxDuration 非阻塞, 返回需等待时间, 超过最大时间返回 0, false
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
Wait 阻塞, 直到拿到令牌
func (tb *Bucket) Wait(count int64)
WaitMaxDuration 阻塞 若在最大等待时间内能拿到令牌则阻塞, 否则立即返回false
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
其它方法
Available 返回当前可用令牌数
func (tb *Bucket) Available() int64
Rate 返回每秒限流速率
func (tb *Bucket) Rate() float64
关键源码分析
令牌桶结构是怎样的
type Bucket struct { //Clock提供了获取当前时间、sleep指定时间的方法clock Clock//桶被创建的时间, 当前属于第几个tick也是基于这个起始时间来计算startTime time.Time// 桶容量capacity int64// 每个tick向桶填充的令牌数quantum int64// 填充间隔fillInterval time.Duration// 桶方法使用的互斥锁, 保障线程安全 mu sync.Mutex// 桶内可用的令牌数// 当有操作等待令牌的时候, 这个数值会变成负数availableTokens int64// 上一个填充tick// 计算当前tick与上一个tick的差值 得出需要填充的令牌数latestTick int64}
按一定的速率向桶中填充令牌是如何实现的?
juju/ratelimit没有启用额外的线程定时向桶中填充令牌, 而是在外部调用令牌锁的方法时触发一次填充方法,根据当前时间和令牌锁的创建时间的差值计算出是否需要填充、需要填充的数量。
填充令牌的方法, 外部调用令牌锁的方法时会触发
func (tb *Bucket) adjustavailableTokens(tick int64) { //令牌桶结构中记录了上一个填充周期的值 lastTick := tb.latestTick tb.latestTick = tick //如果桶是满的直接返回 if tb.availableTokens >= tb.capacity { return } //需要填充的数量是 (本周期数 - 上次填充周期数) * 单周期填充数 tb.availableTokens += (tick - lastTick) * tb.quantum //填充数量不得超过桶的容量 if tb.availableTokens > tb.capacity { tb.availableTokens = tb.capacity } return}
获取令牌的关键代码
Take(), TakeMaxDuration(), Wait(), WaitMaxDuration()这几个方法都是通过调用take()这个内部方法实现的
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) { if count = 0 { tb.availableTokens = avail return 0, true } //计算出一个endTick, 在未来的endTick到达时,令牌数将不再是负的 endTick := tick + (-avail+tb.quantum-1)/tb.quantum //计算endTick的时间点 endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) //需要等待的时间时endTime - now waitTime := endTime.Sub(now) if waitTime > maxWait { return 0, false } //更新availableTokens, 可能为负值 tb.availableTokens = avail //返回等待时间, 获取成功 return waitTime, true}
使用令牌锁的简单例子
代码:
import ("github.com/juju/ratelimit""time")func main() { //创建一个令牌桶初始容量为1, 每10ms填充3个令牌 bucket := ratelimit.NewBucketWithQuantum(10 * time.Millisecond, 1, 3) //程序运行长3秒 endTime := time.Now().Add(3 * time.Second) //打印桶的每秒限速频率(预期300/s) println("bucket rate:" , bucket.Rate(), "/s") //使用一个变量记录获取令牌的总数 var tockensCount int64 = 0 for { //每次拿1块令牌, 成功返回1, 失败返回0 tocken := bucket.TakeAvailable(1) tockensCount += tocken if(time.Now().After(endTime)) { println("tockensCount: ", tockensCount) return; } time.Sleep(5 * time.Millisecond) }}
程序运行结果:
bucket rate: +3.000000e+002 /stockensCount: 301
tockensCount为什么是301而不是300:
因为创建的令牌桶初始容量为1。桶初始化完成后里面已经有一块令牌了, 可以立即拿到这块令牌不需要等待填充。
在后面的3000ms共填充了300块令牌。
使用令牌桶时要注意, 由于令牌桶是有”容量“的, 允许一定的瞬时流量, 对限制速率有严格要求的时候要小心设置容量与填充速度, 并进行实测验证。