client-go—— Reflector(k8s)(六)
文章目录本文主要对Informer的Reflector组件进行分析说明下图是整个client-go的完整结构图,或者说使我们实现一个定义的控制器的完整流程,其中黄色图标是我们开发者需要自行开发的部分,而其它的部分是client-go已经提供的,直接使用即可。由于client-go实现非常复杂,我们这里先对上图中最核心的部分Informer进行说明。在Informer的架构中包含如下几个核心组件:I
下图是整个client-go的完整结构图,或者说使我们实现一个定义的控制器的完整流程,其中黄色图标是我们开发者需要自行开发的部分,而其它的部分是client-go已经提供的,直接使用即可。由于client-go实现非常复杂,我们这里先对上图中最核心的部分Informer进行说明。在Informer的架构中包含如下几个核心组件:
Informers是client-go中非常重要的概念 ,接下来我们来仔细分析下Informers的实现原理,下图是client-go的实现架构图
本文主要对Informer的Reflector组件进行分析说明
前面我们说了Informer通过对APIServer的资源对象执行List和Watch操作,把获取到的数据存储在Reflector,我们可以称其为反射器,从名字我们可以看出来他的主要功能就是反射,就是将Etcd里面的数据反射到本地存储(DeltaFIFO)中。Reflector首先通过List操作获取所有的资源对象数据,保存到本地存储(DeltaFIFO),然后通过watch操作监控资源的变化,触发响应的事件处理,比如前面示例的Add事件、Update事件、Delete事件。
Reflector结构体的定义位于staging/src/k8s.io/client-go/tools/cache/reflector.go下面:
//Reflector(反射器)监听指定资源,将所有的变化都反射到指定的存储中去
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name标识这个反射器的名称,默认为 文件:行数(比如:reflector.go: 50),默认名字通过
//k8s.io/apimachinery/pkg/util/naming/from_stack.go下面的GetNameFromCallsite函数生成
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
// 期望放到store中的类型名称,如果提供,则是expectedGVK的字符串形式,否则就是expectedType的字符串,它仅仅用于显示,不用于解析或者比较
expectedTypeName string
// An example object of the type we expect to place in the store.
// Only the type needs to be right, except that when that is
// `unstructured.Unstructured` the object's `"apiVersion"` and
// `"kind"` must also be right.
// 我们期望放到store中的对象类型, 当对象是非结构化时,它的type需要正确,apiVersion和kind也必须是对的
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
// 如果是非结构化的,我们期望放在store中的对象GVK
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
// 把数据同步到缓存中的watch源,实际上就是DeltaFIFO
store Store
// listerWatcher is used to perform lists and watches.
// 用来执行lists和watch操作的listWatch接口
listerWatcher ListerWatcher
// 管理ListWatch的backoff
// backoff manages backoff of ListWatch
backoffManager wait.BackoffManager
// 同步周期
resyncPeriod time.Duration
// ShouldResync is invoked periodically and whenever it returns `true` the Store's Resync operation is invoked
// 如果该函数返回true或者nil,则会周期性同步
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// paginatedResult defines whether pagination should be forced for list calls.
// It is set based on the result of the initial list call.
// 获取列表的结果是否被强制分页
paginatedResult bool
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
// 资源在k8s apiserver中的版本,对象的增加、修改、更新的都会是的一个新的版本
lastSyncResourceVersion string
// isLastSyncResourceVersionGone is true if the previous list or watch request with lastSyncResourceVersion
// failed with an HTTP 410 (Gone) status code.
// 如果之前使用lastSyncResourceVersion进行list、watch请求失败,返回410 http错误码,isLastSyncResourceVersionGone是true
isLastSyncResourceVersionGone bool
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
// 保证lastSyncResourceVersion的读写
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// If unset, for consistent reads (RV="") or reads that opt-into arbitrarily old data
// (RV="0") it will default to pager.PageSize, for the rest (RV != "" && RV != "0")
// it will turn off pagination to allow serving them from watch cache.
// NOTE: It should be used carefully as paginated lists are always served directly from
// etcd, which is significantly less efficient and may lead to serious performance and
// scalability problems.
// watchList的分页chunk大小
WatchListPageSize int64
}
/*
NewReflector创建一个新的Reflector对象,该对象将使给定的存储与给定资源的服务器内容保持最新。Reflector承诺只在 store中放置类型为expectedType的物品,除非expectedType为零。如果resyncPeriod为非零,则反射器将定期查询其ShouldResync函数,以确定是否调用store的Resync操作`ShouldResync==nil`表示始终“是”。这使您能够使用反射器定期处理所有内容,并以增量方式处理更改的内容
*/
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
/*
GetNameFromCallsite遍历调用堆栈,直到我们在ignoredPackages之外找到一个调用方,它返回一个shortpath/filename:line,以帮助在开始记录时识别此反射
*/
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector和NewReflector作用相同,但是可以指定日志名
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{}
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
// We used to make the call every 1sec (1 QPS), the goal here is to achieve ~98% traffic reduction when
// API server is not healthy. With these parameters, backoff will stop at [30,60) sec interval which is
// 0.22 QPS. If we don't backoff for 2min, assume API server is healthy and we reset the backoff.
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),
resyncPeriod: resyncPeriod,
clock: realClock,
}
r.setExpectedType(expectedType)
return r
}
// 重复运行使用反射器的ListAndWatch获取所有对象和后续增量。stopCh关闭时,Run将退出。stopCh通知其他协程退出
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
// ListAndWatch首先列出所有项目,并在调用时获取资源版本,然后使用资源版本进行监视。如果ListAndWatch甚至没有尝试初始化watch,它将返回错误。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
// 传递资源对象的版本
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
// 如果listerWatcher支持,尝试将列表分块收集,如果不支持,第一个列表请求将返回完整响应。
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
switch {
case r.WatchListPageSize != 0:
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
// We got a paginated result initially. Assume this resource and server honor
// paging requests (i.e. watch cache is probably disabled) and leave the default
// pager size set.
// 最初我们得到了分页结果。假设此资源和服务器满足分页请求(即可能禁用了监视缓存),并保留默认的分页大小设置。
case options.ResourceVersion != "" && options.ResourceVersion != "0":
// User didn't explicitly request pagination.
//
// With ResourceVersion != "", we have a possibility to list from watch cache,
// but we do that (for ResourceVersion != "0") only if Limit is unset.
// To avoid thundering herd on etcd (e.g. on master upgrades), we explicitly
// switch off pagination to force listing from watch cache (if enabled).
// With the existing semantic of RV (result is at least as fresh as provided RV),
// this is correct and doesn't lead to going back in time.
//
// We also don't turn off pagination for ResourceVersion="0", since watch cache
// is ignoring Limit in that case anyway, and if watch cache is not enabled
// we don't introduce regression.
// 用户未明确请求分页。使用ResourceVersion!=“”中,我们可以从监视缓存中列出,但只有在未设置限制的情况下才能这样做(对于ResourceVersion!=“0”)。为了避免在etcd上引起轰动(例如在主升级上),我们明确关闭分页以强制从监视缓存中列出(如果启用)。使用RV的现有语义(结果至少与提供的RV一样新鲜),这是正确的,不会导致时间倒退。我们也不会关闭ResourceVersion=“0”的分页,因为在这种情况下,监视缓存会忽略限制,如果未启用监视缓存,则不会引入回归。
pager.PageSize = 0
}
list, paginatedResult, err = pager.List(context.Background(), options)
if isExpiredError(err) {
r.setIsLastSyncResourceVersionExpired(true)
// Retry immediately if the resource version used to list is expired.
// The pager already falls back to full list if paginated list calls fail due to an "Expired" error on
// continuation pages, but the pager might not be enabled, or the full list might fail because the
// resource version it is listing at is expired, so we need to fallback to resourceVersion="" in all
// to recover and ensure the reflector makes forward progress.
// 如果用于列出的资源版本已过期,请立即重试。如果分页列表调用因继续页上的“Expired”错误而失败,则寻呼机已返回到完整列表,但寻呼机可能未启用,或者完整列表可能会因其所在的资源版本已过期而失败,因此我们总共需要回退到resourceVersion="",以恢复并确保反射程序向前进展
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
}
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
// We check if the list was paginated and if so set the paginatedResult based on that.
// However, we want to do that only for the initial list (which is the only case
// when we set ResourceVersion="0"). The reasoning behind it is that later, in some
// situations we may force listing directly from etcd (by setting ResourceVersion="")
// which will return paginated result, even if watch cache is enabled. However, in
// that case, we still want to prefer sending requests to watch cache if possible.
//
// Paginated result returned for request with ResourceVersion="0" mean that watch
// cache is disabled and there are a lot of objects of a given type. In such case,
// there is no need to prefer listing from watch cache.
// 我们检查列表是否已分页,如果已分页,则在此基础上设置分页结果。但是,我们只想对初始列表执行此操作(这是设置ResourceVersion=“0”时的唯一情况)。其背后的原因是,在某些情况下,我们可能会直接从etcd强制列出(通过设置ResourceVersion=“”),这将返回分页结果,即使启用了watch cache。然而,在这种情况下,如果可能的话,我们仍然希望发送请求以监视缓存。为ResourceVersion=“0”的请求返回的分页结果意味着禁用了监视缓存,并且存在许多给定类型的对象。在这种情况下,不需要从监视缓存中选择列表。
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionExpired(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// 解析list从runtime.object->items
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
// 给stopCh一个停止循环的机会,即使在continue语句进一步出错的情况下也是如此
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
// 我们希望避免出现绞死观察者的情况。停止任何在超时窗口内未接收到任何事件的Wacher。
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
// 要减少监视重新启动时kube apiserver上的负载,可以启用监视 bookmarks。Reflector根本不假设返回了bookmarks(如果服务器不支持 watch bookmarks ,它将忽略此字段)。
AllowWatchBookmarks: true,
}
// start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
// 不要使用ResourceVersion=RV设置LastSyncResourceVersionExpired-LIST调用,因为它已经有了一个语义,即返回的数据至少与RV提供的数据一样新鲜。因此,首先尝试将RV设置为最后观察到的对象的资源版本
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case err == io.EOF:
// watch closed normally
case err == io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
// 如果这是“拒绝连接”错误,则表示apiserver很可能没有响应。重新列出所有对象是没有意义的,因为我们很可能能够在结束时重新启动watch。如果是这种情况,请等待并重新发送监视请求
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
continue
}
return nil
}
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err):
// Don't set LastSyncResourceVersionExpired - LIST call with ResourceVersion=RV already
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
首先通过反射器的relistResourceVersion()获得反射器relist的资源版本,如果资源版本非0,则表示根据资源版本号继续获取,当传输过程中遇到网络故障或者其他原因导致中断,下次再连接时,会根据资源版本号继续传输未完成的部分。可以使本地缓存中的数据与etcd集群中数据保持一致,该函数实现如下所示:
//relistResourceVersion确定反射器应列出或重新列出的资源版本。返回lastSyncResourceVersion,以便此反射器使用不早于在重新列出结果或监视事件中观察到的资源版本重新列出,或者,如果最后一次重新列出导致HTTP 410(Gone)状态代码,则返回“”,以便重新列出将通过仲裁读取使用etcd中可用的最新资源版本。。
func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionGone {
//由于此反射器发出分页列表请求,并且如果lastSyncResourceVersion过期,所有分页列表请求都会跳过监视缓存,因此我们设置ResourceVersion=“”并再次列出,以使用从etcd的一致读取将反射器重新建立为最新可用的ResourceVersion。
// Since this reflector makes paginated list requests, and all paginated list requests skip the watch cache
// if the lastSyncResourceVersion is expired, we set ResourceVersion="" and list again to re-establish reflector
// to the latest available ResourceVersion, using a consistent read from etcd.
return ""
}
if r.lastSyncResourceVersion == "" {
// For performance reasons, initial list performed by reflector uses "0" as resource version to allow it to
// be served from the watch cache if it is enabled.
// 出于性能原因,reflector执行的初始列表使用“0”作为资源版本,以允许在启用监视缓存的情况下从监视缓存提供服务。
return "0"
}
return
// watchHandler watches w and keeps *resourceVersion up to date.
// watchHandler监视w并使resourceVersion保持最新。
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
// Stopping the watcher should be idempotent and if we return from this function there's no way
// we're coming back in with the same watch interface.
// 停止监视程序应该是幂等的,如果我们从这个函数返回,我们就不可能返回相同的监视界面。
defer w.Stop()
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` means watch has synced here, just update the resourceVersion
// “Bookmark”表示watch已在此处同步,只需更新resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
watchDuration := r.clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
return nil
}
// staging/src/k8s.io/client-go/tools/cache/listwatch.go
// Lister是一个知道如何进行初始化list的对象
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List应该会返回一个type list对象;这个
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
// watcher是一个知道如何开始watch一个资源的对象
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListerWatcher是一个进行初始化list和watch一个资源的对象
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct {
ListFunc ListFunc
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
// Getter interface knows how to access Get method from RESTClient.
// Getter接口知道如何从RESTClient访问Get方法。
type Getter interface {
Get() *restclient.Request
}
// NewListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
// NewFilteredListWatchFromClient从指定的客户端、资源、命名空间和选项修饰符创建新的ListWatch。选项修饰符是一个函数,它接受ListOptions并修改已使用的ListOptions。提供自定义修改器功能,以使用字段选择器、标签选择器或任何其他所需选项对ListOptions进行修改。
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(context.TODO()).
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(context.TODO())
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// List a set of apiserver resources
// 列出一组apiserver资源
func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
// ListWatch is used in Reflector, which already supports pagination.
// Don't paginate here to avoid duplication.
// ListWatch用于Reflector,它已经支持分页。不要在此处分页以避免重复。
return lw.ListFunc(options)
}
// Watch a set of apiserver resources
// watch一组apiserver资源
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
Reflector反射器中最核心的ListAndWatch实现,从上面实现我们可以看出获取到的数据最终都存储到本地的store,也就是DelteFIFO。
- Reflector利用ClientSet客户端List全量对象
- 将全量对象采用Replace接口同步到本地Store(DelFIFO)中,并更新资源的版本号
- 开启一个协程定时执行resync操作,同步就是把全量对象以同步事件的方式通知出去
- 然后ClientSet客户端监控Watch资源,监控当前资源股版本号以后的对象
- 一旦对象发生变化,那么就会根据变化类型(新增、更新、删除)调用DeltaFIFO相应接口,产生一个相应对象的Delta,同时更新当前资源的版本
更多推荐
所有评论(0)