k8s informer的list-watch机制剖析

1、list-watch场景:

client-go中的reflector模块首先会list apiserver获取某个资源的全量信息,然后根据list到的rv来watch资源的增量信息。希望使用client-go编写的控制器组件在与apiserver发生连接异常时,尽量的re-watch资源而不是re-list

2、list-watch主要做的三件事:

informer的list-watch逻辑主要做三个事情:

  • 1、List部分逻辑:设置分页参数;执行list方法;将list结果同步进DeltaFIFO队列中,其实是调用store中的Replace方法。

  • 2、定时同步:定时同步以协程的方式运行,使用定时器实现定期同步,Store中的Resync操作。

  • 3、Watch部分逻辑:在for循环里;执行watch函数获取resultchan;监听resultchan中数据并处理;

过程细节剖析:

  • 1、第一次list资源会设置资源版本号为空,旧版会设为0,拉完后就更新资源版本,后面watch的时候只要关心比这个资源版本大的资源。list的时候会把ListWatch对象包裹在pager对象里 ,这个对象的作用是控制分页查询,比如资源对象太多时,为了防止过大的网络IO,pager可以通过控制url的limit和continue参数来指定一次请求获取的资源数量。

  • 2、watch的时候会开启一个死循环,ListerWatcher会返回要一个watch对象及其内部的一条channel,没有数据时则一直阻塞监听channel,只要有新资源变化就会停止阻塞,然后就根据事件类型往DeltaFIFO里面更新数据,最后会更新最新资源版本。

  • 3、每次向apiserver发起watch请求,如果大概8分钟内都没有任何事件,则apiserver会主动断开连接,断开连接则会关闭watch对象的channel ,Reflector监听channel结束,然后会再次构建watch对象并发起watch请求。

  • 4、ListAndWatch()会被Run()调用。Run()里面把ListAndWatch()包裹在了一个重试函数wait.Until()里面,ListAndWatch()正常情况下是死循环,一旦ListAndWatch()发送错误就会返回,wait.Until()在指定时间后又会重新执行ListAndWatch() 。这一步也叫所谓的ReList。再一次list资源时会尝试传入一个上次list到或最新watch到的资源版本,但并不保证可以成功list,比如watch到的Pod的资源版本和PodList的资源版本没有任何关联,Pod的更新不代表PodList的更新,这里只是尝试一下而已,如果list失败了就把url参数resourceVersion置为空,这样就能拉最新的列表。

概括:

  • 通过list机制来获取全量资源,然后使用那个resourceversion并通过watch模式来增量更新,后续每次watch到新的变化后除了更新cache,还会更新resourceversion,并用新的resourceversion去watch

3、list-watch源码剖析:

// ListAndWatch 函数首先列出所有的对象,并在调用的时候获得资源版本,然后使用该资源版本来进行 watch 操作。
// 如果 ListAndWatch 没有初始化 watch 成功就会返回错误。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
    var resourceVersion string

    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

  // 1.List部分逻辑:设置分页参数;执行list方法;将list结果同步进DeltaFIFO队列中;
    if err := func() error {
        initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
        defer initTrace.LogIfLong(10 * time.Second)
        var list runtime.Object
        var paginatedResult bool
        var err error
        listCh := make(chan struct{}, 1)
        panicCh := make(chan interface{}, 1)
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    panicCh <- r
                }
            }()
            // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
            // list request will return the full response.
            // 如果listerWatcher支持,则尝试以块的形式收集列表,如果不支持,则收集第一个列表请求将返回完整响应
            pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                return r.listerWatcher.List(opts)
            }))
            switch {
            case r.WatchListPageSize != 0:
                pager.PageSize = r.WatchListPageSize
            case r.paginatedResult:
                // We got a paginated result initially. Assume this resource and server honor
                // paging requests (i.e. watch cache is probably disabled) and leave the default
                // pager size set.
            case options.ResourceVersion != "" && options.ResourceVersion != "0":
                // User didn't explicitly request pagination.
                //
                // With ResourceVersion != "", we have a possibility to list from watch cache,
                // but we do that (for ResourceVersion != "0") only if Limit is unset.
                // To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
                // switch off pagination to force listing from watch cache (if enabled).
                // With the existing semantic of RV (result is at least as fresh as provided RV),
                // this is correct and doesn't lead to going back in time.
                //
                // We also don't turn off pagination for ResourceVersion="0", since watch cache
                // is ignoring Limit in that case anyway, and if watch cache is not enabled
                // we don't introduce regression.
                pager.PageSize = 0
            }
            // 如果过期或者不合法 resourceversion 则进行重试
            list, paginatedResult, err = pager.List(context.Background(), options)
            if isExpiredError(err) || isTooLargeResourceVersionError(err) {
                r.setIsLastSyncResourceVersionUnavailable(true)
                // Retry immediately if the resource version used to list is unavailable.
                // The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
                // continuation pages, but the pager might not be enabled, the full list might fail because the
                // resource version it is listing at is expired or the cache may not yet be synced to the provided
                // resource version. So we need to fallback to resourceVersion="" in all to recover and ensure
                // the reflector makes forward progress.
                list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
            }
            close(listCh)
        }()
        select {
        case <-stopCh:
            return nil
        case r := <-panicCh:
            panic(r)
        case <-listCh:
        }
        if err != nil {
            return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
        }

        // We check if the list was paginated and if so set the paginatedResult based on that.
        // However, we want to do that only for the initial list (which is the only case
        // when we set ResourceVersion="0"). The reasoning behind it is that later, in some
        // situations we may force listing directly from etcd (by setting ResourceVersion="")
        // which will return paginated result, even if watch cache is enabled. However, in
        // that case, we still want to prefer sending requests to watch cache if possible.
        //
        // Paginated result returned for request with ResourceVersion="0" mean that watch
        // cache is disabled and there are a lot of objects of a given type. In such case,
        // there is no need to prefer listing from watch cache.
        if options.ResourceVersion == "0" && paginatedResult {
            r.paginatedResult = true
        }

        r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
        initTrace.Step("Objects listed")
    // 
        listMetaInterface, err := meta.ListAccessor(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v: %v", list, err)
        }
    // 获取资源版本号
        resourceVersion = listMetaInterface.GetResourceVersion()
        initTrace.Step("Resource version extracted")
    // 将资源对象转换为资源列表,讲runtime.Object 对象转换为[]runtime.Object对象
        items, err := meta.ExtractList(list)
        if err != nil {
            return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
        }
        initTrace.Step("Objects extracted")
    // 将资源对象列表中的资源和版本号存储在store中
        if err := r.syncWith(items, resourceVersion); err != nil {
            return fmt.Errorf("unable to sync list result: %v", err)
        }
        initTrace.Step("SyncWith done")
        // 更新resourceVersion 
        r.setLastSyncResourceVersion(resourceVersion)
        initTrace.Step("Resource version updated")
        return nil
    }(); err != nil {
        return err
    }

  // 2.定时同步:定时同步以协程的方式运行,使用定时器实现定期同步
    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        resyncCh, cleanup := r.resyncChan()
        defer func() {
            cleanup() // Call the last one written into cleanup
        }()
        for {
            select {
            case <-resyncCh:
            case <-stopCh:
                return
            case <-cancelCh:
                return
            }
      // 如果ShouldResync 为nil或者调用返回true,则执行Store中的Resync操作
            if r.ShouldResync == nil || r.ShouldResync() {
                klog.V(4).Infof("%s: forcing resync", r.name)
        // 将indexer的数据和deltafifo进行同步
                if err := r.store.Resync(); err != nil {
                    resyncerrc <- err
                    return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

  // 3.在for循环里;执行watch函数获取resultchan;监听resultchan中数据并处理;
    for {
        // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
        select {
        case <-stopCh:
            return nil
        default:
        }

        timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timeoutSeconds,
            // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
            // Reflector doesn't assume bookmarks are returned at all (if the server do not support
            // watch bookmarks, it will ignore this field).
            AllowWatchBookmarks: true,
        }

        // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
        start := r.clock.Now()
        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case begin exponentially backing off and resend watch request.
            //  如果这是“连接被拒绝”错误,则意味着 apiserver 很可能没有响应。
            // 重新列出所有对象是没有意义的,因为我们很可能能够重新启动
            // 看我们结束的地方。
            // 如果是这种情况,开始指数级后退并重新发送监视请求
            if utilnet.IsConnectionRefused(err) {
                <-r.initConnBackoffManager.Backoff().C()
                continue
            }
            return err
        }

        if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                switch {
                case isExpiredError(err):
                    // Don't set LastSyncResourceVersionUnavailable - LIST call with ResourceVersion=RV already
                    // has a semantic that it returns data at least as fresh as provided RV.
                    // So first try to LIST with setting RV to resource version of last observed object.
                    klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
                default:
                    klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
                }
            }
            return nil
        }
    }
}

4.4 LastSyncResourceVersion:获取上一次同步的资源版本

func (r *Reflector) LastSyncResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()
    return r.lastSyncResourceVersion
}

4.5 resyncChan:返回一个定时通道和清理函数,清理函数就是停止计时器。这边的定时重新同步是使用定时器实现的。

func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
    if r.resyncPeriod == 0 {
        return neverExitWatch, func() bool { return false }
    }
    // The cleanup function is required: imagine the scenario where watches
    // always fail so we end up listing frequently. Then, if we don't
    // manually stop the timer, we could end up with many timers active
    // concurrently.
    t := r.clock.NewTimer(r.resyncPeriod)
    return t.C(), t.Stop
}
4.6 syncWith:将从apiserver list的资源对象结果同步进DeltaFIFO队列中,调用队列的Replace方法实现。

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

4.7 watchHandler:watch的处理:接收watch的接口作为参数,watch接口对外方法是Stop和Resultchan,前者关闭结果通道,后者获取通道。

func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
    eventCount := 0

    // Stopping the watcher should be idempotent and if we return from this function there's no way
    // we're coming back in with the same watch interface.
    defer w.Stop()

loop:
    for {
        select {
        case <-stopCh:
            return errorStopRequested
        case err := <-errc:
            return err
        case event, ok := <-w.ResultChan():
            if !ok {
                break loop
            }
            if event.Type == watch.Error {
                return apierrors.FromObject(event.Object)
            }
            if r.expectedType != nil {
                if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
                    continue
                }
            }
      // 判断期待的类型和监听到的事件类型是否一致
            if r.expectedGVK != nil {
                if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
                    utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
                    continue
                }
            }
      // 获取事件对象
            meta, err := meta.Accessor(event.Object)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                continue
            }
            newResourceVersion := meta.GetResourceVersion()
      // 对事件类型进行判断,并进行对应操作
            switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            case watch.Bookmark:
        // 表示监听已在此处同步,只需更新
                // A `Bookmark` means watch has synced here, just update the resourceVersion
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }
            *resourceVersion = newResourceVersion
            // 更新 resource version 版本, 下次使用该 resourceVersion 来 watch 监听.
            r.setLastSyncResourceVersion(newResourceVersion)
            if rvu, ok := r.store.(ResourceVersionUpdater); ok {
                rvu.UpdateResourceVersion(newResourceVersion)
            }
            eventCount++
        }
    }

    watchDuration := r.clock.Since(start)
    // 如果 watch 退出小于 一秒, 另外一条事件也没拿到, 则打条错误日志
    if watchDuration < 1*time.Second && eventCount == 0 {
        return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
    }
    klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
    return nil
}

4.8 relistResourceVersion:relistResourceVersion 函数获得反射器 relist 的资源版本,如果资源版本非 0,
则表示根据资源版本号继续获取,当传输过程中遇到网络故障或者其他原因导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。
可以使本地缓存中的数据与Etcd集群中的数据保持一致,该函数实现如下所示:

// 如果最后一次relist的结果是HTTP 410(Gone)状态码,则返回"",这样relist将通过quorum读取etcd中可用的最新资源版本。
// 返回使用 lastSyncResourceVersion,这样反射器就不会使用在relist结果或watch事件中watch到的资源版本更老的资源版本进行relist了
// 当 r.lastSyncResourceVersion 为 "" 时这里为 "0",当使用 r.lastSyncResourceVersion 失败时这里为 ""
// 区别是 "" 会直接请求到 etcd,获取一个最新的版本,而 "0" 访问的是 cache
// 第一次使用0,出错了使用"",否则用lastSyncResourceVersion
// 注意:第一次不会直接全量list etcd,是全量list apiserver
func (r *Reflector) relistResourceVersion() string {
    r.lastSyncResourceVersionMutex.RLock()
    defer r.lastSyncResourceVersionMutex.RUnlock()

    if r.isLastSyncResourceVersionUnavailable {
    // 因为反射器会进行分页List请求,如果 lastSyncResourceVersion 过期了,所有的分页列表请求就都会跳过 watch 缓存
    // 所以设置 ResourceVersion="",然后再次 List,重新建立反射器到最新的可用资源版本,从 etcd 中读取,保持一致性。
        return ""
    }
    if r.lastSyncResourceVersion == "" {
    // 反射器执行的初始 List 操作的时候使用0作为资源版本。
        return "0"
    }
    return r.lastSyncResourceVersion
}

4.9 setLastSyncResourceVersion:用于存储已被Reflector处理的最新资源对象的ResourceVersion,r.setLastSyncResourceVersion方法用于更新该值。
lastSyncResourceVersion属性为Reflector struct的一个属性,
func (r *Reflector) setLastSyncResourceVersion(v string) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.lastSyncResourceVersion = v
}

// setIsLastSyncResourceVersionUnavailable 设置是否返回具有lastSyncResourceVersion 的最后一个列表或监视请求“过期”或“资源版本太大”错误。
func (r *Reflector) setIsLastSyncResourceVersionUnavailable(isUnavailable bool) {
    r.lastSyncResourceVersionMutex.Lock()
    defer r.lastSyncResourceVersionMutex.Unlock()
    r.isLastSyncResourceVersionUnavailable = isUnavailable
}
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐