go 使用configmap通过自建抢占式锁实现 主备
组件背景程序内已经使用了k8s的client端,为了减少其他组件的依赖,所以不使用redis和etcd。采用configmap实现。因为configmap没有锁机制,所以要自己写这个机制,同样ttl机制也需要代码实现。通过corn定时任务,每50秒所有组件抢占一次锁,如果原有leader是default或已经50秒没有更新ttl时间。则进行抢占。否则退出此次任务。每20秒进行一次ttl的更新。确保
·
组件背景
程序内已经使用了k8s的client端,为了减少其他组件的依赖,所以不使用redis和etcd。采用configmap实现。因为configmap没有锁机制,所以要自己写这个机制,同样ttl机制也需要代码实现。
通过corn定时任务,每50秒所有组件抢占一次锁,如果原有leader是default或已经50秒没有更新ttl时间。则进行抢占。否则退出此次任务。每20秒进行一次ttl的更新。确保主一直在工作,同时有备进行争抢。
需要实现的功能
抢占式锁
主备
ttl检查
每次验证后确保服务只启动一个,并且Worker仅运行一次
代码
var Stop chan struct{}\
var IsLeader bool = false
//定时任务
syncLock := cron2.AddFunc("*/50 * * * * *", services.SyncLock)
worker := cron2.AddFunc("*/20 * * * * *", services.HoldLeader)
//抢锁
func SyncLock() {
configMap, err := client.Api.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
logger.Errorf("configmap存在错误")
}
if configMap != nil {
val, _ := configMap.Data["nodeName"]
if val == config.Config.NodeName {
//就是自己,判断一下k8s连接是否还在。
IsLeader = true
//自己掉了,又自己恢复了,未到超时时间,别人还未抢到状态,重新拉起服务
if Stop == nil {
//Worker部分为真正业务代码,实现的是k8s的informer。通过channel进行关闭。\
Worker()
}
} else {
//判断时间戳和当前是否大于50秒,或者是否是default。小于直接返回,大于证明ttl超时就去抢资格
times, _ := configMap.Data["startTime"]
timeNow, _ := strconv.ParseInt(times, 10, 64)
if val == "default" || timeNow+50 <= time.Now().Unix() {
configMap.Data["nodeName"] = config.Config.NodeName
startTimeStr := strconv.FormatInt(time.Now().Unix(), 10)
configMap.Data["startTime"] = startTimeStr
if _, err := client.Api.CoreV1().ConfigMaps(namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
logger.Errorf("configmap更新存在存在错误。err: %s", err)
}
//去抢资格,因为configmap没有锁和事务机制。多个组件可能都写入了。写入后等待2秒
time.Sleep(time.Second * 2)
//两秒后在看看最后写入的是不是自己
configMapNow, _ := client.Api.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if configMapNow != nil {
valNow, _ := configMapNow.Data["nodeName"]
if valNow == config.Config.NodeName {
//注册成功,证明是抢到的leader,开启所有服务
Worker()
} else {
//抢占失败,关闭通道。
IsLeader = false
if Stop != nil {
close(Stop)
Stop = nil
}
}
}
}
}
}
}
//续时自己是leader
func HoldLeader() {
if IsLeader {
configMap, err := client.Api.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
logger.Errorf("configmap存在错误")
}
if configMap != nil {
val, _ := configMap.Data["nodeName"]
if val == config.Config.NodeName {
//就是自己,更新时间戳
startTimeStr := strconv.FormatInt(time.Now().Unix(), 10)
configMap.Data["startTime"] = startTimeStr
if _, err := client.Api.CoreV1().ConfigMaps(namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
logger.Errorf("configmap更新存在存在错误。err: %s", err)
}
} else {
//不是自己的话断开服务,关闭管道
IsLeader = false
if Stop != nil {
close(Stop)
Stop = nil
}
}
}
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)