Since the built-in Kubernetes Ingress is fairly limited, we currently maintain a custom resource called extendIngress, together with its controller code, as a replacement for Ingress. As the traffic entry point of the cluster, the controller needs multiple replicas to spread the load, but in the current code every replica, after watching create/update events on extendIngress, goes on to update the status of the extendIngress object. Only one replica needs to perform that update; without some form of locking, every replica writes the status and produces a lot of unnecessary updates. So the replicas should elect a leader and let only the leader perform the update. Kubernetes itself ships a leader-election mechanism, so I wanted to see how it is used there and consider bringing it into my own project.
Among the Kubernetes components, kube-scheduler and kube-controller-manager both perform leader election; this is how Kubernetes keeps these two components highly available. Under normal operation only one of the replicas of kube-scheduler or kube-controller-manager actually runs the business logic, while the other replicas keep trying to acquire the lock and compete for leadership until one of them becomes the leader. If the running leader exits for some reason, or loses the lock, the remaining replicas compete for a new leader, and whichever one wins takes over and starts executing the business logic.
Here I use kube-scheduler (based on Kubernetes release 1.9) to see how that component runs the election. First, the scheduler's command-line flags related to leader election:

fs.StringVar(&options.config.LeaderElection.LockObjectNamespace, "lock-object-namespace", options.config.LeaderElection.LockObjectNamespace, "Define the namespace of the lock object.")
fs.StringVar(&options.config.LeaderElection.LockObjectName, "lock-object-name", options.config.LeaderElection.LockObjectName, "Define the name of the lock object.")
fs.BoolVar(&l.LeaderElect, "leader-elect", l.LeaderElect, ""+
        "Start a leader election client and gain leadership before "+
        "executing the main loop. Enable this when running replicated "+
        "components for high availability.")
fs.DurationVar(&l.LeaseDuration.Duration, "leader-elect-lease-duration", l.LeaseDuration.Duration, ""+
        "The duration that non-leader candidates will wait after observing a leadership "+
        "renewal until attempting to acquire leadership of a led but unrenewed leader "+
        "slot. This is effectively the maximum duration that a leader can be stopped "+
        "before it is replaced by another candidate. This is only applicable if leader "+
        "election is enabled.")
fs.DurationVar(&l.RenewDeadline.Duration, "leader-elect-renew-deadline", l.RenewDeadline.Duration, ""+
        "The interval between attempts by the acting master to renew a leadership slot "+
        "before it stops leading. This must be less than or equal to the lease duration. "+
        "This is only applicable if leader election is enabled.")
fs.DurationVar(&l.RetryPeriod.Duration, "leader-elect-retry-period", l.RetryPeriod.Duration, ""+
        "The duration the clients should wait between attempting acquisition and renewal "+
        "of a leadership. This is only applicable if leader election is enabled.")
fs.StringVar(&l.ResourceLock, "leader-elect-resource-lock", l.ResourceLock, ""+
        "The type of resource object that is used for locking during "+
        "leader election. Supported options are `endpoints` (default) and `configmaps`.")

There are seven leader-election flags in total. lock-object-namespace and lock-object-name set the namespace and name of the lock object. leader-elect controls whether the component performs leader election at all (when the cluster runs multiple replicas this must be set to true, otherwise every replica will do the actual work). leader-elect-lease-duration is the lease duration of the resource lock: if the other candidates notice that the leader has not renewed the lock within this interval, they may consider the leader dead and elect a new one. leader-elect-renew-deadline is the deadline within which the acting leader must renew the lock; if it fails to do so, it loses leadership. leader-elect-retry-period is the interval at which the non-leaders retry acquiring the lock (i.e. compete for leadership) and at which the leader renews it. leader-elect-resource-lock is the resource type used as the Kubernetes distributed lock; currently only endpoints and configmaps are supported.
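
To make the relationship between the three durations concrete, here is a small illustrative Go fragment using the defaults of this release (15s / 10s / 2s, the same values used in the demo at the end of this post); the constant names are mine, not taken from the kube-scheduler source:

const (
    leaseDuration = 15 * time.Second // non-leaders treat the leader as dead if the lock has not been renewed for 15s
    renewDeadline = 10 * time.Second // the leader stops leading if it cannot renew the lock within 10s
    retryPeriod   = 2 * time.Second  // non-leaders retry (and the leader renews) roughly every 2s
)

// NewLeaderElector (shown later) enforces
//     leaseDuration > renewDeadline > JitterFactor * retryPeriod
// so with these values a crashed leader is replaced after at most about 15 seconds.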
The leader-election configuration code in kube-scheduler:

// Set up leader election if enabled.
var leaderElectionConfig *leaderelection.LeaderElectionConfig
if config.LeaderElection.LeaderElect {
    leaderElectionConfig, err = makeLeaderElectionConfig(config.LeaderElection, leaderElectionClient, recorder)
    if err != nil {
        return nil, err
    }
}

// makeLeaderElectionConfig builds a leader election configuration. It will
// create a new resource lock associated with the configuration.
func makeLeaderElectionConfig(config componentconfig.KubeSchedulerLeaderElectionConfiguration, client clientset.Interface, recorder record.EventRecorder) (*leaderelection.LeaderElectionConfig, error) {
    // Use the hostname as this candidate's lock identity.
    hostname, err := os.Hostname()
    if err != nil {
        return nil, fmt.Errorf("unable to get hostname: %v", err)
    }
    // Call client-go's resourcelock package to create the lock object.
    // The leaderelection package is analyzed in detail later in this post.
    rl, err := resourcelock.New(config.ResourceLock,
        config.LockObjectNamespace,
        config.LockObjectName,
        client.CoreV1(),
        resourcelock.ResourceLockConfig{
            Identity:      hostname,
            EventRecorder: recorder,
        })
    if err != nil {
        return nil, fmt.Errorf("couldn't create resource lock: %v", err)
    }

    return &leaderelection.LeaderElectionConfig{
        Lock:          rl,
        // Lease duration of the lock.
        LeaseDuration: config.LeaseDuration.Duration,
        // Deadline by which the leader must renew the lock.
        RenewDeadline: config.RenewDeadline.Duration,
        // Interval at which the other replicas retry acquiring the lock (compete for leadership).
        RetryPeriod:   config.RetryPeriod.Duration,
    }, nil
}

The code in kube-scheduler that uses the leader elector:

// Prepare a reusable run function.
run := func(stopCh <-chan struct{}) {
    // The scheduler's scheduling logic.
    sched.Run()
    <-stopCh
}

// If leader election is enabled, run via LeaderElector until done and exit.
if s.LeaderElection != nil {
    s.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
        // Callback invoked when this replica wins the election; run is the function defined above.
        OnStartedLeading: run,
        // Callback invoked if this replica loses leadership for some reason.
        OnStoppedLeading: func() {
            utilruntime.HandleError(fmt.Errorf("lost master"))
        },
    }
    // Build the LeaderElector from the scheduler's leader-election config; this mainly validates that the config is correct.
    leaderElector, err := leaderelection.NewLeaderElector(*s.LeaderElection)
    if err != nil {
        return fmt.Errorf("couldn't create leader elector: %v", err)
    }
    // Start the election: if this replica acquires the lock it runs OnStartedLeading; otherwise it keeps retrying until it becomes the leader.
    leaderElector.Run()

    return fmt.Errorf("lost lease")
}

That is all there is to leader election in kube-scheduler: very little code, nearly all of it delegated to k8s.io/client-go/tools/leaderelection. So it is worth taking a deeper look at the leaderelection package itself. Its source layout is as follows:

k8s.io/client-go/tools/leaderelection
├── resourcelock
│   ├── BUILD
│   ├── configmaplock.go   // configmap-based resource lock
│   ├── endpointslock.go   // endpoints-based resource lock
│   ├── interface.go       // resource lock interface
├── BUILD
├── leaderelection.go      // main leader-election logic
├── leaderelection_test.go

Let's follow the calls made by kube-scheduler to see how the election machinery is used.

// New creates the leader lock resource; only endpoints and configmaps are supported. rlc carries the lock identity and the event recorder.
func New(lockType string, ns string, name string, client corev1.CoreV1Interface, rlc ResourceLockConfig) (Interface, error) {
    switch lockType {
    case EndpointsResourceLock:
        return &EndpointsLock{
            EndpointsMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    case ConfigMapsResourceLock:
        return &ConfigMapLock{
            ConfigMapMeta: metav1.ObjectMeta{
                Namespace: ns,
                Name:      name,
            },
            Client:     client,
            LockConfig: rlc,
        }, nil
    default:
        return nil, fmt.Errorf("Invalid lock-type %s", lockType)
    }
}
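
Both lock types implement the resource-lock interface defined in interface.go; it is reproduced below (lightly commented) because tryAcquireOrRenew, analyzed later, works entirely against it:

// Interface offers a common API for the lock objects used in leader election
// (from k8s.io/client-go/tools/leaderelection/resourcelock/interface.go).
type Interface interface {
    // Get returns the LeaderElectionRecord currently stored in the lock object.
    Get() (*LeaderElectionRecord, error)
    // Create attempts to create the lock object with the given record.
    Create(ler LeaderElectionRecord) error
    // Update writes an updated record into an existing lock object.
    Update(ler LeaderElectionRecord) error
    // RecordEvent records an event ("became leader", "stopped leading") against the lock object.
    RecordEvent(string)
    // Identity returns this candidate's lock identity.
    Identity() string
    // Describe returns a "namespace/name" description of the lock object.
    Describe() string
}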

// LeaderCallbacks are the callbacks invoked at the different stages of the election.
type LeaderCallbacks struct {
    // OnStartedLeading is called when a LeaderElector client starts leading
    OnStartedLeading func(stop <-chan struct{})
    // OnStoppedLeading is called when a LeaderElector client stops leading
    OnStoppedLeading func()
    // OnNewLeader is called when the client observes a leader that is
    // not the previously observed leader. This includes the first observed
    // leader when the client starts.
    OnNewLeader func(identity string)
}

// NewLeaderElector builds a LeaderElector from the LeaderElectionConfig (after validating the parameters).
func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
    if lec.LeaseDuration <= lec.RenewDeadline {
        return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
    }
    if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
        return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
    }
    if lec.Lock == nil {
        return nil, fmt.Errorf("Lock must not be nil.")
    }
    return &LeaderElector{
        config: lec,
    }, nil
}

Next, the Run function of the leaderelection package, which implements the election flow:

func (le *LeaderElector) Run() {
    defer func() {
        runtime.HandleCrash()
        // If Run returns, invoke the OnStoppedLeading callback registered earlier, i.e.
        // func() {
        //     utilruntime.HandleError(fmt.Errorf("lost master"))
        // }
        le.config.Callbacks.OnStoppedLeading()
    }()
    // All replicas compete for the lock here; acquire() blocks until this replica wins,
    // and non-leader replicas keep retrying inside it. Once elected, Run starts
    // OnStartedLeading (the sched.Run() function registered above) in a goroutine and
    // then calls renew() to keep refreshing the lock so the other replicas know the
    // leader is still alive. If renew() fails and returns, the stop channel is closed
    // to signal OnStartedLeading that it should stop working as well.
    le.acquire()
    stop := make(chan struct{})
    go le.config.Callbacks.OnStartedLeading(stop)
    le.renew()
    close(stop)
}

Now look at the acquire and renew functions:

// The acquire function
func (le *LeaderElector) acquire() {
    stop := make(chan struct{})
    glog.Infof("attempting to acquire leader lease...")
    // Keep trying to acquire the lock, retrying every RetryPeriod (the retry interval we
    // configured in the scheduler's flags). Once the lock is acquired, stop is closed and
    // JitterUntil returns; otherwise keep retrying until this replica becomes the leader.
    wait.JitterUntil(func() {
        succeeded := le.tryAcquireOrRenew()
        le.maybeReportTransition()
        // desc is the lock object in the form namespace/name.
        desc := le.config.Lock.Describe()
        // Failed to acquire the lock: return and let JitterUntil retry.
        if !succeeded {
            glog.V(4).Infof("failed to acquire lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("became leader")
        glog.Infof("successfully acquired lease %v", desc)
        close(stop)
    }, le.config.RetryPeriod, JitterFactor, true, stop)
}

// The renew function
func (le *LeaderElector) renew() {
    stop := make(chan struct{})
    // Keep renewing the lock to hold on to it; if the renewal does not succeed within RenewDeadline, leadership is lost.
    wait.Until(func() {
        err := wait.Poll(le.config.RetryPeriod, le.config.RenewDeadline, func() (bool, error) {
            return le.tryAcquireOrRenew(), nil
        })
        le.maybeReportTransition()
        // desc is the lock object in the form namespace/name.
        desc := le.config.Lock.Describe()
        if err == nil {
            glog.V(4).Infof("successfully renewed lease %v", desc)
            return
        }
        le.config.Lock.RecordEvent("stopped leading")
        glog.Infof("failed to renew lease %v: %v", desc, err)
        close(stop)
    }, 0, stop)
}
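
Both acquire and renew ultimately call tryAcquireOrRenew. Before looking at it, it helps to see the small amount of state a LeaderElector keeps; its definition in leaderelection.go is essentially:

type LeaderElector struct {
    config LeaderElectionConfig
    // observedRecord is the lock record most recently read from (or written to) the lock object.
    observedRecord rl.LeaderElectionRecord
    // observedTime is when observedRecord was last seen to change; the lease is measured from this point.
    observedTime time.Time
    // reportedLeader is the leader identity most recently reported through OnNewLeader.
    reportedLeader string
}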

tryAcquireOrRenew is the most important piece of both acquire and renew. Its code:

func (le *LeaderElector) tryAcquireOrRenew() bool {
    now := metav1.Now()
    leaderElectionRecord := rl.LeaderElectionRecord{
        HolderIdentity:       le.config.Lock.Identity(),
        LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
        RenewTime:            now,
        AcquireTime:          now,
    }

    // 1. Fetch the current record from the resource lock; if the lock object does not exist yet, create it.
    oldLeaderElectionRecord, err := le.config.Lock.Get()
    if err != nil {
        if !errors.IsNotFound(err) {
            glog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
            return false
        }
        if err = le.config.Lock.Create(leaderElectionRecord); err != nil {
            glog.Errorf("error initially creating leader election record: %v", err)
            return false
        }
        le.observedRecord = leaderElectionRecord
        le.observedTime = time.Now()
        return true
    }

    // 2. Compare the record we just read against the one we last observed (holder identity and renew time); if it changed, remember it and reset the observation time.
    if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) {
        le.observedRecord = *oldLeaderElectionRecord
        le.observedTime = time.Now()
    }
    // If the lease has not yet expired since the last observed change and this replica is not the lock holder, give up and retry after the retry interval.
    if le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
        oldLeaderElectionRecord.HolderIdentity != le.config.Lock.Identity() {
        glog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
        return false
    }

    // 3. We are about to take (or keep) the lock. If this replica was already the leader, keep the original acquire time and transition count; LeaderTransitions counts how many times leadership has changed hands, so if the previous holder was a different replica, increment it.
    if oldLeaderElectionRecord.HolderIdentity == le.config.Lock.Identity() {
        leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
    } else {
        leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
    }

    // Update the lock object with the new record. For an endpoints lock the record is stored in the
    // control-plane.alpha.kubernetes.io/leader annotation and contains the renew time, the holder
    // identity, leaseDurationSeconds, acquireTime, leaderTransitions, and so on.
    if err = le.config.Lock.Update(leaderElectionRecord); err != nil {
        glog.Errorf("Failed to update lock: %v", err)
        return false
    }
    le.observedRecord = leaderElectionRecord
    le.observedTime = time.Now()
    return true
}
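
For reference, the record serialized into that annotation is the LeaderElectionRecord defined in resourcelock/interface.go:

// LeaderElectionRecord is the record stored in the leader-election annotation of the lock object.
type LeaderElectionRecord struct {
    HolderIdentity       string      `json:"holderIdentity"`
    LeaseDurationSeconds int         `json:"leaseDurationSeconds"`
    AcquireTime          metav1.Time `json:"acquireTime"`
    RenewTime            metav1.Time `json:"renewTime"`
    LeaderTransitions    int         `json:"leaderTransitions"`
}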

Next I did some experimenting of my own and wrote a small demo to watch the election happen. The setup:

// OnStartedLeading callback: the "business logic" of the demo.
electionRun := func(stopCh <-chan struct{}) {
    i := 0
    for {
        select {
        case <-stopCh:
            return
        default:
            i++
            glog.Info("run ", i)
            time.Sleep(time.Second)
        }
    }
}
// Leader-election configuration of the demo.
LeaderElection := componentconfig.KubeSchedulerLeaderElectionConfiguration{
    LeaderElectionConfiguration: componentconfig.LeaderElectionConfiguration{
        LeaderElect:   true,
        LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
        RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
        RetryPeriod:   metav1.Duration{Duration: 2 * time.Second},
        ResourceLock:  "endpoints",
    },
    LockObjectNamespace: "default",
    LockObjectName:      "test",
}

I then started two replicas, DESKTOP-GFY7SQ100 and DESKTOP-GF7FSQ081; DESKTOP-GF7FSQ081 became the leader and started executing the business logic.
(screenshot: leader1)
After it had run for a while I stopped DESKTOP-GF7FSQ081, and the DESKTOP-GFY7SQ100 replica took its place as the new leader and began running the business logic.
(screenshot: leader2)
So once the lease expired without the old leader renewing its hold on the lock, the other replica replaced it and became the new leader.

Summary

Starting from kube-scheduler, this post analyzed the Kubernetes leader-election mechanism and walked through the relevant functions of the leaderelection package. For its distributed lock Kubernetes does not use a Redis-based implementation (perhaps to avoid introducing another component?), nor does it build directly on ZooKeeper or etcd; instead it creates a Kubernetes resource object and maintains the lock state on it. Why Kubernetes took this approach I cannot say for certain, but for anyone learning Kubernetes, reading the source like this is a good way in.
