go redis lua实现分布式令牌桶算法
团队接了个根据告警的进行处理的需求,需要限制这个处理的速度,比如说一个小时只能最多处理三个,防止在晚上出现大量不可控变更影响。令牌桶算法原理可以参照,k8s限速队列、令牌桶原理可以参照。
使用redis实现分布式令牌桶算法:
团队接了个根据告警的进行处理的需求,需要限制这个处理的速度,比如说一个小时只能最多处理三个,防止在晚上出现大量不可控变更影响。令牌桶算法原理可以参照,k8s限速队列、令牌桶原理可以参照k8s限速队列。
问题描述
限速队列在单实例的环境下用go可以很方便实现,大概是使用一个定时任务来轮询。
- 设定一个根据补充间隔设定定时任务
- 要是bucket是满的就直接更新时间并返回,bucket不满就加一个token
- 要是只在加token的时候更新时间,那会引发PeriodSec期间内生成Limit+1个token的情况
稍微不符合算法。
type RateLimit struct {
UpdateTimestamp int `json:"update_timestamp"`
PeriodSec int `json:"period_sec"`
Bucket int `json:"bucket"`
Limit int `json:"limit"`
lock syna.Lock
}
func (r *RateLimit)Run(ctx *context.Context){
tikcer := time.Tick(time.Unix(r.PeriodSec,0)) // 一秒检查一次
for c := range ticker{
select {
case <-ctx.Done():
return
default:
}
}
}
func (r *RateLimit)CheckBucket(){
r.Lock()
defer r.Unlock()
previous := r.UpdateTimestamp
r.UpdateTimestamp = nowTime
if r.Bucket >= r.Limit{
return
}
nowTime := time.Now.Unix()
if nowTime<=previous+r.PeriodSec{
r.Bucket++
}
}
func (r *RateLimit)GetToken(n int)bool{
r.Lock()
defer r.Unlock()
if r.Bucket < n{
return false
}
r.Bucket -= n
return true
}
但是问题不在这,因为我们消费侧是多实例部署grpc服务,单实例的限速队列几乎不满足需求,因为多实例的回调时随机的,一方面限速比较难管理,其限速状态也不一定能够保证。
所以需要一个实现一个分布式bucket算法。但又衍生出几个问题,怎么共享数据,怎么定时管理数据,处理互斥问题。
原因分析:
共享数据上,可以使用redis hash来存储降速队列对象,这样可以保证数据唯一性以及共享问题。
关于定时管理数据的问题,就网上的方案有两种:
- 使用一个进程轮询redis数据进行管理。
- 不做额外轮询,仅在请求token的时候,进行bucket的处理。
两种方案都有一定的缺陷。方法一需要大量的网络请求,多实例情况下还要选主,要是创建多个ratelimit,会有性能影响;方法二存要求请求token时先对bucket进行管理,比方案一更复杂更不好理解。
这里选择方案二,主要是为了性能,避免使用轮询线程,增加运维负担。方案一选主下个博文讲
互斥问题可以使用分布式锁来解决、事务或者lua。首先锁的性能是最低的,不建议永,然后公司redis组件不支持txpipeline,事务用不了,最后就是lua,后端可重入分布式锁用的就是lua,使用lua性能最高。最后使用lua来做。(lua是万能的,但是确实很难做)
解决方案:
使用lua+不轮询+redis来实现一个分布式令牌桶。
定义令牌桶对象:没什么特别的,就是为了使用redis新加了一个key类型,同时要注意多实例时间是否不对齐
type RateLimitRedis struct {
UpdateTimestamp int `json:"update_timestamp"`
PeriodSec int `json:"period_sec"`
Bucket int `json:"bucket"`
Limit int `json:"limit"`
KeyName string `json:"key_name"`
}
初始化:需要将对象内容保存到redis里面由多实例共享,lua语句中一旦发现该key已经存在了就只做bucket,不做对象初始化。
func (rt *RateLimitRedis) Init() error {
_, err := GetClient().Eval(initLua, []string{rt.key}, []string{
strconv.Itoa(rt.Bucket), strconv.Itoa(rt.PeriodSec), strconv.Itoa(rt.Limit),
strconv.Itoa(rt.UpdateTimestamp),
}).Result()
if err != nil {
return err
}
return nil
}
if redis.call("EXISTS",KEYS[1]) == 1 then
local tokens=redis.call("HGET",KEYS[1],"bucket")
if(tonumber(tokens)>tonumber(ARGV[3])) then
redis.call("HSET",KEYS[1],"bucket",ARGV[3])
end
else
redis.call("HSET",KEYS[1],"bucket",ARGV[1])
end
redis.call("HMSET",KEYS[1],"period_sec",ARGV[2],"limit",ARGV[3],"update_time_sec",ARGV[4])
获取token:主要看luna,第一步将hash的对象数据全都拿出来,第二步计算这个新增token数目更新周期时间,第三步判断当前bucket数是多少,第四步取出token
func (rt *RateLimitRedis) GetToken(n int) bool {
res, err := GetClient().Eval(getTokenLua, []string{key}, []string{
strconv.Itoa(n), strconv.Itoa(int(nowTime.Unix()))}).Result()
if err != nil {
return false
}
r, ok := res.(int)
if ok && r == 1 {
return true
}
return false
}
local rateLimit=redis.call("HMGET",KEYS[1],"update_time_sec","bucket","limit","period_sec")
local last_time=tonumber(rateLimit[1])
local bucket=tonumber(rateLimit[2])
local limit=tonumber(rateLimit[3])
local period_sec=tonumber(rateLimit[4])
local currBucket=limit;
local tokens=math.floor((tonumber(ARGV[2])-last_time)/period_sec)
if(tokens>0) then
redis.call("HSET",KEYS[1],"update_time_sec",ARGV[2])
end
local compareBucket=tokens+bucket
currBucket=math.min(compareBucket,limit);
local result=-1
if(currBucket-tonumber(ARGV[1])>0) then
result=1
redis.call("HSET",KEYS[1],"bucket",currBucket-ARGV[1])
else
redis.call("HSET",KEYS[1],"bucket",currBucket)
end
return result
因此,几乎无法开不请求token的情况下知道当前的bucket应该有多少个。
更多推荐
所有评论(0)