client-go: Informer机制之reflector源码分析
client-go: Informer机制之reflector源码分析目的为了能充分了解Inform机制的原理,我们需要了解Inform机制的起点——reflector,那么,reflector是如何将数据从api-server拉下来?又是如何将数据存入本地的呢?解决这两个疑问就是本篇文章的重点。希望大家也能在此过程中能顺便了解k8s中list-watch机制缓存对象数据的原理。源码分析的大致流程
client-go: Informer机制之reflector源码分析
目的
为了能充分了解Inform机制的原理,我们需要了解Inform机制的起点——reflector,那么,reflector是如何将数据从api-server拉下来?又是如何将数据存入本地的呢?解决这两个疑问就是本篇文章的重点。希望大家也能在此过程中能顺便了解k8s中list-watch机制缓存对象数据的原理。
源码分析的大致流程
首先,需要了解reflector结构体中的各个属性,然后是reflector是如何初始化,最后针对reflector中几个核心方法进行源码解析。
reflector
reflector直接英译结果是反光板,我觉得开发人员想表达的意思是一种映射/对称的含义,就是将api-server中对象数据获取到并实时的更新进本地,使得本地数据和etcd数据的完全一样,我觉得本地对象数据可以称为k8s资源数据的一份快照。reflector实际的作用是监控指定资源的Kubernetes资源,当监控的资源发生变化时触发相应的变更事件,例如Added(资源添加)事件、Updated(资源更新)事件、Deleted(资源删除)事件,并将其资源对象存放到本地缓存DeltaFIFO中。
reflector属性
看下reflector结构体的属性以及里面的注释
type Reflector struct {
name string // 名称
expectedTypeName string // 期待的事件类型名称,用于判断和监控到的事件是否一致
expectedType reflect.Type // 期待事件类型,用于判断和监控到的事件是否一致
expectedGVK *schema.GroupVersionKind // 期待的GVK,用于判断和监控到的对象的GVK是否一致
store Store // deltalFIFO队列还是Indexer
listerWatcher ListerWatcher // 封装list 和 watch接口的实例
backoffManager wait.BackoffManager // 退避管理器
resyncPeriod time.Duration // 重新同步的间隔
ShouldResync func() bool // 标记是否开启重新同步
clock clock.Clock // 时钟
paginatedResult bool // list的时候是否需要分页
lastSyncResourceVersion string // 上一次同步的资源版本
isLastSyncResourceVersionUnavailable bool // 上一次同步资源不可用状态
lastSyncResourceVersionMutex sync.RWMutex // 上一次同步资源版本的读写锁
WatchListPageSize int64 //页大小
watchErrorHandler WatchErrorHandler // watch接口的错误处理
}
reflector的初始化
这边可以看到有三种方式:NewReflector,NewNamedReflector和NewNamespaceKeyedIndexerAndReflector
// 传入listwatcher对象,期待类型,deltafifo,重新同步周期
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
// 调用的下面的新建方法
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// 与上一个初始化的区别在于可以摄入Name
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
realClock := &clock.RealClock{}
r := &Reflector{
name: name, // 设置名字
listerWatcher: lw, // listWatcher
store: store, // 本地存储
backoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock), // 退避管理器
resyncPeriod: resyncPeriod, // 重新同步周期
clock: realClock, // 时钟
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler), // 错误处理器
}
r.setExpectedType(expectedType) // 设置期待类型
return r
}
// 新建Indexer和reflector。
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
// index指定KeyFunc
indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{NamespaceIndex: MetaNamespaceIndexFunc})
reflector = NewReflector(lw, expectedType, indexer, resyncPeriod) // 调用第一个函数
return indexer, reflector
}
reflector中的核心方法
1、Run方法:
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {// 将根据backoffManager周期性运行这个函数
if err := r.ListAndWatch(stopCh); err != nil { // 调用listAndWatch方法
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
2、ListAndWatch方法:
这个方法有点长,主要分为list、定时同步和watch三个部分;
-
List部分逻辑:设置分页参数;执行list方法;将list结果同步进DeltaFIFO队列中;
-
定时同步:定时同步以协程的方式运行,使用定时器实现定期同步;
-
Watch部分逻辑:在for循环里;执行watch函数获取resultchan;监听resultchan中数据并处理;
以下是ListAndWatch方法的逻辑图:
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}// list接口的参数,设置lastSyncResourceVersion上一同步版本
if err := func() error {// 匿名函数
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1) // list通到
panicCh := make(chan interface{}, 1) // panic错误通道
go func() { // 以协程方式运行
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// 新建pager,放入list方法作为处理函数
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts) // 该方法返回list结果
}))
switch {
case r.WatchListPageSize != 0:// 设置分页大小
pager.PageSize = r.WatchListPageSize
case r.paginatedResult:
case options.ResourceVersion != "" && options.ResourceVersion != "0": // 资源版本已经有了
pager.PageSize = 0
}
list, paginatedResult, err = pager.List(context.Background(), options) // list的结果放在list变量中
if isExpiredError(err) || isTooLargeResourceVersionError(err) {
r.setIsLastSyncResourceVersionUnavailable(true)
list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) // 尝试重新获取List结果
}
close(listCh) // 关闭通道
}()
// 检查三项类似检查开关的配置,都没问题才继续
select {
case <-stopCh: // 是否被停止
return nil
case r := <-panicCh: // 是否发生无法弥补的错误
panic(r)
case <-listCh:// 收到list通道的关闭信息,说明list的记过已经有了,就在list变量中
}
if err != nil {
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}
if options.ResourceVersion == "0" && paginatedResult {
r.paginatedResult = true
}
r.setIsLastSyncResourceVersionUnavailable(false) // list was successful
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list) // 解析List
if err != nil {
return fmt.Errorf("unable to understand list result %#v: %v", list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()// 获取资源版本
initTrace.Step("Resource version extracted")
items, err := meta.ExtractList(list)// 从list对象中获取对象数组
if err != nil {
return fmt.Errorf("unable to understand list result %#v (%v)", list, err)
}
initTrace.Step("Objects extracted")
// 将数据塞入deltaFIFO中
if err := r.syncWith(items, resourceVersion); err != nil { // 这边将list的结果items的数据放入detalFIFO中
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1) // 重新同步错误通道
cancelCh := make(chan struct{}) // 取消通道
defer close(cancelCh)
go func() { // 协程,一直再跑
resyncCh, cleanup := r.resyncChan() // 返回重新同步的定时通道,里面有计时器, cleanup是定时器关闭函数
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh: // 定时器 阻塞式, 定时时间到,就跳出阻塞
case <-stopCh: // 是否被停止
return
case <-cancelCh: // 是否被取消
return
}
// 下面是定时重新同步流程
if r.ShouldResync == nil || r.ShouldResync() { // 判断是否应该同步
klog.V(4).Infof("%s: forcing resync", r.name)
// 开始同步,将indexer的数据和deltafifo进行同步
if err := r.store.Resync(); err != nil { // 同步出错
resyncerrc <- err
return // 退出
}
}
cleanup() // 当前定时器停止
resyncCh, cleanup = r.resyncChan() // 重新启用定时器定时 触发 设置
}
}()
// 开始watch 循环
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) // 超时时间设定,避免夯住
options = metav1.ListOptions{ // watch接口的参数
ResourceVersion: resourceVersion,
TimeoutSeconds: &timeoutSeconds,
AllowWatchBookmarks: true,
}
start := r.clock.Now()
w, err := r.listerWatcher.Watch(options) // 执行watch,返回结果中有resultChan,就是w
if err != nil {
if utilnet.IsConnectionRefused(err) { // 拒绝连接的话,需要重试
time.Sleep(time.Second)
continue
}
return err
}
// 调用watch长连接,从通道中获取值,要是通道关闭就退出, watch的处理函数
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case isExpiredError(err): // 超时错误
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
3、LastSyncResourceVersion
该函数主要获取上一次同步的资源版本
func (r *Reflector) LastSyncResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
return r.lastSyncResourceVersion
}
4、resyncChan
返回一个定时通道和清理函数,清理函数就是停止计时器。这边的定时重新同步是使用定时器实现的。
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
if r.resyncPeriod == 0 { // 未设置重新同步周期
return neverExitWatch, func() bool { return false }
}
t := r.clock.NewTimer(r.resyncPeriod) // 定时器
return t.C(), t.Stop
}
5、syncWith
将从apiserver list的资源对象结果同步进DeltaFIFO队列中,调用队列的Replace方法实现。
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
6、watchHandler
watch的处理:接收watch的接口作为参数,watch接口对外方法是Stop和Resultchan,前者关闭结果通道,后者获取通道。
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
eventCount := 0
defer w.Stop() // 关闭watch通道
loop:
for {
select {
case <-stopCh:
return errorStopRequested // 收到停止通道的
case err := <-errc: // 错误通道
return err
case event, ok := <-w.ResultChan(): // 从resultChan通道中获取事件
if !ok { // 通道被关闭
break loop // 跳出循环
}
if event.Type == watch.Error { // 事件类型是ERROR
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil { // 查看reflector是设置了期望获取的资源类型
// 这是在判断期待的类型和监听到的事件类型是否一致
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
// GVK是否一致
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
meta, err := meta.Accessor(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
continue
}
newResourceVersion := meta.GetResourceVersion()
switch event.Type { // 根据事件类型,对delta队列进行增删改操作
case watch.Added: // 创建事件
err := r.store.Add(event.Object) // 将该事件放入deltalFIFO
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object) // 将该事件放入deltalFIFO
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
err := r.store.Delete(event.Object) // 将该事件放入deltalFIFO
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark: // 意思是”表示监听已在此处同步,只需更新
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
eventCount++
}
}
watchDuration := r.clock.Since(start)
if watchDuration < 1*time.Second && eventCount == 0 {
return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
}
klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
return nil
}
7、relistResourceVersion
返回上一次同步资源版本
func (r *Reflector) relistResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock() // 上锁
defer r.lastSyncResourceVersionMutex.RUnlock()
if r.isLastSyncResourceVersionUnavailable { // 上次失败,返回空
return ""
}
if r.lastSyncResourceVersion == "" { // 上次同步为空,返回0
return "0"
}
return r.lastSyncResourceVersion // 返回该有数值
}
总结
本篇主要讲解了reflector的源码实现,它在informer机制中的主要的功能就是使用ListAndWatch从api server中实时获取资源对象数据,然后将资源对象放入DeltaFIFO队列,相关逻辑在Run方法、ListAndWatch方法和watchHandler方法中。
更多推荐
所有评论(0)