client-go源码分析--Reflector
前言在informer机制流程分析中,简单介绍并梳理了Reflector函数执行流程。整个controller framework中,Reflector的角色是生产者,从k8s api-server中获取runtime.Object,并添加到DeltaFIFO中;indexer是消费者,从DeltaFIFO中获取并处理runtime.Object。1 Reflector创建和运行在c...
前言
在informer机制流程分析中,简单介绍并梳理了Reflector函数执行流程。
整个controller framework中,Reflector的角色是生产者,从k8s api-server中获取runtime.Object,并添加到DeltaFIFO中;indexer是消费者,从DeltaFIFO中获取并处理runtime.Object。
1 Reflector创建和运行
Reflector是在controller执行Run方法时创建并运行的,实际的工作是在 func (r *Reflector) ListAndWatch 中执行的。
// Run builds a Reflector from the controller's config and starts it, then
// drives c.processLoop until stopCh is closed.
// Parts of the original function are elided by the article ("。。。").
func (c *controller) Run(stopCh <-chan struct{}) {
。。。
// The Reflector is created from the configured ListerWatcher, the expected
// object type, the queue it writes into, and the full resync period.
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
。。。
// Start r.Run with stopCh through wg (wg is declared in the elided part).
wg.StartWithChannel(stopCh, r.Run)
// Invoke c.processLoop repeatedly, with a one-second period, until stopCh is closed.
wait.Until(c.processLoop, time.Second, stopCh)
}
// Run keeps invoking ListAndWatch, re-running it every r.period, until stopCh
// is closed. Any error returned by a ListAndWatch round is reported through
// utilruntime.HandleError rather than being propagated to the caller.
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)

	// One list-and-watch round; failures are only reported, never returned.
	listAndWatchOnce := func() {
		err := r.ListAndWatch(stopCh)
		if err == nil {
			return
		}
		utilruntime.HandleError(err)
	}

	wait.Until(listAndWatchOnce, r.period, stopCh)
}
2 List&Watch
2.1 简介
Controller framework通过List&Watch机制完成k8s资源对象和Indexer缓存之间的同步。
- List: controller启动时执行一次,k8s资源对象全量更新到Indexer。
- Watch: controller运行时持续监控,k8s资源对象增量更新到Indexer。
2.2 List&Watch的实现
接下来我们详细分析函数func (r *Reflector) ListAndWatch
,流程图如下:
具体代码如下:
// ListAndWatch first lists all items (starting from resource version "0"),
// hands them to the store via r.syncWith, and records the resource version of
// that list. It then starts a background resync goroutine and loops, opening a
// watch from the last known resource version and feeding it to r.watchHandler.
//
// It returns a non-nil error only when the initial list, its interpretation,
// or the sync into the store fails. It returns nil when stopCh is closed, when
// Watch fails with anything other than "connection refused" (which is retried
// after one second), and when r.watchHandler returns an error (which is only
// logged).
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
// The initial list runs inside a closure so the trace's deferred LogIfLong
// fires as soon as the list phase ends, not when ListAndWatch returns.
if err := func() error {
initTrace := trace.New("Reflector " + r.name + " ListAndWatch")
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var err error
// listCh is closed when the list call has finished; panicCh carries a
// recovered panic value from the list goroutine back to this goroutine.
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() { // Run List in a child goroutine; the calling goroutine blocks below until it finishes.
defer func() {
// Note: this r shadows the receiver inside the deferred func.
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
list, err = pager.List(context.Background(), options)
close(listCh)
}()
// The calling goroutine blocks here until the list completes, the list
// goroutine panics (re-raised here), or stopCh is closed.
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list) // meta.ExtractList turns the List result into individual items (a runtime.Object slice, per the article).
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil { // Write the full item set into the store (the DeltaFIFO, per the article) for a full sync to the Indexer.
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
// resyncerrc carries a resync failure to r.watchHandler; cancelCh is closed
// when ListAndWatch returns so the resync goroutine below exits too.
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
// An interesting detail of the reflector is the resyncPeriod parameter. Going
// by the upstream comments it looks like a periodic full List call against the
// API, but (per the article) the resync actually re-delivers everything already
// held in the Indexer back into the DeltaFIFO.
go func() {
resyncCh, cleanup := r.resyncChan() // Per the article: resyncCh is a timer channel (one that never fires when resyncPeriod is 0 — TODO confirm against resyncChan, not shown here); cleanup releases the timer.
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
// Wait for the next resync tick, or exit on stop/cancel.
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
// A nil ShouldResync means "always resync".
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
// Per the article, the store here is the DeltaFIFO, set up in sharedIndexInformer.run().
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
// Release the fired timer and arm a fresh one for the next period.
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
// Randomize the watch timeout between 1x and 2x minWatchTimeout.
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
w, err := r.listerWatcher.Watch(options)
if err != nil {
// Only the logging differs per error kind; control flow continues below.
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
if urlError, ok := err.(*url.Error); ok {
if opError, ok := urlError.Err.(*net.OpError); ok {
if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
time.Sleep(time.Second)
continue
}
}
}
// Any other watch error ends this round with a nil result.
return nil
}
// r.watchHandler consumes the watch and updates resourceVersion through the
// pointer; per the article it syncs objects incrementally toward the indexer.
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
// errorStopRequested is not logged; other errors are logged at a level
// that depends on whether the resource version expired.
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
}
return nil
}
}
}
- Reflector.Run()调用ListAndWatch(), 启动一个子协程(goroutine)执行List,主协程阻塞等待List执行完成。
- meta.ExtractList(list)把List结果转化成runtime.Object数组。
- r.syncWith(items, resourceVersion)写入DeltaFIFO,全量同步到Indexer。
- r.resyncChan()也是在一个子协程里执行。
- 循环执行r.listerWatcher.Watch(options)。
- r.watchHandler增量同步runtime.Object到indexer。
2.3 r.ListerWatcher.List & r.ListerWatcher.Watch
ListerWatcher是接口类型,定义如下:
// ListerWatcher is the union of the Lister and Watcher interfaces: any type
// that satisfies both can be used wherever a ListerWatcher is required.
type ListerWatcher interface {
Lister
Watcher
}
listwatch.go文件,NewListWatchFromClient创建了ListWatch对象,该对象实现了ListerWatcher interface。r.ListerWatcher.List(options)和r.ListerWatcher.Watch(options),是调用了某个实现了ListerWatcher接口的对象的List,Watch函数。当用户用NewListWatchFromClient创建一个ListWatch对象时,r.ListerWatcher.List(options)实际上调用了NewFilteredListWatchFromClient里的listFunc,而r.ListerWatcher.Watch(options)调用了watchFunc。
// ListWatch bundles a list function and a watch function into one value.
type ListWatch struct {
// ListFunc performs the list call.
ListFunc ListFunc
// WatchFunc performs the watch call.
WatchFunc WatchFunc
// DisableChunking requests no chunking for this list watcher.
DisableChunking bool
}
// NewListWatchFromClient builds a ListWatch for the given resource and
// namespace using client c. It delegates to NewFilteredListWatchFromClient,
// supplying an options modifier that sets the field selector on every request.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
	return NewFilteredListWatchFromClient(c, resource, namespace, func(opts *metav1.ListOptions) {
		// Restrict each request to objects matching the caller's field selector.
		opts.FieldSelector = fieldSelector.String()
	})
}
// List a set of apiserver resourcesfunc (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
if !lw.DisableChunking {
return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options)
}
return lw.ListFunc(options)
}
// Watch a set of apiserver resourcesfunc (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
2.4 r.syncWith(items, resourceVersion)
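r.syncWith --> r.store.Replace。在informer场景下,r.store就是创建Reflector时传入的c.config.Queue(即DeltaFIFO),所以实际调用的是func (f *DeltaFIFO) Replace:它把List得到的全量对象写入DeltaFIFO,随后由controller的processLoop消费并同步到Indexer。
只有当store本身是cache类型时,调用链才是func (c *cache) Replace --> c.cacheStorage.Replace --> func (c *threadSafeMap) Replace,此时实际调用的是func (c *threadSafeMap) Replace方法。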
2.5 r.watchHandler
r.watchHandler从watch.Interface中获取event(Added、Modified、Deleted),并分别调用DeltaFIFO的Add、Update、Delete函数,触发DeltaFIFO对象增量同步。
2.6 r.resyncChan()
Resync核心函数是r.store.Resync(),这里的store是deltafifo,在sharedIndexInformer.run()中定义。该函数实际上调用DeltaFIFO的Resync函数做对象周期性同步。函数的本质是从localstore中遍历item,将Delta{sync, obj}写入到DeltaFIFO中。
参考
https://www.cnblogs.com/charlieroro/p/10330390.html
https://blog.csdn.net/weixin_42663840/article/details/81699303
https://blog.csdn.net/li_101357/article/details/89763992
https://www.jianshu.com/p/1daeae7b6970
更多推荐
所有评论(0)