Path Cache

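Path Cache is used to watch ZooKeeper nodes (znodes). Whenever a child node is added, updated, or removed, the Path Cache reacts to the change in the set of children, their state, and their data.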

1. Key APIs

org.apache.curator.framework.recipes.cache.PathChildrenCache

org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent

org.apache.curator.framework.recipes.cache.PathChildrenCacheListener

org.apache.curator.framework.recipes.cache.ChildData

2. How it works

Internally, PathChildrenCache uses the command pattern to encapsulate its operations:

  • Operation interface: org.apache.curator.framework.recipes.cache.Operation
    • Refresh: org.apache.curator.framework.recipes.cache.RefreshOperation
    • Fire an event: org.apache.curator.framework.recipes.cache.EventOperation
    • Fetch data: org.apache.curator.framework.recipes.cache.GetDataOperation

Each operation object receives a reference to the PathChildrenCache in its constructor, so that it can call back into the cache when it runs:

EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event)
{
    this.cache = cache;
    this.event = event;
}
GetDataOperation(PathChildrenCache cache, String fullPath)
{
    this.cache = cache;
    this.fullPath = PathUtils.validatePath(fullPath);
}
RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode)
{
    this.cache = cache;
    this.mode = mode;
}

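These operations are executed on a single-threaded thread pool, which makes the calls asynchronous.

  • private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap()); acts as the intake in front of the thread pool
    • Because it is a Set, an operation that is already pending is not queued a second time, even under concurrency
    • Because there is only one worker thread, operations run one at a time, in submission order
  • To avoid blocking Curator's watcher thread
    • both childrenWatcher and dataWatcher only submit a command and let it run asynchronously

Submitting an operation: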

void offerOperation(final Operation operation)
{
    if ( operationsQuantizer.add(operation) )
    {
        submitToExecutor
        (
            new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        operationsQuantizer.remove(operation);
                        operation.invoke();
                    }
                    catch ( InterruptedException e )
                    {
                        //We expect to get interrupted during shutdown,
                        //so just ignore these events
                        if ( state.get() != State.CLOSED )
                        {
                            handleException(e);
                        }
                        Thread.currentThread().interrupt();
                    }
                    catch ( Exception e )
                    {
                        ThreadUtils.checkInterrupted(e);
                        handleException(e);
                    }
                }
            }
        );
    }
}
private synchronized void submitToExecutor(final Runnable command)
{
    if ( state.get() == State.STARTED )
    {
        executorService.submit(command);
    }
}
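  • Interruption of an operation is handled
  • The cache state is taken into account
  • Exceptions from all operations are handled in one place
  • The submit method, submitToExecutor, is synchronized
    • A watcher may fire at any time, so the state has to be checked before submitting
      • For example, the cache is closed first and a watcher calls back afterwards, which would cause unnecessary work
    • The check followed by the submit is not atomic, hence the lock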
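
The de-duplicating intake described above can be modelled with the JDK alone. The following is a minimal sketch under stated assumptions, not Curator code: DedupingOperationQueue, offer and executedCount are hypothetical names, and a plain String key stands in for the equals/hashCode of an Operation.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the operation intake of PathChildrenCache. */
final class DedupingOperationQueue implements AutoCloseable {
    // Keys of operations that were submitted but have not started running yet.
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    // One worker thread, so operations run one at a time in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicInteger executed = new AtomicInteger();

    /** Returns true if the operation was queued, false if an equal one is already pending. */
    boolean offer(String operationKey, Runnable body) {
        if (!pending.add(operationKey)) {
            return false; // an equal operation is already waiting to run
        }
        executor.submit(() -> {
            // Remove first: from here on, the same key may be queued again.
            pending.remove(operationKey);
            body.run();
            executed.incrementAndGet();
        });
        return true;
    }

    int executedCount() {
        return executed.get();
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }
}
```

While the worker is busy, offering the same key twice queues it only once; once the queued task has started, the key can be offered again. This is the same order of steps as in offerOperation: remove from the set first, then invoke.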

3. Usage

3.1 Creating a cache

public PathChildrenCache(CuratorFramework client,
                         String path,
                         boolean cacheData)
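  • cacheData
    • If true, the contents of each child node are cached in addition to its stat

3.2 Using a cache

  • start() must be called before the cache is used
    • There are two start() methods
      1. void start()
        • No arguments
      2. void start(PathChildrenCache.StartMode mode)
        • The argument selects how the cache is initialized
        • StartMode
          • NORMAL
          • BUILD_INITIAL_CACHE
          • POST_INITIALIZED_EVENT
  • Call close() when you are done with the cache
  • At any time, getCurrentData() returns the current state of the cache (the cached children)
  • Listeners can be registered and are called back when something changes
    • getListenable().addListener(PathChildrenCacheListener listener)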

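4. Error handling

A PathChildrenCache instance watches the connection state through a ConnectionStateListener. When the connection state changes, registered PathChildrenCacheListeners receive a connection event (CONNECTION_SUSPENDED, CONNECTION_LOST, or CONNECTION_RECONNECTED), and once the connection is established or restored the cache is refreshed from ZooKeeper (see 5.9).

5. Source code analysis

5.1 Class definition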

public class PathChildrenCache implements Closeable{}
  • Implements the java.io.Closeable interface

5.2 Member variables

public class PathChildrenCache implements Closeable
{
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final CuratorFramework client;
    private final String path;
    private final CloseableExecutorService executorService;
    private final boolean cacheData;
    private final boolean dataIsCompressed;
    private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
    private final ConcurrentMap<String, ChildData> currentData = Maps.newConcurrentMap();
    private final AtomicReference<Map<String, ChildData>> initialSet = new AtomicReference<Map<String, ChildData>>();
    private final Set<Operation> operationsQuantizer = Sets.newSetFromMap(Maps.<Operation, Boolean>newConcurrentMap());
    private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
    private final EnsureContainers ensureContainers;

    private enum State
    {
        LATENT,
        STARTED,
        CLOSED
    }

    private static final ChildData NULL_CHILD_DATA = new ChildData("/", null, null);

    private static final boolean USE_EXISTS = Boolean.getBoolean("curator-path-children-cache-use-exists");

    private volatile Watcher childrenWatcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
        }
    };

    private volatile Watcher dataWatcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            try
            {
                if ( event.getType() == Event.EventType.NodeDeleted )
                {
                    remove(event.getPath());
                }
                else if ( event.getType() == Event.EventType.NodeDataChanged )
                {
                    offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
                }
            }
            catch ( Exception e )
            {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        }
    };

    @VisibleForTesting
    volatile Exchanger<Object> rebuildTestExchanger;
    
    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener()
    {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState)
        {
            handleStateChange(newState);
        }
    };
    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
}
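  • log
  • client
  • path
    • The znode path that this cache covers
  • executorService
    • org.apache.curator.utils.CloseableExecutorService
    • Thread pool
    • Runs the operations
    • See section 2
  • cacheData
    • Whether node data should be cached
  • dataIsCompressed
    • Whether the data is compressed
  • listeners
    • org.apache.curator.framework.listen.ListenerContainer
    • Listener container (manages multiple listeners)
    • Holds the application-level listeners
    • You can register your own listeners here
  • currentData
    • java.util.concurrent.ConcurrentMap
    • The current data
    • <String, ChildData>
    • Holds the org.apache.curator.framework.recipes.cache.ChildData entries, keyed by full path
  • initialSet
    • AtomicReference
    • The initial set
    • Holds the child nodes so that the cache can track whether each one has been loaded
      • Once every node has been loaded, a PathChildrenCacheEvent.Type.INITIALIZED event is fired
  • operationsQuantizer
    • Acts as the intake in front of the thread pool
  • state
    • The lifecycle state
    • AtomicReference
  • ensureContainers
    • org.apache.curator.framework.EnsureContainers
    • Creates the path node in a thread-safe way
  • State
    • Private enum
      • LATENT
      • STARTED
      • CLOSED
  • NULL_CHILD_DATA
    • Private constant
    • Placeholder for a node whose data has not been loaded yet
  • USE_EXISTS
    • Private constant
    • Takes its value from the system property curator-path-children-cache-use-exists
  • childrenWatcher
    • volatile
    • Watcher for changes to the set of children
  • dataWatcher
    • volatile
    • Watcher for data changes
  • rebuildTestExchanger
    • java.util.concurrent.Exchanger
    • Passes a value between concurrent threads
    • A signal object is passed through it when the cache is rebuilt
    • Used for testing
  • connectionStateListener
    • Connection state listener
  • defaultThreadFactory
    • Thread factory

5.3 Constructors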

public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode)
{
    this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, PathChildrenCacheMode mode, ThreadFactory threadFactory)
{
    this(client, path, mode != PathChildrenCacheMode.CACHE_PATHS_ONLY, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
{
    this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(defaultThreadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)
{
    this(client, path, cacheData, false, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)
{
    this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(Executors.newSingleThreadExecutor(threadFactory), true));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
{
    this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
}

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)
{
    this.client = client;
    this.path = PathUtils.validatePath(path);
    this.cacheData = cacheData;
    this.dataIsCompressed = dataIsCompressed;
    this.executorService = executorService;
    ensureContainers = new EnsureContainers(client, path);
}

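There are 7 constructors, and all of them end up calling the last one. They also show that:

  • By default, a single-threaded pool (newSingleThreadExecutor) is used
  • By default, data is treated as uncompressed (dataIsCompressed is false)

5.4 Starting

start() must be called before the cache is used.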

public enum StartMode
{
    NORMAL,
    BUILD_INITIAL_CACHE,
    POST_INITIALIZED_EVENT
}

public void start() throws Exception
{
    start(StartMode.NORMAL);
}

@Deprecated
public void start(boolean buildInitial) throws Exception
{
    start(buildInitial ? StartMode.BUILD_INITIAL_CACHE : StartMode.NORMAL);
}

public void start(StartMode mode) throws Exception
{
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "already started");
    mode = Preconditions.checkNotNull(mode, "mode cannot be null");

    client.getConnectionStateListenable().addListener(connectionStateListener);

    switch ( mode )
    {
        case NORMAL:
        {
            offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
            break;
        }

        case BUILD_INITIAL_CACHE:
        {
            rebuild();
            break;
        }

        case POST_INITIALIZED_EVENT:
        {
            initialSet.set(Maps.<String, ChildData>newConcurrentMap());
            offerOperation(new RefreshOperation(this, RefreshMode.POST_INITIALIZED));
            break;
        }
    }
}

private void processChildren(List<String> children, RefreshMode mode) throws Exception
{
    Set<String> removedNodes = Sets.newHashSet(currentData.keySet());
    for ( String child : children ) {
        removedNodes.remove(ZKPaths.makePath(path, child));
    }

    for ( String fullPath : removedNodes )
    {
        remove(fullPath);
    }

    for ( String name : children )
    {
        String fullPath = ZKPaths.makePath(path, name);

        if ( (mode == RefreshMode.FORCE_GET_DATA_AND_STAT) || !currentData.containsKey(fullPath) )
        {
            getDataAndStat(fullPath);
        }

        updateInitialSet(name, NULL_CHILD_DATA);
    }
    maybeOfferInitializedEvent(initialSet.get());
}
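  • start() without arguments
    • Uses StartMode.NORMAL by default
  • start(boolean buildInitial) is deprecated and should be avoided
    • true
      • Uses StartMode.BUILD_INITIAL_CACHE
    • false
      • Uses StartMode.NORMAL
  • The connection state listener is registered during start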
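As the code shows, there are three start-up strategies:

  1. NORMAL mode
    1. Submits a refresh command, org.apache.curator.framework.recipes.cache.RefreshOperation (command pattern)
      • Uses the RefreshMode.STANDARD refresh mode
      • Calls org.apache.curator.framework.recipes.cache.PathChildrenCache#refresh
        1. Calls org.apache.curator.framework.EnsureContainers#ensure to create the node
        2. Sets the childrenWatcher watcher on the node
        3. The callback triggers org.apache.curator.framework.recipes.cache.PathChildrenCache#processChildren to do the refresh
          1. Remove locally cached entries that are no longer children of the node
            1. Collect the cached paths that are missing from the returned list of children
            2. Remove each of them from the local cache and from the initial set
          2. If a child has not been synced to the local cache yet, or the mode is RefreshMode.FORCE_GET_DATA_AND_STAT
            1. sync that node's data and stat right away
              1. If data does not need to be cached, only check that the node exists (only the node and its stat are cached, not its data)
              2. Otherwise read the data (decompressing it if required) and build a ChildData entry
                1. Put the new entry into currentData
                2. Fire an event as appropriate (this invokes the listeners)
                  • PathChildrenCacheEvent.Type.CHILD_ADDED
                  • PathChildrenCacheEvent.Type.CHILD_UPDATED
                3. Update initialSet (replace the not-yet-synced NULL_CHILD_DATA placeholder with the data that was read)
          3. Update initialSet
            1. Only if the Map held by initialSet is not null
              • In NORMAL mode it is null here
              • See POST_INITIALIZED_EVENT mode
  2. BUILD_INITIAL_CACHE mode
    1. Calls rebuild() (this method blocks)
      • Re-queries all the data that is needed
      • Does not fire any events
      1. Safely creates path
      2. Clears the currentData cache
      3. Reloads the children of path and rebuilds the cache node by node
        • Reads each node's data and stat
        • Builds a ChildData and puts it into currentData
      4. Sends a signal object through rebuildTestExchanger
  3. POST_INITIALIZED_EVENT mode
    1. Initializes initialSet
    2. Refreshes the cache in RefreshMode.POST_INITIALIZED mode
      • Same as NORMAL mode, except for
        • Updating initialSet
          1. Only if the Map held by initialSet is not null
            • In POST_INITIALIZED_EVENT mode the Map has already been initialized at this point
          2. If every entry in initialSet has been synced (none equals NULL_CHILD_DATA)
            1. initialSet is set to null
            2. A PathChildrenCacheEvent.Type.INITIALIZED event is fired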
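
The initialSet bookkeeping of POST_INITIALIZED_EVENT mode can be illustrated with a small JDK-only model. This is a sketch under stated assumptions rather than the Curator implementation: InitialSetTracker, childrenListed, childLoaded and initializedEvents are hypothetical names, and NOT_LOADED plays the role of NULL_CHILD_DATA.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of initialSet tracking; names are illustrative. */
final class InitialSetTracker {
    // Sentinel compared by identity, playing the role of NULL_CHILD_DATA.
    private static final Object NOT_LOADED = new Object();

    // Non-null while initialization is being tracked, null afterwards.
    private final AtomicReference<Map<String, Object>> initialSet =
        new AtomicReference<>(new ConcurrentHashMap<>());
    private final AtomicInteger initializedEvents = new AtomicInteger();

    /** The children were listed; none of them has data yet. */
    void childrenListed(List<String> names) {
        Map<String, Object> map = initialSet.get();
        if (map == null) {
            return; // tracking already finished
        }
        for (String name : names) {
            map.put(name, NOT_LOADED);
        }
        maybeFireInitialized(map);
    }

    /** The data for one child arrived (data must be non-null in this model). */
    void childLoaded(String name, Object data) {
        Map<String, Object> map = initialSet.get();
        if (map == null) {
            return; // tracking already finished
        }
        map.put(name, data);
        maybeFireInitialized(map);
    }

    private void maybeFireInitialized(Map<String, Object> map) {
        for (Object value : map.values()) {
            if (value == NOT_LOADED) {
                return; // at least one child is still pending
            }
        }
        // getAndSet(null) ensures the "event" fires at most once.
        if (initialSet.getAndSet(null) != null) {
            initializedEvents.incrementAndGet();
        }
    }

    int initializedEvents() {
        return initializedEvents.get();
    }
}
```

In this model the counter goes from 0 to 1 exactly when the last listed child is loaded, and later loads no longer change it because the tracked map has been set to null.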

5.5 When the children change

start() has already set a watcher, childrenWatcher, on path.

private volatile Watcher childrenWatcher = new Watcher()
{
    @Override
    public void process(WatchedEvent event)
    {
        offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
    }
};
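  • Refreshes the cache in RefreshMode.STANDARD mode
    • Compares the locally cached data with the children currently in ZooKeeper
    • Only fetches data for children that are not in the cache yet
  • Note the PathChildrenCache.this argument passed to the operation
    • Inside the anonymous Watcher, a bare this refers to the Watcher itself, so the enclosing cache has to be referenced as PathChildrenCache.this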
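
The comparison that a refresh performs (see processChildren in 5.4) boils down to a set difference. The sketch below is a simplified JDK-only model, not Curator code: ChildrenDiff and compute are hypothetical names, paths are joined with a plain "/" (so the parent path is assumed not to be the root "/"), and forceFetch stands in for RefreshMode.FORCE_GET_DATA_AND_STAT.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the cache-versus-ZooKeeper comparison. */
final class ChildrenDiff {
    /** Cached full paths that are no longer children of the parent. */
    final Set<String> removed = new TreeSet<>();
    /** Full paths whose data and stat have to be fetched. */
    final List<String> toFetch = new ArrayList<>();

    static ChildrenDiff compute(Map<String, ?> cached, List<String> children,
                                String parentPath, boolean forceFetch) {
        ChildrenDiff diff = new ChildrenDiff();
        // Start from everything that is cached, then strike out what still exists.
        diff.removed.addAll(cached.keySet());
        for (String child : children) {
            String fullPath = parentPath + "/" + child;
            diff.removed.remove(fullPath);
            // In standard mode only children missing from the cache are fetched.
            if (forceFetch || !cached.containsKey(fullPath)) {
                diff.toFetch.add(fullPath);
            }
        }
        return diff;
    }
}
```

With /app/a and /app/b cached and ZooKeeper reporting the children b and c, the standard mode removes /app/a and fetches only /app/c, while the forced mode fetches both /app/b and /app/c.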

5.6 When data changes

Every time a node's data is fetched (the getDataAndStat method), the dataWatcher watcher is set on that node.

private volatile Watcher dataWatcher = new Watcher()
{
    @Override
    public void process(WatchedEvent event)
    {
        try
        {
            if ( event.getType() == Event.EventType.NodeDeleted )
            {
                remove(event.getPath());
            }
            else if ( event.getType() == Event.EventType.NodeDataChanged )
            {
                offerOperation(new GetDataOperation(PathChildrenCache.this, event.getPath()));
            }
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleException(e);
        }
    }
};
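  • When a node is deleted
    • The entry is removed from the cache
    • A PathChildrenCacheEvent.Type.CHILD_REMOVED event is fired
  • When a node's data changes
    • A GetDataOperation is submitted
      • which runs getDataAndStat again
  • Note the PathChildrenCache.this argument passed to the operation
    • Inside the anonymous Watcher, this refers to the Watcher itself, not to the cache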
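
The difference between this and PathChildrenCache.this is plain Java behaviour and can be reproduced without Curator. In this minimal sketch OuterThisDemo and Callback are hypothetical names that stand in for the cache and the Watcher:

```java
/** Hypothetical demo of qualified this inside an anonymous inner class. */
final class OuterThisDemo {
    interface Callback {
        String describe();
    }

    private final String name = "cache";

    @Override
    public String toString() {
        return name;
    }

    Callback newCallback() {
        return new Callback() {
            @Override
            public String toString() {
                return "callback";
            }

            @Override
            public String describe() {
                // this               -> the anonymous Callback instance
                // OuterThisDemo.this -> the enclosing OuterThisDemo instance
                return this + " created by " + OuterThisDemo.this;
            }
        };
    }
}
```

Calling new OuterThisDemo().newCallback().describe() returns "callback created by cache", which is why the watchers have to write PathChildrenCache.this to hand the cache itself to an operation.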

5.7 Getting the current data

public List<ChildData> getCurrentData()
{
    return ImmutableList.copyOf(Sets.<ChildData>newTreeSet(currentData.values()));
}

public ChildData getCurrentData(String fullPath)
{
    return currentData.get(fullPath);
}

Both methods read from the local data only; neither goes to ZooKeeper.
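
The no-argument getCurrentData() returns a sorted, immutable copy, so the result is a snapshot that does not follow later changes. A rough JDK-only equivalent, assuming entries are ordered by path and using the hypothetical names SnapshotDemo and currentPaths (the real method returns ChildData objects, this model returns only paths):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of a sorted, read-only snapshot of the local data. */
final class SnapshotDemo {
    private final ConcurrentMap<String, String> currentData = new ConcurrentHashMap<>();

    void put(String fullPath, String data) {
        currentData.put(fullPath, data);
    }

    /** Sorted, read-only copy of the cached paths at the time of the call. */
    List<String> currentPaths() {
        // TreeSet sorts the paths, ArrayList copies them, unmodifiableList blocks writes.
        return Collections.unmodifiableList(new ArrayList<>(new TreeSet<>(currentData.keySet())));
    }
}
```

A list obtained before a later put keeps its old contents, and attempts to modify it throw UnsupportedOperationException.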

5.8 Clearing

5.8.1 Clearing the cache

public void clear()
{
    currentData.clear();
}
public void clearAndRefresh() throws Exception
{
    currentData.clear();
    offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
}

clear() empties the local data.

clearAndRefresh() additionally submits a refresh in RefreshMode.STANDARD mode.

5.8.2 Clearing cached data bytes

public void clearDataBytes(String fullPath)
{
    clearDataBytes(fullPath, -1);
}
public boolean clearDataBytes(String fullPath, int ifVersion)
{
    ChildData data = currentData.get(fullPath);
    if ( data != null )
    {
        if ( (ifVersion < 0) || (ifVersion == data.getStat().getVersion()) )
        {
            if ( data.getData() != null )
            {
                currentData.replace(fullPath, data, new ChildData(data.getPath(), data.getStat(), null));
            }
            return true;
        }
    }
    return false;
}

The cache entry (path and stat) is kept, but its data is set to null.
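
The version check and the conditional ConcurrentMap.replace can be tried out in isolation. The following sketch mirrors the logic shown above with a simplified entry type; ClearBytesDemo and Entry are hypothetical names, and an int version stands in for getStat().getVersion().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of clearing the data bytes of a cached entry. */
final class ClearBytesDemo {
    static final class Entry {
        final String path;
        final int version;
        final byte[] data;

        Entry(String path, int version, byte[] data) {
            this.path = path;
            this.version = version;
            this.data = data;
        }
    }

    private final ConcurrentMap<String, Entry> currentData = new ConcurrentHashMap<>();

    void put(Entry entry) {
        currentData.put(entry.path, entry);
    }

    Entry get(String path) {
        return currentData.get(path);
    }

    /** Returns true if the entry exists and ifVersion is negative or matches its version. */
    boolean clearDataBytes(String path, int ifVersion) {
        Entry entry = currentData.get(path);
        if (entry == null) {
            return false;
        }
        if (ifVersion >= 0 && ifVersion != entry.version) {
            return false; // version mismatch: leave the entry untouched
        }
        if (entry.data != null) {
            // replace(key, old, new) swaps only if the map still holds the entry we read,
            // so a concurrent update is not overwritten.
            currentData.replace(path, entry, new Entry(entry.path, entry.version, null));
        }
        return true;
    }
}
```

A negative ifVersion (the single-argument overload passes -1) means "clear regardless of version".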

5.9 Connection state changes

start() registers the connectionStateListener listener on the connection:

private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        handleStateChange(newState);
    }
};

private void handleStateChange(ConnectionState newState)
{
    switch ( newState )
    {
    case SUSPENDED:
    {
        offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED, null)));
        break;
    }

    case LOST:
    {
        offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_LOST, null)));
        break;
    }

    case CONNECTED:
    case RECONNECTED:
    {
        try
        {
            offerOperation(new RefreshOperation(this, RefreshMode.FORCE_GET_DATA_AND_STAT));
            offerOperation(new EventOperation(this, new PathChildrenCacheEvent(PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED, null)));
        }
        catch ( Exception e )
        {
            ThreadUtils.checkInterrupted(e);
            handleException(e);
        }
        break;
    }
    }
}

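Essentially, each connection state triggers a different operation and notifies the application listeners.

  • Because the data is a local cache, when the connection is suspended or lost the cache only fires an event; it does not mark the data as unavailable
  • Both when the connection is established (CONNECTED) and when it is restored (RECONNECTED), a refresh in RefreshMode.FORCE_GET_DATA_AND_STAT mode is triggered.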
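
The mapping from connection state to submitted operations in handleStateChange can be written down as a small table-like function. This is a simplified model with hypothetical names (ConnectionDispatch, ConnState, operationsFor) in which operations are represented as strings; READ_ONLY stands for a state that the switch shown above does not handle.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical model of which operations each connection state submits. */
final class ConnectionDispatch {
    enum ConnState { SUSPENDED, LOST, CONNECTED, RECONNECTED, READ_ONLY }

    static List<String> operationsFor(ConnState state) {
        switch (state) {
            case SUSPENDED:
                return Collections.singletonList("event:CONNECTION_SUSPENDED");
            case LOST:
                return Collections.singletonList("event:CONNECTION_LOST");
            case CONNECTED:
            case RECONNECTED:
                // Refresh first, then notify the listeners.
                return Arrays.asList("refresh:FORCE_GET_DATA_AND_STAT",
                                     "event:CONNECTION_RECONNECTED");
            default:
                return Collections.emptyList(); // not handled, nothing is submitted
        }
    }
}
```

One detail that the model makes visible: CONNECTED and RECONNECTED share a branch, so listeners see CONNECTION_RECONNECTED for the first connection as well.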

5.10 Closing

When you are done with the cache, call close():

public void close() throws IOException
{
    if ( state.compareAndSet(State.STARTED, State.CLOSED) )
    {
        client.getConnectionStateListenable().removeListener(connectionStateListener);
        listeners.clear();
        executorService.close();
        client.clearWatcherReferences(childrenWatcher);
        client.clearWatcherReferences(dataWatcher);

        // TODO
        // This seems to enable even more GC - I'm not sure why yet - it
        // has something to do with Guava's cache and circular references
        connectionStateListener = null;
        childrenWatcher = null;
        dataWatcher = null;
    }
}
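  • Atomically moves the state from STARTED to CLOSED
  • Removes the connection state listener
  • Clears the application listeners
  • Shuts down the thread pool
  • Clears the children watcher
  • Clears the data watcher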
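
Both start() and close() rely on compareAndSet to make their state transition happen exactly once. A minimal sketch of that lifecycle guard, using only the JDK; Lifecycle and cleanupCount are hypothetical names, and the counter stands in for the clean-up steps listed above:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the LATENT -> STARTED -> CLOSED lifecycle. */
final class Lifecycle {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);
    private final AtomicInteger cleanups = new AtomicInteger();

    /** Succeeds only for the first caller; later calls are rejected. */
    void start() {
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("already started");
        }
    }

    /** Cleans up only if the cache was started and not closed yet; otherwise a no-op. */
    void close() {
        if (state.compareAndSet(State.STARTED, State.CLOSED)) {
            cleanups.incrementAndGet(); // stands in for removing listeners, closing the pool, ...
        }
    }

    int cleanupCount() {
        return cleanups.get();
    }
}
```

Because the transition is a single atomic step, two threads calling close() at the same time cannot both run the clean-up, and a closed instance cannot be started again.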

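6. Summary

Although PathChildrenCache has Cache in its name, it is not really a complete cache.

It is better described as a way to manage the nodes under path as a group. When those nodes are added or removed, or their data changes, PathChildrenCache notices and syncs the change into a local Map. That is what makes it behave like a cache.

The API also shows that it can only read data. Putting data into the cache has to be done separately.

  • That is simple enough: create a node under path and write the data to it

Custom listeners can be added with getListenable().addListener(listener); for finer-grained control over the cache.

7. Example

See the official Curator examples.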
