使用redis实现分布式令牌桶算法:

团队接了个根据告警的进行处理的需求,需要限制这个处理的速度,比如说一个小时只能最多处理三个,防止在晚上出现大量不可控变更影响。令牌桶算法原理可以参照,k8s限速队列、令牌桶原理可以参照k8s限速队列


问题描述

限速队列在单实例的环境下用go可以很方便实现,大概是使用一个定时任务来轮询。

  1. 设定一个根据补充间隔设定定时任务
  2. 要是bucket是满的就直接更新时间并返回,bucket不满就加一个token
  3. 要是只在加token的时候更新时间,那会引发PeriodSec期间内生成Limit+1个token的情况
    稍微不符合算法。
type RateLimit struct {
	UpdateTimestamp int `json:"update_timestamp"`
	PeriodSec    int    `json:"period_sec"`
	Bucket       int    `json:"bucket"`
	Limit        int    `json:"limit"`
	lock 		 syna.Lock
}
func (r *RateLimit)Run(ctx *context.Context){
	tikcer := time.Tick(time.Unix(r.PeriodSec,0)) // 一秒检查一次
	for c := range ticker{
		select {
			case <-ctx.Done():
				return 
			default:
				
		}
	}
}
func (r *RateLimit)CheckBucket(){
	r.Lock()
	defer r.Unlock()
	previous := r.UpdateTimestamp
	r.UpdateTimestamp = nowTime
	if r.Bucket >= r.Limit{
		return
	} 
	nowTime := time.Now.Unix()
	if nowTime<=previous+r.PeriodSec{
		r.Bucket++
	}
}
func (r *RateLimit)GetToken(n int)bool{
	r.Lock()
	defer r.Unlock()
	if r.Bucket < n{
		return false
	} 
	r.Bucket -= n
	return true
}

但是问题不在这,因为我们消费侧是多实例部署grpc服务,单实例的限速队列几乎不满足需求,因为多实例的回调时随机的,一方面限速比较难管理,其限速状态也不一定能够保证。
所以需要一个实现一个分布式bucket算法。但又衍生出几个问题,怎么共享数据,怎么定时管理数据,处理互斥问题


原因分析:

共享数据上,可以使用redis hash来存储降速队列对象,这样可以保证数据唯一性以及共享问题

关于定时管理数据的问题,就网上的方案有两种:

  1. 使用一个进程轮询redis数据进行管理。
  2. 不做额外轮询,仅在请求token的时候,进行bucket的处理。
    两种方案都有一定的缺陷。方法一需要大量的网络请求,多实例情况下还要选主,要是创建多个ratelimit,会有性能影响;方法二存要求请求token时先对bucket进行管理,比方案一更复杂更不好理解。
    这里选择方案二,主要是为了性能,避免使用轮询线程,增加运维负担。方案一选主下个博文讲

互斥问题可以使用分布式锁来解决、事务或者lua。首先锁的性能是最低的,不建议永,然后公司redis组件不支持txpipeline,事务用不了,最后就是lua,后端可重入分布式锁用的就是lua,使用lua性能最高。最后使用lua来做。(lua是万能的,但是确实很难做)


解决方案:

使用lua+不轮询+redis来实现一个分布式令牌桶。
定义令牌桶对象:没什么特别的,就是为了使用redis新加了一个key类型,同时要注意多实例时间是否不对齐

type RateLimitRedis struct {
	UpdateTimestamp int `json:"update_timestamp"`
	PeriodSec    int    `json:"period_sec"`
	Bucket       int    `json:"bucket"`
	Limit        int    `json:"limit"`
	KeyName      string `json:"key_name"`
}

初始化:需要将对象内容保存到redis里面由多实例共享,lua语句中一旦发现该key已经存在了就只做bucket,不做对象初始化。

func (rt *RateLimitRedis) Init() error {
	_, err := GetClient().Eval(initLua, []string{rt.key}, []string{
		strconv.Itoa(rt.Bucket), strconv.Itoa(rt.PeriodSec), strconv.Itoa(rt.Limit),
		strconv.Itoa(rt.UpdateTimestamp), 
	}).Result()
	if err != nil {
		return err
	}
	return nil
}
if redis.call("EXISTS",KEYS[1]) == 1 then
	local tokens=redis.call("HGET",KEYS[1],"bucket")
	if(tonumber(tokens)>tonumber(ARGV[3])) then 
		redis.call("HSET",KEYS[1],"bucket",ARGV[3])
	end
else
	redis.call("HSET",KEYS[1],"bucket",ARGV[1])
end
redis.call("HMSET",KEYS[1],"period_sec",ARGV[2],"limit",ARGV[3],"update_time_sec",ARGV[4])

获取token:主要看luna,第一步将hash的对象数据全都拿出来,第二步计算这个新增token数目更新周期时间,第三步判断当前bucket数是多少,第四步取出token

func (rt *RateLimitRedis) GetToken(n int) bool {
	res, err := GetClient().Eval(getTokenLua, []string{key}, []string{
		strconv.Itoa(n), strconv.Itoa(int(nowTime.Unix()))}).Result()
	if err != nil {
		return false
	}
	r, ok := res.(int)
	if ok && r == 1 {
		return true
	}
	return false
}
local rateLimit=redis.call("HMGET",KEYS[1],"update_time_sec","bucket","limit","period_sec")
local last_time=tonumber(rateLimit[1])
local bucket=tonumber(rateLimit[2])
local limit=tonumber(rateLimit[3])
local period_sec=tonumber(rateLimit[4])

local currBucket=limit;

local tokens=math.floor((tonumber(ARGV[2])-last_time)/period_sec)
if(tokens>0) then
    redis.call("HSET",KEYS[1],"update_time_sec",ARGV[2])
end
local compareBucket=tokens+bucket
currBucket=math.min(compareBucket,limit);

local result=-1
if(currBucket-tonumber(ARGV[1])>0) then
    result=1
    redis.call("HSET",KEYS[1],"bucket",currBucket-ARGV[1])
else
    redis.call("HSET",KEYS[1],"bucket",currBucket)
end

return result

因此,几乎无法开不请求token的情况下知道当前的bucket应该有多少个。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐