背景

  • 通常在运维开发中, 为了避免后端存储的压力,都会增加一个缓存,缓存资源数据,避免每次都从后端获取
  • 今天介绍另外一个好玩的东西watchCache
  • watchCache是一个固定大小的用于监测缓存数据变化的数组

主要作用是缓存固定大小的最新数据。有些情况下我们并不一定需要全量的数据: 比如在k8s中开始watch一个资源对象时, 其实只需要把所watch的版本之后最近一批被修改的数据返回就可以了, 并不需要全部的数据

数据结构

  • 事件类型
// EventType identifies the kind of change carried by an Event.
type EventType string

const (
	// Added marks an event for a newly added object.
	Added EventType = "ADDED"
	// Modified marks an event for a modified object.
	// The value was previously misspelled as "MODIFED".
	Modified EventType = "MODIFIED"
	// Moidifed is the original, misspelled name of Modified, kept so
	// existing references keep compiling.
	//
	// Deprecated: use Modified instead.
	Moidifed = Modified
	// Deleted marks an event for a deleted object.
	Deleted EventType = "DELETED"
	// Error marks an error event.
	Error EventType = "ERROR"
)
// Event pairs an event type with the object the event concerns.
type Event struct {
	Type   EventType
	Object Object
}
复制代码
  • 存储
// Store is the storage interface for cached objects.
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	Get(obj interface{}) (item interface{}, exits bool, err error)
	GetByKey(key string) (item interface{}, exits bool, err error)
	Replace([]interface{}, string) error
}
复制代码
  • 存储事件

// storeElement holds an object together with its already-computed key and
// attributes, so they do not have to be recomputed on every list/watch.
type storeElement struct {
	Key           string
	Object        Object
	Labels        Set
	Fields        Set
	Uninitialized bool
}
复制代码
  • Versioner
// Versioner is the interface for an Object's version information.
type Versioner interface {
	// Per the original author's note: updates the cached object.
	UpdateObject(obj Object, resourceVersion uint64) error
	ObjectResourceVersion(obj Object) (uint64, error)
	ParseResourceVersion(resourceVersion string) (uint64, error)
}
复制代码
  • watchCacheEvent
// watchCacheEvent is an observed event for a single object.
type watchCacheEvent struct {
	// Event type.
	Type EventType
	// Object data after the change.
	Object           Object
	ObjLabels        Set
	ObjFields        Set
	ObjUninitialized bool
	// Data from before the change.
	PrevObject    Object
	PrevObjFields Set
	PrevObjLables Set
	// Uninitialized flag of the previous object.
	PrevObjUninitialized bool
	Key                  string
	ResourceVersion      uint64
}
复制代码
  • watchCacheElement
// watchCacheElement is one cached entry: an event and the resource version
// it was recorded at.
type watchCacheElement struct {
	resourceVersion uint64
	watchCacheEvent *watchCacheEvent
}
复制代码
  • watchCache
// watchCache keeps a fixed-size window of the most recent events next to a
// Store holding the objects themselves.
type watchCache struct {
	sync.RWMutex

	// Used to wait for the cache to be refreshed.
	cond *sync.Cond

	// Capacity of the event window.
	capacity int

	// Computes an object's key.
	keyFunc func(Object) (string, error)

	// Returns an object's attributes: the first Set holds the labels, the
	// second the fields.
	getAttrsFunc func(Object) (Set, Set, bool, error)

	// Event window and the indexes delimiting its valid entries.
	cache      []watchCacheElement
	startIndex int
	endIndex   int

	// Backing store for the data.
	store Store

	resourceVersion uint64

	onEvent   func(*watchCacheEvent)
	onReplace func()

	versioner Versioner

	clock Clock
}
复制代码

转换流程

  • 调用对外暴露的Get/Add/Update/Delete接口, 进行数据格式化: Event(事件数据)、updateFunc(后端存储的具体操作)、resourceVersion(当前数据的版本, 通过versioner实现)
  • 调用processEvent开始进行逻辑处理
    • 通过传入的Event和store获取的Object(存储中当前最新的数据)组装成storeElement
    • 将storeElement和resourceVersion组合成watchCacheEvent
    • 调用updateCache更新缓存
    • 调用updateFunc, 更新后端存储数据

完整代码

package cacher

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// EventType identifies the kind of change carried by an Event.
type EventType string

const (
	// Added marks an event for a newly added object.
	Added EventType = "ADDED"
	// Modified marks an event for a modified object.
	// The value was previously misspelled as "MODIFED".
	Modified EventType = "MODIFIED"
	// Moidifed is the original, misspelled name of Modified, kept so
	// existing references keep compiling.
	//
	// Deprecated: use Modified instead.
	Moidifed = Modified
	// Deleted marks an event for a deleted object.
	Deleted EventType = "DELETED"
	// Error marks an error event.
	Error EventType = "ERROR"
)

// blockTimeout bounds how long waitUtilFreshAndBlock waits for the cache to
// reach a requested resource version before giving up.
const blockTimeout = 3 * time.Second

// Event pairs an event type with the object the event concerns.
type Event struct {
	Type   EventType
	Object Object
}

// Clock is a stateless wrapper over the time package's wall-clock helpers.
type Clock struct{}

// Now reports the current time, exactly as time.Now does.
func (c Clock) Now() time.Time {
	now := time.Now()
	return now
}

// After returns a channel that delivers the time once d has elapsed,
// exactly as time.After(d) does.
func (c Clock) After(d time.Duration) <-chan time.Time {
	fired := time.After(d)
	return fired
}

// Since reports how much time has passed since ts, exactly as time.Since does.
func (c Clock) Since(ts time.Time) time.Duration {
	elapsed := time.Since(ts)
	return elapsed
}

// Object is the abstract base type for resources.
type Object interface{}

// Set is the collection type used for the labels and the fields returned
// when an object's attributes are read.
type Set map[string]string

// Store is the storage interface for cached objects.
type Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	Get(obj interface{}) (item interface{}, exits bool, err error)
	GetByKey(key string) (item interface{}, exits bool, err error)
	Replace([]interface{}, string) error
}

// storeElement holds an object together with its already-computed key and
// attributes, so they do not have to be recomputed on every list/watch.
type storeElement struct {
	Key           string
	Object        Object
	Labels        Set
	Fields        Set
	Uninitialized bool
}

// Versioner is the interface for an Object's version information.
type Versioner interface {
	// Per the original author's note: updates the cached object.
	UpdateObject(obj Object, resourceVersion uint64) error
	ObjectResourceVersion(obj Object) (uint64, error)
	ParseResourceVersion(resourceVersion string) (uint64, error)
}

// watchCacheEvent is an observed event for a single object.
type watchCacheEvent struct {
	// Event type.
	Type EventType
	// Object data after the change.
	Object           Object
	ObjLabels        Set
	ObjFields        Set
	ObjUninitialized bool
	// Data from before the change.
	PrevObject    Object
	PrevObjFields Set
	PrevObjLables Set
	// Uninitialized flag of the previous object.
	PrevObjUninitialized bool
	Key                  string
	ResourceVersion      uint64
}

// watchCacheElement is one slot of the watchCache event buffer: an event and
// the resource version it was recorded at.
type watchCacheElement struct {
	resourceVersion uint64
	watchCacheEvent *watchCacheEvent
}

// watchCache keeps a fixed-size window of the most recent events next to a
// Store holding the objects themselves.
type watchCache struct {
	sync.RWMutex

	// Used to wait for the cache to be refreshed.
	cond *sync.Cond

	// Capacity of the event window.
	capacity int

	// Computes an object's key.
	keyFunc func(Object) (string, error)

	// Returns an object's attributes: the first Set holds the labels, the
	// second the fields.
	getAttrsFunc func(Object) (Set, Set, bool, error)

	// cache is used as a cyclic buffer: slots are addressed modulo capacity
	// and the valid entries are those with index in [startIndex, endIndex).
	cache      []watchCacheElement
	startIndex int
	endIndex   int

	// Backing store for the data.
	store Store

	resourceVersion uint64

	onEvent   func(*watchCacheEvent)
	onReplace func()

	versioner Versioner

	clock Clock
}

// Get looks up the stored element that shares obj's key.
//
// The key is computed with keyFunc and the lookup is delegated to the backing
// store. It returns the stored item, whether it exists, and any error from
// the type check, the key computation or the store.
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
	object, ok := obj.(Object)
	if !ok {
		// Same wording as objectToVersionRuntimeObject; the previous message
		// was garbled ("implememnt Object interfa").
		return nil, false, fmt.Errorf("obj does not implement Object interface: %v", obj)
	}
	key, err := w.keyFunc(object)
	if err != nil {
		return nil, false, fmt.Errorf("couldn't compute key: %v", err)
	}
	return w.store.Get(&storeElement{Key: key, Object: object})
}

// Add extracts obj's resource version and hands an Added event to
// processEvent, with the store's Add as the storage operation.
func (w *watchCache) Add(obj interface{}) error {
	object, version, err := w.objectToVersionRuntimeObject(obj)
	if err != nil {
		return err
	}
	return w.processEvent(
		Event{Type: Added, Object: object},
		version,
		func(elem *storeElement) error { return w.store.Add(elem) },
	)
}

// Update extracts obj's resource version and hands a Moidifed event to
// processEvent, with the store's Update as the storage operation.
func (w *watchCache) Update(obj interface{}) error {
	object, version, err := w.objectToVersionRuntimeObject(obj)
	if err != nil {
		return err
	}
	return w.processEvent(
		Event{Type: Moidifed, Object: object},
		version,
		func(elem *storeElement) error { return w.store.Update(elem) },
	)
}

// Delete extracts obj's resource version and hands a Deleted event to
// processEvent, with the store's Delete as the storage operation.
//
// It returns any error from version extraction, from processEvent, or from
// the store.
func (w *watchCache) Delete(obj interface{}) error {
	object, resourceVersion, err := w.objectToVersionRuntimeObject(obj)
	if err != nil {
		return err
	}
	event := Event{Type: Deleted, Object: object}

	// This must remove the element; the previous code called store.Add here,
	// which re-inserted the object it was supposed to delete.
	f := func(elem *storeElement) error { return w.store.Delete(elem) }
	return w.processEvent(event, resourceVersion, f)
}

// SetOnEvent installs the callback stored in onEvent, holding the write lock
// for the duration of the assignment.
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
	w.Lock()
	w.onEvent = onEvent
	w.Unlock()
}

// SetOnReplace installs the callback stored in onReplace, holding the write
// lock for the duration of the assignment.
func (w *watchCache) SetOnReplace(onReplace func()) {
	w.Lock()
	w.onReplace = onReplace
	w.Unlock()
}

// objectToVersionRuntimeObject type-checks obj as an Object and asks the
// versioner for its resource version. On failure it returns a nil Object, a
// zero version and the error.
func (w *watchCache) objectToVersionRuntimeObject(obj interface{}) (Object, uint64, error) {
	object, isObject := obj.(Object)
	if !isObject {
		return nil, 0, fmt.Errorf("obj does not implement Object interface: %v", obj)
	}
	version, err := w.versioner.ObjectResourceVersion(object)
	if err == nil {
		return object, version, nil
	}
	return nil, 0, err
}

// GetAllEventsSinceThreadUnsafe returns the cached events newer than
// resourceVersion, so list/watch requests can be answered from the cache
// instead of querying the backend.
//
// When resourceVersion is 0 it returns one synthetic Added event per item in
// the store, stamped with the cache's current resource version. Otherwise it
// returns the buffered events whose version is greater than resourceVersion,
// or an error if resourceVersion is older than what the buffer still holds.
//
// The method takes no lock itself.
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
	size := w.endIndex - w.startIndex
	// With an empty buffer nothing older than the current version is missing,
	// so "oldest" is one past the current resource version.
	oldest := w.resourceVersion + 1
	if size > 0 {
		oldest = w.cache[w.startIndex%w.capacity].resourceVersion
	}
	if resourceVersion == 0 {
		allItems := w.store.List()
		result := make([]*watchCacheEvent, len(allItems))
		for i, item := range allItems {
			elem, ok := item.(*storeElement)
			if !ok {
				// Report the offending item; elem is always nil here.
				return nil, fmt.Errorf("not a storeElement: %v", item)
			}
			objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
			if err != nil {
				return nil, err
			}
			result[i] = &watchCacheEvent{
				Type:             Added,
				Object:           elem.Object,
				ObjLabels:        objLabels,
				ObjFields:        objFields,
				ObjUninitialized: objUninitialized,
				Key:              elem.Key,
				ResourceVersion:  w.resourceVersion,
			}
		}
		return result, nil
	}
	if resourceVersion < oldest-1 {
		return nil, fmt.Errorf("too old resource version: %d (%d)", resourceVersion, oldest-1)
	}
	// Binary search for the first buffered entry newer than resourceVersion.
	f := func(i int) bool {
		return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
	}
	first := sort.Search(size, f)
	result := make([]*watchCacheEvent, size-first)
	for i := 0; i < size-first; i++ {
		result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
	}
	return result, nil
}

// Replace swaps the entire contents of the store for objs and resets the
// event buffer.
//
// resourceVersion is parsed with the versioner and becomes the cache's
// current version. Each object is wrapped in a storeElement with its key and
// attributes computed up front. After the store is replaced, onReplace (if
// set) is invoked and all waiters on cond are woken. It returns the first
// error from parsing, type checking, key or attribute computation, or the
// store.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
	version, err := w.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}

	// Length 0 with capacity len(objs): the elements are appended below.
	// Allocating with length len(objs) left that many nil entries at the
	// front of the slice handed to the store.
	toReplace := make([]interface{}, 0, len(objs))
	for _, obj := range objs {
		object, ok := obj.(Object)
		if !ok {
			return fmt.Errorf("didn't get Object for replace: %v", obj)
		}
		key, err := w.keyFunc(object)
		if err != nil {
			return fmt.Errorf("couldn't compute key: %v", err)
		}
		objLabels, objFields, objUninitialized, err := w.getAttrsFunc(object)
		if err != nil {
			return err
		}
		toReplace = append(toReplace, &storeElement{
			Key:           key,
			Object:        object,
			Labels:        objLabels,
			Fields:        objFields,
			Uninitialized: objUninitialized,
		})
	}

	w.Lock()
	defer w.Unlock()

	// Drop every buffered event: they describe the state being replaced.
	w.startIndex = 0
	w.endIndex = 0
	if err := w.store.Replace(toReplace, resourceVersion); err != nil {
		return err
	}
	w.resourceVersion = version
	if w.onReplace != nil {
		w.onReplace()
	}
	w.cond.Broadcast()
	return nil
}

// processEvent records one change in the event buffer and applies it to the
// store.
//
// It computes the object's key and attributes, builds a watchCacheEvent for
// resourceVersion, and then, under the write lock: fills in the previous
// state from the store (if the key already exists), invokes onEvent (if set),
// appends the event to the buffer, advances the cache's resource version,
// wakes all waiters on cond, and finally calls updateFunc with the new
// storeElement. It returns the first error from key or attribute computation,
// from the store lookup, or from updateFunc.
func (w *watchCache) processEvent(event Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
	key, err := w.keyFunc(event.Object)
	if err != nil {
		return fmt.Errorf("couldn't compute key: %v", err)
	}
	elem := &storeElement{Key: key, Object: event.Object}
	elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
	if err != nil {
		return err
	}

	// Named cacheEvent rather than watchCacheEvent so the local does not
	// shadow the type.
	cacheEvent := &watchCacheEvent{
		Type:             event.Type,
		Object:           event.Object,
		ObjLabels:        elem.Labels,
		ObjFields:        elem.Fields,
		ObjUninitialized: elem.Uninitialized,
		Key:              key,
		ResourceVersion:  resourceVersion,
	}

	w.Lock()
	defer w.Unlock()
	previous, exists, err := w.store.Get(elem)
	if err != nil {
		return err
	}
	if exists {
		previousElem := previous.(*storeElement)
		cacheEvent.PrevObject = previousElem.Object
		cacheEvent.PrevObjLables = previousElem.Labels
		// Fields go into PrevObjFields; they used to overwrite PrevObjLables.
		cacheEvent.PrevObjFields = previousElem.Fields
		cacheEvent.PrevObjUninitialized = previousElem.Uninitialized
	}

	if w.onEvent != nil {
		w.onEvent(cacheEvent)
	}

	w.updateCache(resourceVersion, cacheEvent)
	w.resourceVersion = resourceVersion
	w.cond.Broadcast()
	return updateFunc(elem)
}

// updateCache appends event to the cyclic buffer. When the buffer already
// holds capacity entries, startIndex advances first so the oldest entry's
// slot is the one that gets overwritten.
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
	full := w.endIndex-w.startIndex == w.capacity
	if full {
		w.startIndex++
	}
	slot := w.endIndex % w.capacity
	w.cache[slot] = watchCacheElement{
		resourceVersion: resourceVersion,
		watchCacheEvent: event,
	}
	w.endIndex++
}

// WaitUntilFreshAndGet waits, via waitUtilFreshAndBlock, for the cache to
// reach resourceVersion and then looks key up in the store. It returns the
// item, whether it exists, the cache's current resource version, and any
// error from the wait or the lookup.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
	waitErr := w.waitUtilFreshAndBlock(resourceVersion)
	// The read lock is released here regardless of whether the wait failed.
	defer w.RUnlock()
	if waitErr != nil {
		return nil, false, 0, waitErr
	}
	item, found, getErr := w.store.GetByKey(key)
	current := w.resourceVersion
	return item, found, current, getErr
}

// WaitUtilFreshAndList waits, via waitUtilFreshAndBlock, for the cache to
// reach resourceVersion and then returns every item in the store together
// with the cache's current resource version.
func (w *watchCache) WaitUtilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
	waitErr := w.waitUtilFreshAndBlock(resourceVersion)
	// The read lock is released here regardless of whether the wait failed.
	defer w.RUnlock()
	if waitErr != nil {
		return nil, 0, waitErr
	}
	items := w.store.List()
	return items, w.resourceVersion, nil
}

// GetByKey forwards the lookup of key straight to the backing store.
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
	item, found, err := w.store.GetByKey(key)
	return item, found, err
}

// waitUtilFreshAndBlock blocks until the cache's resourceVersion is at least
// the requested one, or returns an error once blockTimeout has elapsed.
//
// It acquires the read lock and returns with it still held on both the
// success and the error path; the caller is responsible for calling RUnlock.
func (w *watchCache) waitUtilFreshAndBlock(resourceVersion uint64) error {
	startTime := w.clock.Now()
	// Wake every waiter once the timeout elapses, so the loop below gets a
	// chance to notice it has timed out instead of waiting indefinitely.
	go func() {
		<-w.clock.After(blockTimeout)
		w.cond.Broadcast()
	}()

	w.RLock()
	for w.resourceVersion < resourceVersion {
		if w.clock.Since(startTime) >= blockTimeout {
			return fmt.Errorf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion)
		}
		// cond is built on the read locker in newWatchCache, so Wait releases
		// and re-acquires the read lock taken above.
		w.cond.Wait()
	}
	return nil
}

// storeElementKey returns the key under which a *storeElement is stored.
func storeElementKey(obj interface{}) (string, error) {
	elem, ok := obj.(*storeElement)
	if !ok {
		return "", fmt.Errorf("not a storeElement: %v", obj)
	}
	return elem.Key, nil
}

// memoryStore is a minimal map-backed Store for *storeElement values, keyed
// by storeElement.Key. It carries its own lock because watchCache.Get and
// watchCache.GetByKey reach the store without holding the watchCache lock.
type memoryStore struct {
	mu    sync.RWMutex
	items map[string]interface{}
}

// newMemoryStore returns an empty memoryStore.
func newMemoryStore() *memoryStore {
	return &memoryStore{items: make(map[string]interface{})}
}

// put inserts or overwrites obj under its key.
func (s *memoryStore) put(obj interface{}) error {
	key, err := storeElementKey(obj)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[key] = obj
	return nil
}

// Add stores obj, overwriting any element with the same key.
func (s *memoryStore) Add(obj interface{}) error { return s.put(obj) }

// Update stores obj, overwriting any element with the same key.
func (s *memoryStore) Update(obj interface{}) error { return s.put(obj) }

// Delete removes the element sharing obj's key; a missing key is not an error.
func (s *memoryStore) Delete(obj interface{}) error {
	key, err := storeElementKey(obj)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.items, key)
	return nil
}

// List returns all stored elements ordered by key, so results are
// deterministic despite the map's random iteration order.
func (s *memoryStore) List() []interface{} {
	s.mu.RLock()
	defer s.mu.RUnlock()
	keys := make([]string, 0, len(s.items))
	for key := range s.items {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	out := make([]interface{}, 0, len(keys))
	for _, key := range keys {
		out = append(out, s.items[key])
	}
	return out
}

// Get returns the element sharing obj's key, if any.
func (s *memoryStore) Get(obj interface{}) (interface{}, bool, error) {
	key, err := storeElementKey(obj)
	if err != nil {
		return nil, false, err
	}
	return s.GetByKey(key)
}

// GetByKey returns the element stored under key, if any.
func (s *memoryStore) GetByKey(key string) (interface{}, bool, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	item, ok := s.items[key]
	return item, ok, nil
}

// Replace swaps the whole content for objs. The resource version argument is
// not used by this store. Nothing is replaced if any element is rejected.
func (s *memoryStore) Replace(objs []interface{}, _ string) error {
	items := make(map[string]interface{}, len(objs))
	for _, obj := range objs {
		key, err := storeElementKey(obj)
		if err != nil {
			return err
		}
		items[key] = obj
	}
	s.mu.Lock()
	s.items = items
	s.mu.Unlock()
	return nil
}

// newWatchCache builds a watchCache with an event buffer of the given
// capacity, an empty in-memory store and resource version 0.
//
// capacity should be positive: updateCache indexes the buffer modulo
// capacity. keyFunc computes object keys, getAttrsFunc computes labels,
// fields and the uninitialized flag, and versioner reads and parses resource
// versions.
func newWatchCache(
	capacity int,
	keyFunc func(Object) (string, error),
	getAttrsFunc func(Object) (Set, Set, bool, error),
	versioner Versioner) *watchCache {
	wc := &watchCache{
		capacity:     capacity,
		keyFunc:      keyFunc,
		getAttrsFunc: getAttrsFunc,
		cache:        make([]watchCacheElement, capacity),
		startIndex:   0,
		endIndex:     0,
		// Store is an interface, so the former `Store{}` literal did not
		// compile; a concrete implementation is required.
		store:           newMemoryStore(),
		resourceVersion: 0,
		clock:           Clock{},
		versioner:       versioner,
	}
	// Waiters hold only the read lock, so the condition is built on RLocker.
	wc.cond = sync.NewCond(wc.RLocker())
	return wc
}
复制代码

总结

目前暂时理解了这些。有一点需要注意: 这里是watchCache, 它会将所有的增删改查的操作都放到自己的cache里, 但在Get的时候并不会去读取缓存中的数据, 只有在watch中才会从当前的cache数组中读取对象。接下来继续看cache, 后面得反过来再了解下

转载于:https://juejin.im/post/5b70073a51882561131aa574

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐