组件背景
程序内已经使用了k8s的client端,为了减少其他组件的依赖,所以不使用redis和etcd。采用configmap实现。因为configmap没有锁机制,所以要自己写这个机制,同样ttl机制也需要代码实现。
通过corn定时任务,每50秒所有组件抢占一次锁,如果原有leader是default或已经50秒没有更新ttl时间。则进行抢占。否则退出此次任务。每20秒进行一次ttl的更新。确保主一直在工作,同时有备进行争抢。
需要实现的功能

抢占式锁
主备
ttl检查
每次验证后确保服务只启动一个,并且Worker仅运行一次

代码

var Stop chan struct{}\
var IsLeader bool = false

//定时任务
syncLock := cron2.AddFunc("*/50 * * * * *", services.SyncLock)
worker := cron2.AddFunc("*/20 * * * * *", services.HoldLeader)

//抢锁
func SyncLock() {
   configMap, err := client.Api.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
   if err != nil {
      logger.Errorf("configmap存在错误")
   }
   if configMap != nil {
      val, _ := configMap.Data["nodeName"]
      if val == config.Config.NodeName {
         //就是自己,判断一下k8s连接是否还在。
         IsLeader = true
         //自己掉了,又自己恢复了,未到超时时间,别人还未抢到状态,重新拉起服务
         if Stop == nil {
            //Worker部分为真正业务代码,实现的是k8s的informer。通过channel进行关闭。\
            Worker()
         }
      } else {
         //判断时间戳和当前是否大于50秒,或者是否是default。小于直接返回,大于证明ttl超时就去抢资格
         times, _ := configMap.Data["startTime"]
         timeNow, _ := strconv.ParseInt(times, 10, 64)
         if val == "default" || timeNow+50 <= time.Now().Unix() {
            configMap.Data["nodeName"] = config.Config.NodeName
            startTimeStr := strconv.FormatInt(time.Now().Unix(), 10)
            configMap.Data["startTime"] = startTimeStr
            if _, err := client.Api.CoreV1().ConfigMaps(namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
               logger.Errorf("configmap更新存在存在错误。err: %s", err)
            }
            //去抢资格,因为configmap没有锁和事务机制。多个组件可能都写入了。写入后等待2秒
            time.Sleep(time.Second * 2)
            //两秒后在看看最后写入的是不是自己
            configMapNow, _ := client.Api.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
            if configMapNow != nil {
               valNow, _ := configMapNow.Data["nodeName"]
               if valNow == config.Config.NodeName {
                  //注册成功,证明是抢到的leader,开启所有服务
                  Worker()
               } else {
                  //抢占失败,关闭通道。
                  IsLeader = false
                  if Stop != nil {
                     close(Stop)
                     Stop = nil
                  }
               }
            }
         }
      }
   }
}

//续时自己是leader
func HoldLeader() {
   if IsLeader {
      configMap, err := client.Api.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
      if err != nil {
         logger.Errorf("configmap存在错误")
      }
      if configMap != nil {
         val, _ := configMap.Data["nodeName"]
         if val == config.Config.NodeName {
            //就是自己,更新时间戳
            startTimeStr := strconv.FormatInt(time.Now().Unix(), 10)
            configMap.Data["startTime"] = startTimeStr
            if _, err := client.Api.CoreV1().ConfigMaps(namespace).Update(context.TODO(), configMap, metav1.UpdateOptions{}); err != nil {
               logger.Errorf("configmap更新存在存在错误。err: %s", err)
            }
         } else {
            //不是自己的话断开服务,关闭管道
            IsLeader = false
            if Stop != nil {
               close(Stop)
               Stop = nil
            }
         }
      }
   }
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐