zookeeper获取所有节点——基于curator框架

本文主要介绍如何基于 Curator 框架遍历并获取 ZooKeeper 中的所有节点。
本文主要参考:
https://github.com/Netflix/curator
http://curator.apache.org/

好,下面上货。
1、首先创建一个maven工程
mvn archetype:generate -DarchetypeCatalog=internal

2、添加依赖:
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
        <!-- Curator client library. Its transitive ZooKeeper dependency is excluded
             so that the ZooKeeper version declared explicitly below is the one
             that ends up on the classpath. -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- ZooKeeper client, pinned to 3.4.10 in place of the version excluded above.
             NOTE(review): presumably chosen to match 3.4.x servers - confirm against
             the cluster actually being accessed. -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
        </dependency>

3、看一下工程结构:


4、zkUtils中的内容:
CreateClient.java
package com.xueyou.zkdemo.zkUtils;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CreateClient {
    public static CuratorFramework createSimple(String connectionString) {
        // these are reasonable arguments for the ExponentialBackoffRetry. The first
        // retry will wait 1 second - the second will wait up to 2 seconds - the
        // third will wait up to 4 seconds.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);

        // The simplest way to get a CuratorFramework instance. This will use default values.
        // The only required arguments are the connection string and the retry policy
        return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    }

    public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
        // using the CuratorFrameworkFactory.builder() gives fine grained control
        // over creation options. See the CuratorFrameworkFactory.Builder javadoc
        // details
        return CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc. etc.
                .build();
    }
}

CuratorZkClientBridge.java
package com.xueyou.zkdemo.zkUtils;


import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public class CuratorZkClientBridge {
    private final CuratorFramework curator;
    private final AtomicReference<CuratorListener> listener = new AtomicReference<CuratorListener>(null);

    /**
     * @param curator Curator instance to bridge
     */
    public CuratorZkClientBridge(CuratorFramework curator)
    {
        this.curator = curator;
    }

    /**
     * Return the client
     *
     * @return client
     */
    public CuratorFramework getCurator()
    {
        return curator;
    }

    public void connect(final Watcher watcher)
    {
        if ( watcher != null )
        {
            CuratorListener     localListener = new CuratorListener()
            {
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getWatchedEvent() != null )
                    {
                        watcher.process(event.getWatchedEvent());
                    }
                }
            };
            curator.getCuratorListenable().addListener(localListener);
            listener.set(localListener);

            try
            {
                BackgroundCallback callback = new BackgroundCallback()
                {
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                    {
                        WatchedEvent        fakeEvent = new WatchedEvent(Watcher.Event.EventType.None, curator.getZookeeperClient().isConnected() ? Watcher.Event.KeeperState.SyncConnected : Watcher.Event.KeeperState.Disconnected, null);
                        watcher.process(fakeEvent);
                    }
                };
                curator.checkExists().inBackground(callback).forPath("/foo");
            }
            catch ( Exception e )
            {
                throw new RuntimeException(e);
            }
        }
    }

    public void close() throws InterruptedException
    {
        // NOTE: the curator instance is NOT closed here

        CuratorListener localListener = listener.getAndSet(null);
        if ( localListener != null )
        {
            curator.getCuratorListenable().removeListener(localListener);
        }
    }

    public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException
    {
        try
        {
            return curator.create().withMode(mode).forPath(path, data);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null;    // will never execute
    }

    public void delete(String path) throws InterruptedException, KeeperException
    {
        try
        {
            curator.delete().forPath(path);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
    }

    public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException
    {
        try
        {
            return watch ? (curator.checkExists().watched().forPath(path) != null) : (curator.checkExists().forPath(path) != null);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return false;   // will never execute
    }

    public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
    {
        try
        {
            return watch ? curator.getChildren().watched().forPath(path) : curator.getChildren().forPath(path);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null;   // will never execute
    }

    public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException
    {
        try
        {
            if ( stat != null )
            {
                return watch ? curator.getData().storingStatIn(stat).watched().forPath(path) : curator.getData().storingStatIn(stat).forPath(path);
            }
            else
            {
                return watch ? curator.getData().watched().forPath(path) : curator.getData().forPath(path);
            }
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null;   // will never execute
    }

    public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException
    {
        writeDataReturnStat(path, data, expectedVersion);
    }

    public Stat writeDataReturnStat(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException {
        try
        {
            curator.setData().withVersion(expectedVersion).forPath(path, data);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null; // will never execute
    }

    public ZooKeeper.States getZookeeperState()
    {
        try
        {
            return curator.getZookeeperClient().getZooKeeper().getState();
        }
        catch ( Exception e )
        {
            throw new RuntimeException(e);
        }
    }

    public long getCreateTime(String path) throws KeeperException, InterruptedException
    {
        try
        {
            Stat            stat = curator.checkExists().forPath(path);
            return (stat != null) ? stat.getCtime() : 0;
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return 0;
    }

    public String getServers()
    {
        return curator.getZookeeperClient().getCurrentConnectionString();
    }

    private void adjustException(Exception e) throws KeeperException, InterruptedException
    {
        if ( e instanceof KeeperException )
        {
            throw (KeeperException)e;
        }

        if ( e instanceof InterruptedException )
        {
            throw (InterruptedException)e;
        }

        throw new RuntimeException(e);
    }
}

5、示例程序(App.java):递归遍历 ZooKeeper 中的所有节点:
package com.xueyou.zkdemo;

import com.xueyou.zkdemo.zkUtils.CreateClient;
import com.xueyou.zkdemo.zkUtils.CuratorZkClientBridge;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo entry point: connects to ZooKeeper through Curator, walks the node
 * tree from the root and prints the full path of every node found.
 */
public class App {
    /** Connection string handed to {@link CreateClient#createSimple(String)}. */
    public static String connectionString = "192.168.0.99:2181,192.168.0.99:2182,192.168.0.99:2183";
    /**
     * Accumulates the paths collected by {@link #getNode}. The list is shared
     * and never cleared here, so repeated traversals append to earlier results.
     */
    public static List<String> res = new ArrayList<>();

    /**
     * Starts a client, prints every node path below "/", and closes the client.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println("Hello World!");
        // try-with-resources guarantees the client is closed even when the
        // traversal fails with an unchecked exception.
        try (CuratorFramework curatorFramework = CreateClient.createSimple(connectionString)) {
            curatorFramework.start();
            //doSomething to zookeeper
            CuratorZkClientBridge curatorZkClientBridge = new CuratorZkClientBridge(curatorFramework);
            System.out.println(getNode(curatorZkClientBridge, "/"));
        }
    }

    /**
     * Recursively collects the full paths of all descendants of
     * {@code parentNode} into {@link #res}, parents before their children.
     *
     * <p>A failure while listing a descendant is reported on stderr and that
     * subtree is skipped; the traversal of the remaining siblings continues.
     *
     * @param curatorZkClientBridge bridge used to list children
     * @param parentNode            absolute path of the node to start from
     * @return {@link #res} after the traversal, or {@code null} if listing the
     *         children of {@code parentNode} itself failed
     */
    public static List<String> getNode(CuratorZkClientBridge curatorZkClientBridge, String parentNode) {
        try {
            List<String> tmpList = curatorZkClientBridge.getChildren(parentNode, false);
            for (String tmp : tmpList) {
                // Avoid a double slash when the parent is the root node.
                String childNode = parentNode.equals("/") ? parentNode + tmp : parentNode + "/" + tmp;
                res.add(childNode);
                getNode(curatorZkClientBridge, childNode);
            }
            return res;
        } catch (KeeperException e) {
            e.printStackTrace();
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can
            // observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
    }

}

6、运行结果:

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐