背景
- 通常在运维开发中, 为了避免后端存储的压力,都会增加一个缓存,缓存资源数据,避免每次都从后端获取
- 今天介绍另外一个好玩的东西watchCache
- watchCache是一个固定大小的用于监测缓存数据变化的数组
主要作用是缓存固定大小的最新数据。因为有些情况下, 我们并不一定需要全量的数据, 比如在k8s中开始watch一个资源对象时, 其实只需要把所watch的版本之后最近一批被修改的数据返回就可以了, 并不需要全部的数据
数据结构
- 事件类型
// EventType identifies the kind of change an Event describes.
type EventType string

const (
	// Added marks an object that was created.
	Added EventType = "ADDED"
	// Moidifed marks an object that was changed. The identifier keeps its
	// original (misspelled) form because other code in this file refers to
	// it; only the string value is corrected to "MODIFIED".
	Moidifed EventType = "MODIFIED"
	// Deleted marks an object that was removed.
	Deleted EventType = "DELETED"
	// Error marks an error event.
	Error EventType = "ERROR"
)
// Event describes a single change: its type and the object it refers to.
type Event struct {
// Type is the kind of event (see the EventType constants).
Type EventType
// Object is the object the event refers to.
Object Object
}
复制代码
- 存储
// Store is the object storage interface that watchCache delegates to.
// The methods take and return interface{} values; watchCache passes
// *storeElement values to it.
type Store interface {
// Add, Update and Delete take the object to operate on and report failure
// through the returned error.
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
// List returns the stored items.
List() []interface{}
// Get looks an item up by object and GetByKey by key. The second result
// (spelled "exits" in the signature) is read by callers as "exists".
Get(obj interface{}) (item interface{}, exits bool, err error)
GetByKey(key string) (item interface{}, exits bool, err error)
// Replace receives a list of items and a string; watchCache.Replace passes
// the new elements and the resource version string.
Replace([]interface{}, string) error
}
复制代码
- 存储事件
// storeElement holds an object together with its precomputed key and
// attributes, so they are not recomputed for every list/watch.
type storeElement struct {
// Key is the result of keyFunc for Object.
Key string
// Object is the stored object itself.
Object Object
// Labels, Fields and Uninitialized are the three values returned by
// getAttrsFunc for Object.
Labels Set
Fields Set
Uninitialized bool
}
复制代码
- Versioner
// Versioner is the interface for reading and writing object version information.
type Versioner interface {
// UpdateObject takes an object and a resourceVersion (original note: "update
// the object cache"). NOTE(review): it is not called in the code shown here;
// the exact semantics depend on the implementation.
UpdateObject(obj Object, resourceVersion uint64) error
// ObjectResourceVersion returns the resource version of obj.
ObjectResourceVersion(obj Object) (uint64, error)
// ParseResourceVersion converts a resource version string to its numeric form.
ParseResourceVersion(resourceVersion string) (uint64, error)
}
复制代码
- watchCacheEvent
// watchCacheEvent is an observed event as recorded by watchCache.
type watchCacheEvent struct {
// Type is the event type.
Type EventType
// Object and the Obj* fields describe the object after the change.
Object Object
ObjLabels Set
ObjFields Set
ObjUninitialized bool
// PrevObject and the PrevObj* fields keep the previously stored state.
PrevObject Object
PrevObjFields Set
PrevObjLables Set
// PrevObjUninitialized is the Uninitialized flag of the previous state.
PrevObjUninitialized bool
// Key is the object's key as computed by keyFunc.
Key string
// ResourceVersion is the version associated with this event.
ResourceVersion uint64
}
复制代码
- watchCacheElement
// watchCacheElement is one slot of the watchCache buffer: an event together
// with the resource version it was recorded at.
type watchCacheElement struct {
resourceVersion uint64
watchCacheEvent *watchCacheEvent
}
复制代码
- watchCache
// watchCache keeps a fixed-capacity buffer of the most recent events next to
// a Store holding the current objects.
type watchCache struct {
// The embedded RWMutex is taken by the methods that read or modify the cache.
sync.RWMutex
// cond is used to wait until the cache has been refreshed.
cond *sync.Cond
// capacity is the maximum number of buffered events.
capacity int
// keyFunc computes the key of an object.
keyFunc func(Object) (string, error)
// getAttrsFunc returns an object's attributes: the first Set holds the
// labels, the second the fields; the bool is stored as Uninitialized.
getAttrsFunc func(Object) (Set, Set, bool, error)
// cache is the event buffer; slots are addressed modulo capacity, with
// startIndex and endIndex delimiting the valid range.
cache []watchCacheElement
startIndex int
endIndex int
// store holds the current objects.
store Store
// resourceVersion is the version of the last processed event or Replace.
resourceVersion uint64
// onEvent, if set, is called for every processed event.
onEvent func(*watchCacheEvent)
// onReplace, if set, is called after a successful Replace.
onReplace func()
versioner Versioner
clock Clock
}
复制代码
转换流程
- 调用对外暴露的Get/Add/Update/Delete接口, 进行数据格式化: Event(事件数据)、updateFunc(后端存储的具体操作)、resourceVersion(当前数据的版本, 通过versioner实现)
- 调用processEvent开始进行逻辑处理
- 通过传入的Event和store获取的Object(存储中当前最新的数据)组装成storeElement
- 将storeElement和resourceVersion组合成watchCacheEvent
- 调用updateCache更新缓存
- 调用updateFunc, 更新后端存储数据
完整代码
package cacher
import (
"fmt"
"sort"
"sync"
"time"
)
// EventType identifies the kind of change an Event describes.
type EventType string

const (
	// Added marks an object that was created.
	Added EventType = "ADDED"
	// Moidifed marks an object that was changed. The identifier keeps its
	// original (misspelled) form because other code in this file refers to
	// it; only the string value is corrected to "MODIFIED".
	Moidifed EventType = "MODIFIED"
	// Deleted marks an object that was removed.
	Deleted EventType = "DELETED"
	// Error marks an error event.
	Error EventType = "ERROR"
)
// blockTimeout is how long waitUtilFreshAndBlock waits for the cache to reach
// the requested resource version before giving up.
const blockTimeout = 3 * time.Second
// Event describes a single change: its type and the object it refers to.
type Event struct {
// Type is the kind of event (see the EventType constants).
Type EventType
// Object is the object the event refers to.
Object Object
}
// Clock is a thin, stateless wrapper around the time package.
type Clock struct{}

// Now reports the current time.
func (Clock) Now() time.Time {
	return time.Now()
}

// After returns a channel that receives the current time once d has elapsed,
// exactly like time.After(d).
func (Clock) After(d time.Duration) <-chan time.Time {
	timer := time.NewTimer(d)
	return timer.C
}

// Since reports how much time has passed since ts.
func (Clock) Since(ts time.Time) time.Duration {
	return time.Now().Sub(ts)
}
// Object is the abstract base type for resources; any value satisfies it.
type Object interface{}
// Set is a string-to-string map, used for an object's labels and fields.
type Set map[string]string
// Store is the object storage interface that watchCache delegates to.
// The methods take and return interface{} values; watchCache passes
// *storeElement values to it.
type Store interface {
// Add, Update and Delete take the object to operate on and report failure
// through the returned error.
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
// List returns the stored items.
List() []interface{}
// Get looks an item up by object and GetByKey by key. The second result
// (spelled "exits" in the signature) is read by callers as "exists".
Get(obj interface{}) (item interface{}, exits bool, err error)
GetByKey(key string) (item interface{}, exits bool, err error)
// Replace receives a list of items and a string; watchCache.Replace passes
// the new elements and the resource version string.
Replace([]interface{}, string) error
}
// storeElement holds an object together with its precomputed key and
// attributes, so they are not recomputed for every list/watch.
type storeElement struct {
// Key is the result of keyFunc for Object.
Key string
// Object is the stored object itself.
Object Object
// Labels, Fields and Uninitialized are the three values returned by
// getAttrsFunc for Object.
Labels Set
Fields Set
Uninitialized bool
}
// Versioner is the interface for reading and writing object version information.
type Versioner interface {
// UpdateObject takes an object and a resourceVersion (original note: "update
// the object cache"). NOTE(review): it is not called in the code shown here;
// the exact semantics depend on the implementation.
UpdateObject(obj Object, resourceVersion uint64) error
// ObjectResourceVersion returns the resource version of obj.
ObjectResourceVersion(obj Object) (uint64, error)
// ParseResourceVersion converts a resource version string to its numeric form.
ParseResourceVersion(resourceVersion string) (uint64, error)
}
// watchCacheEvent is an observed event as recorded by watchCache.
type watchCacheEvent struct {
// Type is the event type.
Type EventType
// Object and the Obj* fields describe the object after the change.
Object Object
ObjLabels Set
ObjFields Set
ObjUninitialized bool
// PrevObject and the PrevObj* fields keep the previously stored state.
PrevObject Object
PrevObjFields Set
PrevObjLables Set
// PrevObjUninitialized is the Uninitialized flag of the previous state.
PrevObjUninitialized bool
// Key is the object's key as computed by keyFunc.
Key string
// ResourceVersion is the version associated with this event.
ResourceVersion uint64
}
// watchCacheElement is one slot of the watchCache buffer: an event together
// with the resource version it was recorded at.
type watchCacheElement struct {
resourceVersion uint64
watchCacheEvent *watchCacheEvent
}
// watchCache keeps a fixed-capacity buffer of the most recent events next to
// a Store holding the current objects.
type watchCache struct {
// The embedded RWMutex is taken by the methods that read or modify the cache.
sync.RWMutex
// cond is used to wait until the cache has been refreshed.
cond *sync.Cond
// capacity is the maximum number of buffered events.
capacity int
// keyFunc computes the key of an object.
keyFunc func(Object) (string, error)
// getAttrsFunc returns an object's attributes: the first Set holds the
// labels, the second the fields; the bool is stored as Uninitialized.
getAttrsFunc func(Object) (Set, Set, bool, error)
// cache is the event buffer; slots are addressed modulo capacity, with
// startIndex and endIndex delimiting the valid range.
cache []watchCacheElement
startIndex int
endIndex int
// store holds the current objects.
store Store
// resourceVersion is the version of the last processed event or Replace.
resourceVersion uint64
// onEvent, if set, is called for every processed event.
onEvent func(*watchCacheEvent)
// onReplace, if set, is called after a successful Replace.
onReplace func()
versioner Versioner
clock Clock
}
// Get looks up the stored element that corresponds to obj.
//
// The key is computed with keyFunc and the lookup is delegated to the
// underlying store, whose results are returned unchanged. An error is
// returned when obj does not satisfy Object (for example a nil interface
// value) or when the key cannot be computed.
func (w *watchCache) Get(obj interface{}) (interface{}, bool, error) {
	object, ok := obj.(Object)
	if !ok {
		// Same wording as objectToVersionRuntimeObject uses for this failure.
		return nil, false, fmt.Errorf("obj does not implement Object interface: %v", obj)
	}
	key, err := w.keyFunc(object)
	if err != nil {
		return nil, false, fmt.Errorf("couldn't compute key: %v", err)
	}
	return w.store.Get(&storeElement{Key: key, Object: object})
}
// Add records an Added event for obj and stores the resulting element
// through store.Add.
//
// The object's resource version is read via the versioner; any failure there
// is returned before anything is recorded.
func (w *watchCache) Add(obj interface{}) error {
	added, version, err := w.objectToVersionRuntimeObject(obj)
	if err != nil {
		return err
	}
	return w.processEvent(
		Event{Type: Added, Object: added},
		version,
		func(e *storeElement) error { return w.store.Add(e) },
	)
}
// Update records a Moidifed event for obj and stores the resulting element
// through store.Update.
//
// The object's resource version is read via the versioner; any failure there
// is returned before anything is recorded.
func (w *watchCache) Update(obj interface{}) error {
	changed, version, err := w.objectToVersionRuntimeObject(obj)
	if err != nil {
		return err
	}
	return w.processEvent(
		Event{Type: Moidifed, Object: changed},
		version,
		func(e *storeElement) error { return w.store.Update(e) },
	)
}
// Delete records a Deleted event for obj and removes the corresponding
// element from the underlying store.
//
// The object's resource version is read via the versioner; any failure there
// is returned before anything is recorded.
func (w *watchCache) Delete(obj interface{}) error {
	object, resourceVersion, err := w.objectToVersionRuntimeObject(obj)
	if err != nil {
		return err
	}
	event := Event{Type: Deleted, Object: object}
	// The store operation must be Delete; calling Add here would re-insert
	// the object that is being removed.
	f := func(elem *storeElement) error { return w.store.Delete(elem) }
	return w.processEvent(event, resourceVersion, f)
}
// SetOnEvent installs the callback that processEvent invokes for every
// event. The assignment is made while holding the write lock.
func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
	w.Lock()
	w.onEvent = onEvent
	w.Unlock()
}
// SetOnReplace installs the callback that Replace invokes after the store
// has been replaced. The assignment is made while holding the write lock.
func (w *watchCache) SetOnReplace(onReplace func()) {
	w.Lock()
	w.onReplace = onReplace
	w.Unlock()
}
// objectToVersionRuntimeObject converts obj to an Object and reads its
// resource version through the versioner.
//
// It returns the typed object and its version, or an error when obj does not
// satisfy Object or the versioner fails.
func (w *watchCache) objectToVersionRuntimeObject(obj interface{}) (Object, uint64, error) {
	typed, isObject := obj.(Object)
	if !isObject {
		return nil, 0, fmt.Errorf("obj does not implement Object interface: %v", obj)
	}
	version, err := w.versioner.ObjectResourceVersion(typed)
	if err == nil {
		return typed, version, nil
	}
	return nil, 0, err
}
// GetAllEventsSinceThreadUnsafe returns the events newer than resourceVersion.
//
// watchCache keeps the most recently recorded changes so that list/watch
// requests can be answered from it instead of querying the backend each time.
//
//   - resourceVersion == 0: every item currently in the store is returned as
//     a synthetic Added event stamped with the cache's current resourceVersion.
//   - resourceVersion older than what the buffer still covers: an error is
//     returned.
//   - otherwise: the buffered events whose version is greater than
//     resourceVersion, in buffer order. The binary search assumes buffered
//     versions are ascending.
//
// The method takes no lock itself; as the name indicates, synchronization is
// left to the caller.
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
	size := w.endIndex - w.startIndex
	// With an empty buffer, only versions below the current one are "too old".
	oldest := w.resourceVersion + 1
	if size > 0 {
		oldest = w.cache[w.startIndex%w.capacity].resourceVersion
	}
	if resourceVersion == 0 {
		allItems := w.store.List()
		result := make([]*watchCacheEvent, len(allItems))
		for i, item := range allItems {
			elem, ok := item.(*storeElement)
			if !ok {
				// Report the offending item; elem is always nil on this path.
				return nil, fmt.Errorf("not a storeElement: %v", item)
			}
			objLabels, objFields, objUninitialized, err := w.getAttrsFunc(elem.Object)
			if err != nil {
				return nil, err
			}
			result[i] = &watchCacheEvent{
				Type:             Added,
				Object:           elem.Object,
				ObjLabels:        objLabels,
				ObjFields:        objFields,
				ObjUninitialized: objUninitialized,
				Key:              elem.Key,
				ResourceVersion:  w.resourceVersion,
			}
		}
		return result, nil
	}
	if resourceVersion < oldest-1 {
		return nil, fmt.Errorf("too old resource version:%d (%d)", resourceVersion, oldest-1)
	}
	// Find the first buffered event that is newer than the requested version.
	f := func(i int) bool {
		return w.cache[(w.startIndex+i)%w.capacity].resourceVersion > resourceVersion
	}
	first := sort.Search(size, f)
	result := make([]*watchCacheEvent, size-first)
	for i := 0; i < size-first; i++ {
		result[i] = w.cache[(w.startIndex+first+i)%w.capacity].watchCacheEvent
	}
	return result, nil
}
// Replace swaps the whole content of the store for objs and resets the event
// buffer.
//
// resourceVersion is parsed through the versioner and becomes the cache's
// current version. Every object is wrapped in a storeElement carrying its key
// and attributes before being handed to store.Replace. On success the
// onReplace callback (if set) is invoked and waiters on the condition
// variable are woken. Any parse, key, attribute or store error is returned.
func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
	version, err := w.versioner.ParseResourceVersion(resourceVersion)
	if err != nil {
		return err
	}
	// Start empty and only reserve capacity: a non-zero length combined with
	// append would leave len(objs) nil placeholders in front of the elements.
	toReplace := make([]interface{}, 0, len(objs))
	for _, obj := range objs {
		object, ok := obj.(Object)
		if !ok {
			return fmt.Errorf("didn't get Object for replace: %v", obj)
		}
		key, err := w.keyFunc(object)
		if err != nil {
			return fmt.Errorf("couldn't compute key: %v", err)
		}
		objLabels, objFields, objUninitialized, err := w.getAttrsFunc(object)
		if err != nil {
			return err
		}
		toReplace = append(toReplace, &storeElement{
			Key:           key,
			Object:        object,
			Labels:        objLabels,
			Fields:        objFields,
			Uninitialized: objUninitialized,
		})
	}
	w.Lock()
	defer w.Unlock()
	// Buffered events no longer describe the store once it is replaced.
	w.startIndex = 0
	w.endIndex = 0
	if err := w.store.Replace(toReplace, resourceVersion); err != nil {
		return err
	}
	w.resourceVersion = version
	if w.onReplace != nil {
		w.onReplace()
	}
	w.cond.Broadcast()
	return nil
}
// processEvent is the common path behind Add, Update and Delete.
//
// It computes the key and attributes of event.Object, builds a
// watchCacheEvent (including the previously stored state, when the store
// already holds an element for that key), hands the event to onEvent if set,
// appends it to the event buffer, advances the cache's resourceVersion, wakes
// waiters and finally applies updateFunc to the new element.
//
// The buffer and resourceVersion are updated before updateFunc runs; the
// error returned by updateFunc is returned unchanged. Key, attribute and
// store lookup failures are returned before anything is recorded.
func (w *watchCache) processEvent(event Event, resourceVersion uint64, updateFunc func(*storeElement) error) error {
	key, err := w.keyFunc(event.Object)
	if err != nil {
		return fmt.Errorf("couldn't compute key: %v", err)
	}
	elem := &storeElement{Key: key, Object: event.Object}
	elem.Labels, elem.Fields, elem.Uninitialized, err = w.getAttrsFunc(event.Object)
	if err != nil {
		return err
	}
	wcEvent := &watchCacheEvent{
		Type:             event.Type,
		Object:           elem.Object,
		ObjLabels:        elem.Labels,
		ObjFields:        elem.Fields,
		ObjUninitialized: elem.Uninitialized,
		Key:              key,
		ResourceVersion:  resourceVersion,
	}
	w.Lock()
	defer w.Unlock()
	previous, exists, err := w.store.Get(elem)
	if err != nil {
		return err
	}
	if exists {
		previousElem, ok := previous.(*storeElement)
		if !ok {
			return fmt.Errorf("not a storeElement: %v", previous)
		}
		wcEvent.PrevObject = previousElem.Object
		// Labels and fields go to their own fields; the fields must not
		// overwrite the labels.
		wcEvent.PrevObjLables = previousElem.Labels
		wcEvent.PrevObjFields = previousElem.Fields
		wcEvent.PrevObjUninitialized = previousElem.Uninitialized
	}
	if w.onEvent != nil {
		w.onEvent(wcEvent)
	}
	w.updateCache(resourceVersion, wcEvent)
	w.resourceVersion = resourceVersion
	w.cond.Broadcast()
	return updateFunc(elem)
}
// updateCache appends event to the event buffer under the given
// resourceVersion. When the buffer already holds capacity entries, the oldest
// one is dropped by advancing startIndex before the new entry is written.
func (w *watchCache) updateCache(resourceVersion uint64, event *watchCacheEvent) {
	if w.endIndex-w.startIndex == w.capacity {
		// Buffer is full: give up the oldest slot.
		w.startIndex++
	}
	slot := w.endIndex % w.capacity
	w.cache[slot] = watchCacheElement{
		resourceVersion: resourceVersion,
		watchCacheEvent: event,
	}
	w.endIndex++
}
// WaitUntilFreshAndGet waits until the cache has reached resourceVersion and
// then looks key up in the store.
//
// It returns the item, whether it was found, the cache's resourceVersion at
// the time of the lookup, and any error from waiting or from the store.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string) (interface{}, bool, uint64, error) {
	waitErr := w.waitUtilFreshAndBlock(resourceVersion)
	// waitUtilFreshAndBlock returns with the read lock held on every path.
	defer w.RUnlock()
	if waitErr == nil {
		item, found, getErr := w.store.GetByKey(key)
		return item, found, w.resourceVersion, getErr
	}
	return nil, false, 0, waitErr
}
// WaitUtilFreshAndList waits until the cache has reached resourceVersion and
// then returns everything in the store together with the cache's current
// resourceVersion. A wait failure is returned as the error.
func (w *watchCache) WaitUtilFreshAndList(resourceVersion uint64) ([]interface{}, uint64, error) {
	waitErr := w.waitUtilFreshAndBlock(resourceVersion)
	// waitUtilFreshAndBlock returns with the read lock held on every path.
	defer w.RUnlock()
	if waitErr == nil {
		items := w.store.List()
		return items, w.resourceVersion, nil
	}
	return nil, 0, waitErr
}
// GetByKey forwards the lookup of key to the underlying store and returns
// its results unchanged.
func (w *watchCache) GetByKey(key string) (interface{}, bool, error) {
	item, found, err := w.store.GetByKey(key)
	return item, found, err
}
// waitUtilFreshAndBlock blocks until the cache's resourceVersion is at least
// the requested one, or until blockTimeout has elapsed, in which case an
// error is returned.
//
// It acquires the read lock and returns WITHOUT releasing it, on both the
// success and the error path; the caller must call RUnlock (the
// WaitUntilFreshAndGet / WaitUtilFreshAndList methods do so with a defer).
func (w *watchCache) waitUtilFreshAndBlock(resourceVersion uint64) error {
startTime := w.clock.Now()
// Wake every waiter once the timeout has passed, so the loop below gets a
// chance to notice the deadline even if no new event arrives.
go func() {
<-w.clock.After(blockTimeout)
w.cond.Broadcast()
}()
w.RLock()
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= blockTimeout {
return fmt.Errorf("Too large resource version: %v, current: %v", resourceVersion, w.resourceVersion)
}
// Wait releases cond's locker while blocked and re-acquires it before
// returning; newWatchCache builds cond on the cache's read locker.
w.cond.Wait()
}
return nil
}
// newWatchCache builds a watchCache with an empty event buffer of the given
// capacity, resource version 0, the supplied key/attribute functions and
// versioner, and a condition variable bound to the cache's read locker.
func newWatchCache(
capacity int,
keyFunc func(Object) (string, error),
getAttrsFunc func(Object) (Set, Set, bool, error),
versioner Versioner) *watchCache {
wc := &watchCache{
capacity: capacity,
keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc,
cache: make([]watchCacheElement, capacity),
startIndex: 0,
endIndex: 0,
// NOTE(review): Store is declared as an interface in this file, and Go does
// not allow a composite literal of an interface type, so this line cannot
// compile as written. A concrete Store implementation has to be supplied
// here — TODO confirm which one was intended.
store: Store{},
resourceVersion: 0,
clock: Clock{},
versioner: versioner,
}
// cond waits on the read locker, matching the RLock taken in
// waitUtilFreshAndBlock.
wc.cond = sync.NewCond(wc.RLocker())
return wc
}
复制代码
总结
目前暂时理解了这些, 有一点需要注意: 这里的watchCache会将所有增删改查的操作都放到自己的cache里, 但在Get的时候, 并不会去读取缓存中的数据, 只有在watch时才会从当前的cache数组中读取对象。接下来继续看cache, 后面得反过来再了解下
所有评论(0)