Listing all ZooKeeper nodes with the Curator framework
This article shows how to use the Curator framework to visit every node in a ZooKeeper tree.
Main references:
https://github.com/Netflix/curator
http://curator.apache.org/
Let's get started.
1. First, create a Maven project:
mvn archetype:generate -DarchetypeCatalog=internal
2. Add the dependencies. The ZooKeeper client that curator-framework pulls in transitively is excluded, and version 3.4.10 is declared explicitly instead:
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.0.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.10</version>
</dependency>
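3. Project structure: App sits in the package com.xueyou.zkdemo, and the helper classes CreateClient and CuratorZkClientBridge sit in com.xueyou.zkdemo.zkUtils.
4. Contents of the zkUtils package: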
CreateClient.java
package com.xueyou.zkdemo.zkUtils;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CreateClient {
    public static CuratorFramework createSimple(String connectionString) {
        // These are reasonable arguments for the ExponentialBackoffRetry. The first
        // retry will wait 1 second, the second will wait up to 2 seconds, and the
        // third will wait up to 4 seconds.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // The simplest way to get a CuratorFramework instance. This will use default values.
        // The only required arguments are the connection string and the retry policy.
        return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
    }

    public static CuratorFramework createWithOptions(String connectionString, RetryPolicy retryPolicy, int connectionTimeoutMs, int sessionTimeoutMs) {
        // Using CuratorFrameworkFactory.builder() gives fine-grained control
        // over creation options. See the CuratorFrameworkFactory.Builder javadoc
        // for details.
        return CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                // etc. etc.
                .build();
    }
}
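The comment in createSimple describes waits of 1 second, then up to 2 seconds, then up to 4 seconds. The following is a minimal sketch of that doubling upper bound, using only the JDK. BackoffSketch and its two methods are hypothetical names made up for illustration. Curator's ExponentialBackoffRetry computes its own randomized sleep internally, and its exact values may not match this sketch.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper for illustration only; not part of Curator. */
final class BackoffSketch {
    /** Upper bound of the wait before a retry (0-based): base, 2 * base, 4 * base, ... */
    static long maxSleepMs(long baseSleepMs, int retryCount) {
        return baseSleepMs << retryCount;
    }

    /** Picks a random wait between baseSleepMs and the doubling upper bound, inclusive. */
    static long sleepMs(long baseSleepMs, int retryCount) {
        long upper = maxSleepMs(baseSleepMs, retryCount);
        return ThreadLocalRandom.current().nextLong(baseSleepMs, upper + 1);
    }

    public static void main(String[] args) {
        // Same arguments as the article: base sleep 1000 ms, 3 retries.
        for (int retry = 0; retry < 3; retry++) {
            System.out.printf("retry %d: wait up to %d ms (picked %d ms)%n",
                    retry + 1, maxSleepMs(1000, retry), sleepMs(1000, retry));
        }
    }
}
```

The random component spreads out reconnect attempts, so many clients that lost their connection at the same moment do not all retry at once.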
CuratorZkClientBridge.java
package com.xueyou.zkdemo.zkUtils;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class CuratorZkClientBridge {
    private final CuratorFramework curator;
    private final AtomicReference<CuratorListener> listener = new AtomicReference<CuratorListener>(null);

    /**
     * @param curator Curator instance to bridge
     */
    public CuratorZkClientBridge(CuratorFramework curator)
    {
        this.curator = curator;
    }

    /**
     * Return the client
     *
     * @return client
     */
    public CuratorFramework getCurator()
    {
        return curator;
    }
    public void connect(final Watcher watcher)
    {
        if ( watcher != null )
        {
            CuratorListener localListener = new CuratorListener()
            {
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( event.getWatchedEvent() != null )
                    {
                        watcher.process(event.getWatchedEvent());
                    }
                }
            };
            curator.getCuratorListenable().addListener(localListener);
            listener.set(localListener);
            try
            {
                BackgroundCallback callback = new BackgroundCallback()
                {
                    public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                    {
                        WatchedEvent fakeEvent = new WatchedEvent(Watcher.Event.EventType.None, curator.getZookeeperClient().isConnected() ? Watcher.Event.KeeperState.SyncConnected : Watcher.Event.KeeperState.Disconnected, null);
                        watcher.process(fakeEvent);
                    }
                };
                curator.checkExists().inBackground(callback).forPath("/foo");
            }
            catch ( Exception e )
            {
                throw new RuntimeException(e);
            }
        }
    }

    public void close() throws InterruptedException
    {
        // NOTE: the curator instance is NOT closed here
        CuratorListener localListener = listener.getAndSet(null);
        if ( localListener != null )
        {
            curator.getCuratorListenable().removeListener(localListener);
        }
    }
    public String create(String path, byte[] data, CreateMode mode) throws KeeperException, InterruptedException
    {
        try
        {
            return curator.create().withMode(mode).forPath(path, data);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null; // will never execute
    }

    public void delete(String path) throws InterruptedException, KeeperException
    {
        try
        {
            curator.delete().forPath(path);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
    }

    public boolean exists(String path, boolean watch) throws KeeperException, InterruptedException
    {
        try
        {
            return watch ? (curator.checkExists().watched().forPath(path) != null) : (curator.checkExists().forPath(path) != null);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return false; // will never execute
    }

    public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
    {
        try
        {
            return watch ? curator.getChildren().watched().forPath(path) : curator.getChildren().forPath(path);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null; // will never execute
    }
    public byte[] readData(String path, Stat stat, boolean watch) throws KeeperException, InterruptedException
    {
        try
        {
            if ( stat != null )
            {
                return watch ? curator.getData().storingStatIn(stat).watched().forPath(path) : curator.getData().storingStatIn(stat).forPath(path);
            }
            else
            {
                return watch ? curator.getData().watched().forPath(path) : curator.getData().forPath(path);
            }
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null; // will never execute
    }

    public void writeData(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException
    {
        writeDataReturnStat(path, data, expectedVersion);
    }

    public Stat writeDataReturnStat(String path, byte[] data, int expectedVersion) throws KeeperException, InterruptedException
    {
        try
        {
            // setData() returns the node's updated Stat; hand it back instead of discarding it
            return curator.setData().withVersion(expectedVersion).forPath(path, data);
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return null; // will never execute
    }
    public ZooKeeper.States getZookeeperState()
    {
        try
        {
            return curator.getZookeeperClient().getZooKeeper().getState();
        }
        catch ( Exception e )
        {
            throw new RuntimeException(e);
        }
    }

    public long getCreateTime(String path) throws KeeperException, InterruptedException
    {
        try
        {
            Stat stat = curator.checkExists().forPath(path);
            return (stat != null) ? stat.getCtime() : 0;
        }
        catch ( Exception e )
        {
            adjustException(e);
        }
        return 0;
    }

    public String getServers()
    {
        return curator.getZookeeperClient().getCurrentConnectionString();
    }

    // Always throws: KeeperException and InterruptedException pass through unchanged,
    // anything else is wrapped in a RuntimeException.
    private void adjustException(Exception e) throws KeeperException, InterruptedException
    {
        if ( e instanceof KeeperException )
        {
            throw (KeeperException)e;
        }
        if ( e instanceof InterruptedException )
        {
            throw (InterruptedException)e;
        }
        throw new RuntimeException(e);
    }
}
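Several methods in the bridge end with a "will never execute" return, because the compiler cannot tell that adjustException always throws. One possible alternative, sketched here with JDK types only, is to let the helper declare a RuntimeException return type so that the caller can write "throw rethrow(e)". RethrowSketch, rethrow and parsePort are hypothetical names, and IOException merely stands in for KeeperException so that the example needs no ZooKeeper dependency.

```java
import java.io.IOException;

/** Hypothetical illustration of the rethrow pattern; not part of Curator or ZooKeeper. */
final class RethrowSketch {
    /**
     * Never returns normally. The declared checked types pass through unchanged,
     * everything else is wrapped in a RuntimeException.
     */
    static RuntimeException rethrow(Exception e) throws IOException, InterruptedException {
        if (e instanceof IOException) {
            throw (IOException) e;
        }
        if (e instanceof InterruptedException) {
            throw (InterruptedException) e;
        }
        throw new RuntimeException(e);
    }

    /** Example caller: no dummy return is needed after the catch block. */
    static int parsePort(String text) throws IOException, InterruptedException {
        try {
            if (text.isEmpty()) {
                throw new IOException("empty port");
            }
            return Integer.parseInt(text);
        } catch (Exception e) {
            // The compiler sees a throw statement here, so the method is complete.
            throw rethrow(e);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(parsePort("2181"));
    }
}
```

The behavior is the same as adjustException; only the call site changes, which removes the unreachable return statements.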
5. Example program: traverse every node in ZooKeeper.
package com.xueyou.zkdemo;

import com.xueyou.zkdemo.zkUtils.CreateClient;
import com.xueyou.zkdemo.zkUtils.CuratorZkClientBridge;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.KeeperException;
import java.util.ArrayList;
import java.util.List;

/**
 * Connects to a ZooKeeper ensemble and prints the path of every node below "/".
 */
public class App {
    public static String connectionString = "192.168.0.99:2181,192.168.0.99:2182,192.168.0.99:2183";
    // Collects every path found by getNode(); shared by all recursive calls.
    public static List<String> res = new ArrayList<>();

    public static void main(String[] args) {
        System.out.println("Hello World!");
        CuratorFramework curatorFramework = CreateClient.createSimple(connectionString);
        curatorFramework.start();
        try {
            // Walk the tree from the root and print every path found.
            CuratorZkClientBridge curatorZkClientBridge = new CuratorZkClientBridge(curatorFramework);
            System.out.println(getNode(curatorZkClientBridge, "/"));
        } finally {
            // Release the connection even if the traversal fails.
            curatorFramework.close();
        }
    }

    public static List<String> getNode(CuratorZkClientBridge curatorZkClientBridge, String parentNode) {
        try {
            List<String> tmpList = curatorZkClientBridge.getChildren(parentNode, false);
            for (String tmp : tmpList) {
                // Avoid a double slash when the parent is the root.
                String childNode = parentNode.equals("/") ? parentNode + tmp : parentNode + "/" + tmp;
                res.add(childNode);
                getNode(curatorZkClientBridge, childNode);
            }
            return res;
        } catch (KeeperException e) {
            e.printStackTrace();
            return null;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
    }
}
6. Result: the program prints "Hello World!" followed by the list of every node path under the root.
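The recursion in getNode can be tried out without a running ZooKeeper. The sketch below performs the same pre-order walk and the same path joining over an in-memory map, and it returns a fresh list on every call instead of appending to a shared static field. TreeWalkSketch, join, listAll and the sample tree are hypothetical and exist only for illustration. The childrenOf function stands in for curatorZkClientBridge.getChildren(path, false).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical in-memory stand-in for the ZooKeeper traversal. */
final class TreeWalkSketch {
    /** Joins a parent path and a child name without producing "//" at the root. */
    static String join(String parent, String child) {
        return parent.equals("/") ? parent + child : parent + "/" + child;
    }

    /** Returns every path below root in pre-order, in a list owned by the caller. */
    static List<String> listAll(Function<String, List<String>> childrenOf, String root) {
        List<String> out = new ArrayList<>();
        collect(childrenOf, root, out);
        return out;
    }

    private static void collect(Function<String, List<String>> childrenOf, String parent, List<String> out) {
        for (String name : childrenOf.apply(parent)) {
            String path = join(parent, name);
            out.add(path);
            collect(childrenOf, path, out); // descend before moving to the next sibling
        }
    }

    public static void main(String[] args) {
        // Made-up sample tree; a real run would ask ZooKeeper for the children.
        Map<String, List<String>> tree = new HashMap<>();
        tree.put("/", Arrays.asList("zookeeper", "app"));
        tree.put("/zookeeper", Arrays.asList("quota"));
        tree.put("/app", Arrays.asList("config"));
        Function<String, List<String>> childrenOf =
                path -> tree.getOrDefault(path, Collections.emptyList());
        System.out.println(listAll(childrenOf, "/"));
        // prints [/zookeeper, /zookeeper/quota, /app, /app/config]
    }
}
```

To use this shape against a real ensemble, childrenOf would have to wrap the bridge's getChildren call and convert its checked KeeperException and InterruptedException, since Function.apply cannot throw them.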