Curator使用

Curator是Netflix公司一个开源的zookeeper客户端,在原生API接口上进行了包装,解决了很多ZooKeeper客户端非常底层的细节开发。同时内部实现了诸如Session超时重连,Watcher反复注册等功能,实现了Fluent风格的API接口,是使用最广泛的zookeeper客户端之一。

使用Curator需要依赖包:

guava-17.0.jar
zookeeper-3.4.6.jar
curator-framework-3.2.1.jar

创建连接:

public class CreateSession {

    public static void main(String[] args) throws Throwable  {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//刚开始重试间隔为1秒,之后重试间隔逐渐增加,最多重试不超过三次
        /*RetryPolicy retryPolicy1 = new RetryNTimes(3, 1000);//最大重试次数,和两次重试间隔时间
        RetryPolicy retryPolicy2 = new RetryUntilElapsed(5000, 1000);//会一直重试直到达到规定时间,第一个参数整个重试不能超过时间,第二个参数重试间隔
        //第一种方式
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000,5000,retryPolicy);//最后一个参数重试策略
        */

        //第二种方式
        CuratorFramework client1 = CuratorFrameworkFactory.builder().connectString("192.168.0.3:2181")
                .sessionTimeoutMs(5000)//会话超时时间
                .connectionTimeoutMs(5000)//连接超时时间
                .retryPolicy(retryPolicy)
                .build();

        client1.start();

        String path = client1.create().creatingParentsIfNeeded()//若创建节点的父节点不存在会先创建父节点再创建子节点
                    .withMode(CreateMode.EPHEMERAL)//withMode节点类型,
                    .forPath("/curator/3","131".getBytes());
        System.out.println(path);

        List<String> list = client1.getChildren().forPath("/");
        System.out.println(list);

        //String re = new String(client1.getData().forPath("/curator/3"));//只获取数据内容
        Stat stat = new Stat();
        String re = new String(client1.getData().storingStatIn(stat)//在获取节点内容的同时把状态信息存入Stat对象
                .forPath("/curator/3"));
        System.out.println(re);
        System.out.println(stat);


        client1.delete().guaranteed()//保障机制,若未删除成功,只要会话有效会在后台一直尝试删除
        .deletingChildrenIfNeeded()//若当前节点包含子节点
        .withVersion(-1)//指定版本号
        .forPath("/curator");

        Thread.sleep(Integer.MAX_VALUE);

    }

修改节点数据:

    Stat stat = new Stat();
        String re = new String(client1.getData().storingStatIn(stat)//在获取节点内容的同时把状态信息存入Stat对象
                .forPath("/curator/3"));
        System.out.println(re);
        System.out.println(stat);

        Thread.sleep(10000);
        client1.setData().withVersion(stat.getVersion())//修改前获取一次节点数据得到版本信息
        .forPath("/curator/3", "111".getBytes());

若线程在sleep时,在另一个客户端修改了该节点数据,会抛出异常:

org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /curator/3

异步调用:

ExecutorService es = Executors.newFixedThreadPool(5);//线程池

        RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000, 5000, retry);
        client.start();

        //Stat stat = client.checkExists().forPath("/node_1");//同步调用
        client.checkExists().inBackground(new BackgroundCallback() {

            @Override
            public void processResult(CuratorFramework curator, CuratorEvent event) throws Exception {//传入客户端对象和事件
                System.out.println(event.getType());
                int re = event.getResultCode();//执行成功为0
                System.out.println(re);

                String path = event.getPath();
                System.out.println(path);
                Stat stat = event.getStat();
                System.out.println(stat);

            }
        },"123",es).forPath("/node_1");//把线程池es传给异步调用

        List<String> list = client.getChildren().forPath("/");
        System.out.println(list);

        Thread.sleep(Integer.MAX_VALUE);

事件监听:

    //节点监听
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000, 5000, retry);
        client.start();

        final NodeCache cache = new NodeCache(client,"/node_1");
        cache.start();

        cache.getListenable().addListener(new NodeCacheListener() {

            @Override
            public void nodeChanged() throws Exception {
                byte[] res = cache.getCurrentData().getData();
                System.out.println("data: " + new String(res));
            }
        });

        Thread.sleep(Integer.MAX_VALUE);


    //子节点监听
    @SuppressWarnings("resource")
        final PathChildrenCache cache = new PathChildrenCache(client,"/node_1",true);
        cache.start();

        cache.getListenable().addListener(new PathChildrenCacheListener() {

            @Override
            public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
                switch (event.getType()) {
                case CHILD_ADDED:
                    System.out.println("add:" + event.getData());
                    break;
                case CHILD_UPDATED:
                    System.out.println("update:" + event.getData());
                    break;
                case CHILD_REMOVED:
                    System.out.println("remove:" + event.getData());
                    break;
                default:
                    break;
                }
            }
        });

ACL权限:
        RetryPolicy retry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.0.3:2181", 5000, 5000, retry);
        client.start();

        //ACL有IP授权和用户名密码访问的模式
        ACL aclRoot = new ACL(Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:root")));
        List<ACL> aclList = new ArrayList<ACL>();
        aclList.add(aclRoot);

        String path = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                .withACL(aclList)
                .forPath("/node_3/node_ACL","2".getBytes());
        System.out.println(path);

        CuratorFramework client1 =  CuratorFrameworkFactory.builder().connectString("192.168.0.3:2181")
                .sessionTimeoutMs(5000)//会话超时时间
                .connectionTimeoutMs(5000)//连接超时时间
                .authorization("digest","root:root".getBytes())//权限访问
                .retryPolicy(retry)
                .build();

        client1.start();

        String re = new String(client1.getData().forPath("/node_3/node_ACL"));
        System.out.println(re);
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐