由于新版本 Curator 中 TreeCache / NodeCache / PathChildrenCache 已被标记为过时(deprecated),这里贴出基于 CuratorCache 的新构造方式,记录一下。三种缓存监听器都写在了一起,请按需选择;如有问题欢迎指正,谢谢。

package com.example.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.File;
import java.util.concurrent.TimeUnit;

/**
 * Spring configuration that builds the Curator client bean and attaches a
 * {@link CuratorCache} with bridged PathChildrenCache / NodeCache / TreeCache
 * listeners to the root of the client's namespace.
 *
 * @author Hongyi Zheng
 * @date 2020/12/14
 */
@Configuration
@Slf4j
public class ZkConfiguration {

    /**
     * Root path inside the client's namespace. ZooKeeper paths always use a
     * forward slash, so this must not be derived from the host OS file
     * separator (which is a backslash on Windows).
     */
    private static final String ROOT_PATH = "/";

    // These settings would ideally be externalized to a configuration center.
    private final String hosts = "localhost:2181";
    private final String namespace = "zk-demo";
    private final int sessionTimeoutMs = 10_000;
    private final int connectionTimeoutMs = 15_000;
    private final int maxWaitTimeMs = 10_000;

    private CuratorFramework client;

    /**
     * Builds and starts the Curator client, waits for the connection to be
     * established, then starts the cache listener.
     *
     * @return the started client
     * @throws InterruptedException  if the thread is interrupted while waiting for the connection
     * @throws IllegalStateException if no connection is established within {@code maxWaitTimeMs}
     */
    @Bean("curator")
    public CuratorFramework getClient() throws InterruptedException {
        /*
            Exponential back-off retry: arg1 = base sleep time in ms (later
            retries wait longer, growing from this base); arg2 = max retries.
            Per the original author's note, Curator caps the retry count at 29.
          */
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        /*
            The namespace isolates this project's nodes from those of other
            projects sharing the same ensemble.
         */
        client = CuratorFrameworkFactory.builder()
                .connectString(hosts)
                .sessionTimeoutMs(sessionTimeoutMs)
                .connectionTimeoutMs(connectionTimeoutMs)
                .retryPolicy(retryPolicy)
                .namespace(namespace)
                .build();
        client.start();
        // Verify connectivity first so that no cache is left running against
        // a client that is about to be closed.
        destroyOnFailure();
        startListener();
        return client;
    }

    /**
     * Waits up to {@code maxWaitTimeMs} for the client to connect; when the
     * wait fails, closes the client and aborts bean creation.
     *
     * @throws InterruptedException  if interrupted while waiting (the client is closed first)
     * @throws IllegalStateException if the connection is not established in time
     */
    private void destroyOnFailure() throws InterruptedException {
        boolean connected;
        try {
            connected = client.blockUntilConnected(maxWaitTimeMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            // Do not leak the already-started client when the wait is interrupted.
            client.close();
            throw ex;
        }
        if (!connected) {
            client.close();
            // Fail the bean creation instead of exiting the JVM with status 0,
            // which would report a failed start-up as a success.
            throw new IllegalStateException(
                    "Unable to connect to ZooKeeper at " + hosts + " within " + maxWaitTimeMs + " ms");
        }
    }


    /**
     * Builds a {@link CuratorCacheListener} through the builder's bridge
     * methods and attaches it to a {@link CuratorCache} rooted at
     * {@link #ROOT_PATH} of the client's namespace.
     *
     * @throws IllegalStateException if the client has not been created yet
     */
    public void startListener() {
        if (client == null) {
            throw new IllegalStateException("Curator client has not been created yet");
        }
        // Constructing TreeCache / NodeCache / PathChildrenCache directly is
        // deprecated in newer Curator versions, so the listeners are built
        // with the factory methods provided by CuratorCacheListener instead.
        CuratorCacheListener listener = CuratorCacheListener
                .builder()
                // Three listener flavours are registered here for
                // demonstration; keep only the one you need.
                .forPathChildrenCache(ROOT_PATH, client, (c, e) -> {
                    // child node listener: react to e.getType() here
                })
                .forNodeCache(() -> {
                    // do something when the node changed
                })
                .forTreeCache(client, (c, e) -> {
                    log.info("status changed:{}", e.getType());
                    switch (e.getType()) {
                        case INITIALIZED:
                            whenInit(e);
                            break;
                        case NODE_ADDED:
                            whenAdded(e);
                            break;
                        case NODE_UPDATED:
                            whenUpdated(e);
                            break;
                        case NODE_REMOVED:
                            whenRemoved(e);
                            break;
                        default:
                            break;
                    }
                })
                .build();
        // NOTE(review): the cache is never closed explicitly; consider keeping
        // a reference and closing it on application shutdown.
        CuratorCache cache = CuratorCache.builder(client, ROOT_PATH).build();
        log.info("CuratorCache building complete ,listener path :{}", ROOT_PATH + namespace);
        cache.listenable().addListener(listener);
        cache.start();
    }

    /** Logs that the tree listener received its INITIALIZED event. */
    private void whenInit(TreeCacheEvent e) {
        log.info("TreeCacheListener initialized, status :{}", e.getType());
    }

    /** Logs the stat and the event of a newly added node. */
    private void whenAdded(TreeCacheEvent e) {
        Stat stat = e.getData().getStat();
        log.info("node stat :{}", stat);
        log.info("node created: {}", e.toString());
    }

    /** Logs path and payload of a node before and after an update. */
    private void whenUpdated(TreeCacheEvent e) {
        log.info("node update: Old :{}-{} ,New :{}-{} ",
                e.getOldData().getPath(), e.getOldData().getData(),
                e.getData().getPath(), e.getData().getData());
    }

    /** Logs path and payload of a removed node. */
    private void whenRemoved(TreeCacheEvent e) {
        log.info("node removed:{}-{}", e.getData().getPath(), e.getData().getData());
    }
}

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐