1. Curator客户端的依赖包

curator是Netflix公司开源的⼀套Zookeeper客户端框架,和ZKClient⼀样,Curator解决了很多 Zookeeper客户端⾮常底层的细节开发工作,包括连接重连,反复注册Watcher和 NodeExistsException异常等,是最流⾏的Zookeeper客户端之⼀。从编码风格上来讲,它提供了基于 Fluent的编程风格⽀持

打开Curator的官网,我们可以看到,Curator包含了以下几个包:

  • curator-framework:对zookeeper的底层api的一些封装;

  • curator-client:提供一些客户端的操作,例如重试策略等;

  • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

Maven依赖 最新版查看

  		<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>5.4.0</version>
        </dependency>

1.2. Curator 创建会话

使用 curator-framework 包中的工厂类CuratorFrameworkFactory中的静态方法newClient,来创建客户端会话。

1.使用CuratorFramework这个工⼚类的两个静态⽅法来创建⼀个客户端

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)

其中参数RetryPolicy提供重试策略的接⼝,可以让用户实现⾃定义的重试策略,默认提供了以下实现, 分别为ExponentialBackoffRetry(基于backoff的重连策略)、RetryNTimes(重连N次策略)、 RetryForever(永远重试策略)

2.通过调用CuratorFramework中的start()⽅法来启动会话

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
 5000,1000,retryPolicy);
client.start();

其实进⼀步查看源代码可以得知,其实这两种⽅法内部实现⼀样,只是对外包装成不同的⽅法。它们的 底层都是通过第三个⽅法builder来实现的

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
     .connectString("server1:2181,server2:2181,server3:2181")
     .sessionTimeoutMs(50000)
     .connectionTimeoutMs(30000)
     .retryPolicy(retryPolicy)
     .build();
client.start();

参数:

  • connectString:zk的server地址,多个server之间使用英⽂逗号分隔开
  • connectionTimeoutMs:连接超时时间,如上是30s,默认是15s
  • sessionTimeoutMs:会话超时时间,如上是50s,默认是60s
  • retryPolicy:失败重试策略
    • ExponentialBackoffRetry:构造器含有三个参数 ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
      • baseSleepTimeMs:初始的sleep时间,用于计算之后的每次重试的sleep时间,
        • 计算公式:当前sleep时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
        • maxRetries:最⼤重试次数 maxSleepMs:最⼤sleep时间,如果上述的当前sleep计算出来⽐这个⼤,那么sleep用 这个时间,默认的最⼤时间是Integer.MAX_VALUE毫秒。
    • 其他,查看org.apache.curator.RetryPolicy接⼝的实现类
  • start():完成会话的创建

代码如下:

public class ZkClientFactory {

    /**
     * @param connectionString    zk的连接地址
     * @param retryPolicy         重试策略
     * @param connectionTimeoutMs 连接
     * @param sessionTimeoutMs
     * @return CuratorFramework 实例
     */
    public static CuratorFramework createWithOptions(
            String connectionString,
            RetryPolicy retryPolicy,
            String namespace,
            int connectionTimeoutMs,
            int sessionTimeoutMs) {

        // builder 模式创建 CuratorFramework 实例
        return CuratorFrameworkFactory.builder()
                .connectString(connectionString)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .namespace(namespace)
                // 其他的创建选项
                .build();
    }
}

需要注意的是 namespace 含有隔离命名空间,即客户端对Zookeeper上数据节点的任何操作都是相对 namespace ⽬录进⾏的,这有利于实现不同的Zookeeper的业务之间的隔离

1.3. CRUD 之 Create 创建节点

使用create()方法,最后使用forPath带上需要创建的节点路径。

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

Zookeeper的节点创建模式:

使用withMode()方法,设置节点的类型。zookeeper节点有四种类型:

  • (1)PERSISTENT 持久节点

  • (2)PERSISTENT_SEQUENTIAL 持久顺序节点

  • (3)PHEMERAL 临时节

  • (4)EPHEMERAL_SEQUENTIAL 临时顺序节点

下面详细介绍一下四种节点的区别和联系。

(1)持久节点(PERSISTENT)

所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点。持久节点的生命周期是永久有效,不会因为创建该节点的客户端会话失效而消失。

(2)持久顺序节点(PERSISTENT_SEQUENTIAL)

这类节点的生命周期和持久节点是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份次序,会记录每个子节点创建的先后顺序。如果在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个表示次序的数字后缀,作为新的节点名。这个次序后缀的范围是整型的最大值。

比如,在创建节点的时候只需要传入节点 “/test_”,这样之后,zookeeper自动会给”test_”后面补充数字次序。

(3)临时节点(EPHEMERAL)

和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。这里还要注意一件事,就是当你客户端会话失效后,所产生的节点也不是一下子就消失了,也要过一段时间,大概是10秒以内,可以试一下,本机操作生成节点,在服务器端用命令来查看当前的节点数目,你会发现客户端已经stop,但是产生的节点还在。

另外,在临时节点下面不能创建子节点。

(4)临时顺序节点(EPHEMERAL_SEQUENTIAL)

此节点是属于临时节点,不过带有顺序,客户端会话结束节点就消失。

创建一个节点,初始内容为空

client.create().forPath("path");

注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空

创建一个节点,附带初始化内容

client.create().forPath("path","init".getBytes());

创建一个节点,指定创建模式(临时节点),内容为空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

创建一个节点,指定创建模式(临时节点),附带初始化内容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点

这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。

1.4. CRUD 之Read获取节点数据

与节点读取的有关的方法,主要有三个:

(1)首先是判断节点是否存在,使用checkExists方法。

(2)其次是获取节点的数据,使用getData方法。

(3)最后是获取子节点列表,使用getChildren方法。

演示代码如下:

读取一个节点的数据内容

client.getData().forPath("path");

注意,此方法返的返回值是byte[ ];

读取一个节点的数据内容,同时获取到该节点的stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

1.5. CRUD 之update更新节点

更新一个节点的数据内容

client.setData().forPath("path","data".getBytes());

注意:该接口会返回一个Stat实例

更新一个节点的数据内容,强制指定版本进行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());

检查节点是否存在

client.checkExists().forPath("path");

注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath( )指定要操作的ZNode

获取某个节点的所有子节点路径

client.getChildren().forPath("path");

注意:该方法的返回值为List,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode

异步更新的代码如下:

client.setData().inBackground(callback)
                .forPath(zkPath, payload);

1.6. CRUD 之delete删除节点

删除一个节点

client.delete().forPath("path");

注意,此方法只能删除叶子节点,否则会抛出异常。

删除一个节点,并且递归删除其所有的子节点

client.delete().deletingChildrenIfNeeded().forPath("path");

删除一个节点,强制指定版本进行删除

client.delete().withVersion(10086).forPath("path");

删除一个节点,强制保证删除

client.delete().guaranteed().forPath("path");

guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。

**注意:**上面的多个流式接口是可以自由组合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

1.7 异步接口

上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。

CuratorEventType

事件类型对应CuratorFramework实例的方法
CREATE#create()
DELETE#delete()
EXISTS#checkExists()
GET_DATA#getData()
SET_DATA#setData()
CHILDREN#getChildren()
SYNC#sync(String,Object)
GET_ACL#getACL()
SET_ACL#setACL()
WATCHED#Watcher(Watcher)
CLOSING#close()

响应码(#getResultCode())

响应码意义
0OK,即调用成功
-4ConnectionLoss,即客户端与服务端断开连接
-110NodeExists,即节点已经存在
-112SessionExpired,即会话过期

一个异步创建节点的例子如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
     .creatingParentsIfNeeded()
     .withMode(CreateMode.EPHEMERAL)
     .inBackground((curatorFramework, curatorEvent) -> {   
     System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
     },executor)
     .forPath("path");

注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。

2. Curator食谱(高级特性)

提醒:首先你必须添加curator-recipes依赖,下文仅仅对recipes一些特性的使用进行解释和举例,不打算进行源码级别的探讨

  		<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.4.0</version>
        </dependency>

重要提醒:强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者过期,尽管后面所有的例子都没有使用到ConnectionStateListener。

2.1 缓存

Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。

2.1.1 Path Cache

Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。

实际使用时会涉及到四个类:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

通过下面的构造函数创建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必须调用它的start方法,使用完后调用close方法。 可以设置StartMode来实现启动的模式

StartMode 有下面几种:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在调用 start() 之前会调用 rebuild()
  3. POST_INITIALIZED_EVENT: 当 Cache 初始化数据后发送一个 PathChildrenCacheEvent.Type#INITIALIZED 事件

public void addListener(PathChildrenCacheListener listener) 可以增加 listener 监听缓存的变化。

getCurrentData() 方法返回一个 List 对象,可以遍历所有的子节点。

设置/更新、移除其实是使用client (CuratorFramework)来操作, 不通过PathChildrenCache操作:

	@Test
    public void testPathCache() throws Exception {


        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            ChildData data = event.getData();
            if (null != data) {
                System.out.println("节点数据:" + data.getPath() + " = " + new String(data.getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath(PATH + "/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath(PATH + "/test01");
        Thread.sleep(10);
        client.delete().forPath(PATH + "/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }

**注意:**如果new PathChildrenCache(client, PATH, true)中的参数cacheData值设置为false,则示例中的event.getData().getData()、data.getData()将返回null,cache将不会缓存节点数据。

**注意:**示例中的Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不全,这可能与PathCache的实现原理有关,不能太过频繁的触发事件!

2.1.2 node Cache

Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点。它涉及到下面的三个类:

  • NodeCache - Node Cache实现类
  • NodeCacheListener - 节点监听器
  • ChildData - 节点数据

**注意:**使用cache,依然要调用它的start()方法,使用完后调用close()方法。

getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。

	@Test
    public void testNodeCache() throws Exception {

        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(data.getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");

**注意:**示例中的Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不全,这可能与NodeCache的实现原理有关,不能太过频繁的触发事件!

**注意:**NodeCache只能监听一个节点的状态变化。

2.1.3 Tree Cache

Tree Cache可以监控整个树上的所有节点(本节点和子节点),类似于PathCache和NodeCache的组合,主要涉及到下面四个类:

  • TreeCache - Tree Cache实现类
  • TreeCacheListener - 监听器类
  • TreeCacheEvent - 触发的事件类
  • ChildData - 节点数据
	@Test
    public void testTreeCache() throws Exception {


        client.create().creatingParentsIfNeeded().forPath(PATH);

        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) -> {
            byte[] bytes = client.getData().forPath(PATH);
            System.out.println("bytes = " + bytes);
            System.out.println("事件类型:" + event.getType() +
                    " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);

        client.create().forPath(PATH + "/SubTree", "SubTree".getBytes());
        client.setData().forPath(PATH + "/SubTree", "00PATHSubTree".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }

**注意:**TreeCache在初始化(调用start()方法)的时候会回调TreeCacheListener实例一个事TreeCacheEvent,而回调的TreeCacheEvent对象的Type为INITIALIZED,ChildData为null,此时event.getData().getPath()很有可能导致空指针异常,这里应该主动处理并避免这种情况。

3. Leader选举

在分布式计算中, leader elections是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader(领导者)或者coordinator(协调者). 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader. 除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

在zookeeper集群中,leader负责写操作,然后通过Zab协议实现follower的同步,leader或者follower都可以处理读操作。

Curator 有两种leader选举的recipe,分别是LeaderSelectorLeaderLatch

LeaderSelector 前者是所有存活的客户端不间断的轮流做Leader,大同社会。

LeaderLatch 后者是一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。

3.1LeaderLatch

LeaderLatch有两个构造函数:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

LeaderLatch的启动:

leaderLatch.start( );

一旦启动,LeaderLatch会和其它使用相同latchPath的其它LeaderLatch交涉,然后其中一个最终会被选举为leader,可以通过hasLeadership方法查看LeaderLatch实例是否leader:

leaderLatch.hasLeadership( );//返回true说明当前实例是leader

类似JDK的CountDownLatch, LeaderLatch在请求成为leadership会block(阻塞),一旦不使用LeaderLatch了,必须调用close方法。 如果它是leader,会释放leadership, 其它的参与者将会选举一个leader。

 public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

异常处理: LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时, leader不再认为自己还是leader。当LOST后连接重连后RECONNECTED,LeaderLatch会删除先前的ZNode然后重新创建一个。LeaderLatch用户必须考虑导致leadership丢失的连接问题。 强烈推荐你使用ConnectionStateListener。

一个LeaderLatch的使用例子:

 	@Test
    public void testLeaderLatch() throws Exception {

        List<CuratorFramework> clients = Lists.newArrayList();

        List<LeaderLatch> examples = Lists.newArrayList();

        try {
            for (int i = 0; i < CLIENT_QTY; i++) {

                CuratorFramework client = getClient();
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(1000);
            LeaderLatch currentLeader = null;
            do {
                for (LeaderLatch latch : examples) {
                    if (latch.hasLeadership()) {
                        currentLeader = latch;
                        System.out.println("current leader is " + currentLeader.getId());
                    }
                }
            } while (currentLeader == null);


            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();
            //currentLeader.start();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState() && latch.getState() != LeaderLatch.State.CLOSED)
                    CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }

首先我们创建了10个LeaderLatch,启动后它们中的一个会被选举为leader。 因为选举会花费一些时间,start后并不能马上就得到leader。
通过hasLeadership查看自己是否是leader, 如果是的话返回true。
可以通过.getLeader().getId()可以得到当前的leader的ID。
只能通过close释放当前的领导权。
await是一个阻塞方法, 尝试获取leader地位,但是未必能上位。

3.2 LeaderSelector

LeaderSelector使用的时候主要涉及下面几个类:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

核心类是LeaderSelector,它的构造函数如下:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

类似LeaderLatch,LeaderSelector必须start: leaderSelector.start(); 一旦启动,当实例取得领导权时你的listener的takeLeadership()方法被调用。而takeLeadership()方法只有领导权被释放时才返回。 当你不再使用LeaderSelector实例时,应该调用它的close方法。

异常处理 LeaderSelectorListener类继承ConnectionStateListener。LeaderSelector必须小心连接状态的改变。如果实例成为leader, 它应该响应SUSPENDED 或 LOST。 当 SUSPENDED 状态出现时, 实例必须假定在重新连接成功之前它可能不再是leader了。 如果LOST状态出现, 实例不再是leader, takeLeadership方法返回。

重要: 推荐处理方式是当收到SUSPENDED 或 LOST时抛出CancelLeadershipException异常.。这会导致LeaderSelector实例中断并取消执行takeLeadership方法的异常.。这非常重要, 你必须考虑扩展LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推荐的处理逻辑。

下面的一个例子摘抄自官方:

 public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();
 
    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }
 
    public void start() throws IOException {
        leaderSelector.start();
    }
 
    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }
 
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

你可以在takeLeadership进行任务的分配等等,并且不要返回,如果你想要要此实例一直是leader的话可以加一个死循环。调用 leaderSelector.autoRequeue();保证在此实例释放领导权之后还可能获得领导权。 在这里我们使用AtomicInteger来记录此client获得领导权的次数, 它是”fair”, 每个client有平等的机会获得领导权。

对比可知,LeaderLatch必须调用close()方法才会释放领导权,而对于LeaderSelector,通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。从而,LeaderSelector具有更好的灵活性和可控性,建议有LeaderElection应用场景下优先使用LeaderSelector。

4.分布式锁

提醒:

1.推荐使用ConnectionStateListener监控连接的状态,因为当连接LOST时你不再拥有锁

2.分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。

4.1 可重入共享锁—Shared Reentrant Lock

Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似,即可重入, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类InterProcessMutex来实现。 它的构造函数为:

public InterProcessMutex(CuratorFramework client, String path)

通过acquire()获得锁,并提供超时机制:

    /**
     * Acquire the mutex - blocking until it's available. Note: the same thread
     * can call acquire re-entrantly. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
    @Override
    public void acquire() throws Exception

通过release()方法释放锁。 InterProcessMutex 实例可以重用。Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。 为了撤销mutex, 调用下面的方法:

    /**
     * Perform one release of the mutex if the calling thread is the same thread that acquired it. If the
     * thread had made multiple calls to acquire, the mutex will still be held when this method returns.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
    @Override
    public void release() throws Exception

Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。 为了撤销mutex, 调用下面的方法:

将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
public void makeRevocable(RevocationListener<T> listener)

如果你请求撤销当前的锁, 调用attemptRevoke()方法,注意锁释放时RevocationListener将会回调。

public static void attemptRevoke(CuratorFramework client,String path) throws Exception

测试代码

 private InterProcessMutex createLock() {
        return new InterProcessMutex(client, PATH);
    }


    @Test
    public void testShareLock() throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(QTY);
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        List<InterProcessMutex> locks = new ArrayList<>();
        for (int i = 0; i < QTY; ++i) {
            int finalI = i;
            Runnable runnable = () -> {
                InterProcessMutex lock = createLock();
                locks.add(lock);

                boolean acquire = false;
                try {
                    lock.acquire(10000, TimeUnit.SECONDS);
                    System.out.println("acquire Lock and run  " + finalI + " time:" + System.currentTimeMillis());
                    Thread.sleep(10000);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }

                System.out.println("acquire Lock end : " + finalI + " time:" + System.currentTimeMillis());
                countDownLatch.countDown();
            };


            service.execute(runnable);
            //service.execute(runnable);
        }

        countDownLatch.await();
    }

4.2 不可重入共享锁—Shared Lock

这个锁和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味着它不能在同一个线程中重入。这个类是InterProcessSemaphoreMutex,使用方法和InterProcessMutex类似

private InterProcessSemaphoreMutex createInterProcessSemaphoreMutexLock() {
        return new InterProcessSemaphoreMutex(client, PATH);
    }

    @Test
    public void testInterProcessSemaphoreMutex() throws InterruptedException {


        CountDownLatch countDownLatch = new CountDownLatch(QTY);

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        List<InterProcessSemaphoreMutex> locks = new ArrayList<>();
        for (int i = 0; i < QTY; ++i) {
            int finalI = i;
            Runnable runnable = () -> {
                InterProcessSemaphoreMutex lock = createInterProcessSemaphoreMutexLock();
                locks.add(lock);

                boolean acquire = false;
                try {
                    lock.acquire(10000, TimeUnit.SECONDS);
                    //lock.acquire(10000, TimeUnit.SECONDS);
                    System.out.println("acquire Lock and run  " + finalI + " time:" + System.currentTimeMillis());
                    Thread.sleep(10000);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    try {
                        lock.release();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }

                System.out.println("acquire Lock end : " + finalI + " time:" + System.currentTimeMillis());
                countDownLatch.countDown();
            };

			service.execute(runnable);
            service.execute(runnable);
        }

        countDownLatch.await();
    }

运行后发现,有且只有一个client成功获取第一个锁(第一个acquire()方法返回true),然后它自己阻塞在第二个acquire()方法,获取第二个锁超时;其他所有的客户端都阻塞在第一个acquire()方法超时并且抛出异常。

这样也就验证了InterProcessSemaphoreMutex实现的锁是不可重入的。

4.3 可重入读写锁—Shared Reentrant Read Write Lock

类似JDK的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁在使用时不允许读(阻塞)。

此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>请求读锁—>释放读锁 ---->释放写锁。从读锁升级成写锁是不行的。

可重入读写锁主要由两个类实现:InterProcessReadWriteLockInterProcessMutex。使用时首先创建一个InterProcessReadWriteLock实例,然后再根据你的需求得到读锁或者写锁,读写锁的类型是InterProcessMutex

@Test
    public void testReadWriteLock() throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(QTY);

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        List<InterProcessReadWriteLock> locks = new ArrayList<>();
        for (int i = 0; i < QTY; ++i) {
            int finalI = i;
            Runnable runnable = () -> {
                InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, PATH);
                InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
                InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();
                locks.add(lock);

                boolean acquire = false;
                try {
                    if (readLock.acquire(-1, TimeUnit.SECONDS)) {
                        System.out.println("acquire readLock and run  " + finalI + " time:" + System.currentTimeMillis());
                        readLock.release();
                        Thread.sleep(1000);
                        System.out.println("acquire readLock end  " + finalI + " time:" + System.currentTimeMillis());
                    }
                    if (finalI / 2 == 0 && writeLock.acquire(-1, TimeUnit.SECONDS)) {
                        System.out.println("acquire writeLock and run  " + finalI + " time:" + System.currentTimeMillis());
                        Thread.sleep(1000);
                        System.out.println("acquire writeLock end  " + finalI + " time:" + System.currentTimeMillis());
                    }
                    System.out.println("acquire Lock and run  " + finalI + " time:" + System.currentTimeMillis());
                    Thread.sleep(10000);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }

                System.out.println("acquire Lock end : " + finalI + " time:" + System.currentTimeMillis());
                countDownLatch.countDown();
            };


            service.execute(runnable);
            //service.execute(runnable);
        }

        countDownLatch.await();
    }

4.4 信号量—Shared Semaphore

一个计数的信号量类似JDK的Semaphore。 JDK中Semaphore维护的一组许可(permits),而Curator中称之为租约(Lease)。 有两种方式可以决定semaphore的最大租约数。第一种方式是用户给定path并且指定最大LeaseSize。第二种方式用户给定path并且使用SharedCountReader类。如果不使用SharedCountReader, 必须保证所有实例在多进程中使用相同的(最大)租约数量,否则有可能出现A进程中的实例持有最大租约数量为10,但是在B进程中持有的最大租约数量为20,此时租约的意义就失效了。

这次调用acquire()会返回一个租约对象。 客户端必须在finally中close这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端session由于某种原因比如crash丢掉, 那么这些客户端持有的租约会自动close, 这样其它客户端可以继续使用这些租约。 租约还可以通过下面的方式返还:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

注意你可以一次性请求多个租约,如果Semaphore当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法。

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore使用的主要类包括下面几个:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader
@Test
    public void testInterProcessSemaphore() throws Exception {
        InterProcessSemaphoreV2 interProcessSemaphoreV2 = new InterProcessSemaphoreV2(client, PATH, QTY);

        ExecutorService service = Executors.newFixedThreadPool(QTY);
        CountDownLatch countDownLatch = new CountDownLatch(2);


        Runnable runnable1 = () -> {
            Collection<Lease> acquire = null;
            try {
                acquire = interProcessSemaphoreV2.acquire(2, 1000000, TimeUnit.SECONDS);
                System.out.println("acquire = runnable1" + acquire);
                Thread.sleep(10000);

                interProcessSemaphoreV2.returnAll(acquire);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            countDownLatch.countDown();
        };

        Runnable runnable2 = () -> {
            Collection<Lease> acquire = null;
            try {
                acquire = interProcessSemaphoreV2.acquire(1, 1000000, TimeUnit.SECONDS);
                System.out.println("acquire = runnable2" + acquire);
                Thread.sleep(10000);
                interProcessSemaphoreV2.returnAll(acquire);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            countDownLatch.countDown();
        };

        Runnable runnable3 = () -> {
            Collection<Lease> acquire = null;
            try {
                acquire = interProcessSemaphoreV2.acquire(QTY, 1000000, TimeUnit.SECONDS);
                System.out.println("acquire = runnable3" + acquire);
                Thread.sleep(10000);
                interProcessSemaphoreV2.returnAll(acquire);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            countDownLatch.countDown();
        };


        service.execute(runnable1);
        Thread.sleep(1000);
        service.execute(runnable2);
        Thread.sleep(1000);
        service.execute(runnable3);

        countDownLatch.await();
    }

首先我们先获得了2个租约, 10s后我们把它还给了semaphore。 接着请求了一个租约,因为semaphore还有3个租约,所以请求可以满足,返回一个租约,还剩2个租约。 然后再请求5个租约,因为租约不够,阻塞到超时,还是没能满足,返回结果为null(租约不足会阻塞到超时,然后返回null,不会主动抛出异常;如果不设置超时时间,会一致阻塞)。

上面说讲的锁都是公平锁(fair)。 总ZooKeeper的角度看, 每个客户端都按照请求的顺序获得锁,不存在非公平的抢占的情况。

4.5 多共享锁对象 —Multi Shared Lock

Multi Shared Lock是一个锁的容器。 当调用acquire(), 所有的锁都会被acquire(),如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。

主要涉及两个类:

  • InterProcessMultiLock
  • InterProcessLock

它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    private static final String PATH = "/curator-test";
    static CuratorFramework client;

    public static void main(String[] args) throws Exception {


        before();

        InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
        InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

        InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

        if (!lock.acquire(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("could not acquire the lock");
        }
        System.out.println("has got all lock");

        System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
        System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

        try {
            //access resource exclusively
            System.out.println("lock = " + lock);
            Thread.sleep(1000);
        } finally {
            System.out.println("releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
        System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
        System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
    }

    public static void before() {

        ExponentialBackoffRetry retryPolicy =
                new ExponentialBackoffRetry(100, 3);
        client = ZkClientFactory.createWithOptions(
                "192.168.1.13:2181", retryPolicy, null, 3000, 20000);

        client.start();
    }
}

新建一个InterProcessMultiLock, 包含一个重入锁和一个非重入锁。 调用acquire()后可以看到线程同时拥有了这两个锁。 调用release()看到这两个锁都被释放了。

5.分布式计数器

顾名思义,计数器是用来计数的, 利用ZooKeeper可以实现一个集群共享的计数器。 只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器, 一个是用int来计数(SharedCount),一个用long来计数(DistributedAtomicLong)。

5.1 分布式int计数器—SharedCount

这个类使用int类型来计数。 主要涉及三个类。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表计数器, 可以为它增加一个SharedCountListener,当计数器改变时此Listener可以监听到改变的事件,而SharedCountReader可以读取到最新的值, 包括字面值和带版本信息的值VersionedValue。

 public class SharedCounterDemo implements SharedCountListener {
 
    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";
 
    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
 
            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();
 
            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }
 
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
 
            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
 
    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }
 
    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

在这个例子中,我们使用baseCount来监听计数值(addListener方法来添加SharedCountListener )。 任意的SharedCount, 只要使用相同的path,都可以得到这个计数值。 然后我们使用5个线程为计数值增加一个10以内的随机数。相同的path的SharedCount对计数值进行更改,将会回调给baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

这里我们使用trySetCount去设置计数器。 第一个参数提供当前的VersionedValue,如果期间其它client更新了此计数值, 你的更新可能不成功, 但是这时你的client更新了最新的值,所以失败了你可以尝试再更新一次。 而setCount是强制更新计数器的值

注意计数器必须start,使用完之后必须调用close关闭它。

5.2 分布式long计数器—DistributedAtomicLong

再看一个Long类型的计数器。 除了计数的范围比SharedCount大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它client更新了), 它使用InterProcessMutex方式来更新计数值。

可以从它的内部实现DistributedAtomicValue.trySet()中看出:

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
 
        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }
 
        return result;
    }

计数器有一系列的操作:

  • get(): 获取当前值
  • increment(): 加一
  • decrement(): 减一
  • add(): 增加特定的值
  • subtract(): 减去特定的值
  • trySet(): 尝试设置计数值
  • forceSet(): 强制设置计数值

必须检查返回结果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

 public class DistributedAtomicLongDemo {
 
    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";
 
    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
 
                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }
 
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

6. 分布式队列

使用Curator也可以简化Ephemeral Node (临时节点)的操作。Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTS_EQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用Leader选举只让Leader作为唯一的消费者。

但是, 根据Netflix的Curator作者所说, ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4, 原因有五:

  1. ZK有1MB 的传输限制。 实践中ZNode必须相对较小,而队列包含成千上万的消息,非常的大。
  2. 如果有很多节点,ZK启动时相当的慢。 而使用queue会导致好多ZNode. 你需要显著增大 initLimit 和 syncLimit.
  3. ZNode很大的时候很难清理。Netflix不得不创建了一个专门的程序做这事。
  4. 当很大量的包含成千上万的子节点的ZNode时, ZK的性能变得不好
  5. ZK的数据库完全放在内存中。 大量的Queue意味着会占用很多的内存空间。

尽管如此, Curator还是创建了各种Queue的实现。 如果Queue的数据量不太多,数据量不太大的情况下,酌情考虑,还是可以使用的。

6.1 分布式队列—DistributedQueue

DistributedQueue是最普通的一种队列。 它设计以下四个类:

  • QueueBuilder - 创建队列使用QueueBuilder,它也是其它队列的创建类
  • QueueConsumer - 队列中的消息消费者接口
  • QueueSerializer - 队列消息序列化和反序列化接口,提供了对队列中的对象的序列化和反序列化
  • DistributedQueue - 队列实现类

QueueConsumer是消费者,它可以接收队列的数据。处理队列中的数据的代码逻辑可以放在QueueConsumer.consumeMessage()中。

正常情况下先将消息从队列中移除,再交给消费者消费。但这是两个步骤,不是原子的。可以调用Builder的lockPath()消费者加锁,当消费者消费数据时持有锁,这样其它消费者不能消费此消息。如果消费失败或者进程死掉,消息可以交给其它进程。这会带来一点性能的损失。最好还是单消费者模式使用队列。

 public class DistributedQueueDemo {
 
    private static final String PATH = "/example/queue";
 
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();
 
        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// 等待消息消费完成
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }
 
    /**
     * 队列消息序列化实现类
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }
 
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }
 
    /**
     * 定义队列消费者
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }
 
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

例子中定义了两个分布式队列和两个消费者,因为PATH是相同的,会存在消费者抢占消费消息的情况。

6.2 带Id的分布式队列—DistributedIdQueue

DistributedIdQueue和上面的队列类似,但是可以为队列中的每一个元素设置一个ID。 可以通过ID把队列中任意的元素移除。 它涉及几个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue
//创建
builder.buildIdQueue()
//放入元素
queue.put(aMessage, messageId);
//移除元素时
int numberRemoved = queue.remove(messageId);

6.3 优先级分布式队列—DistributedPriorityQueue

优先级队列对队列中的元素按照优先级进行排序。 Priority越小, 元素越靠前, 越先被消费掉

通过builder.buildPriorityQueue(minItemsBeforeRefresh)方法创建。 当优先级队列得到元素增删消息时,它会暂停处理当前的元素队列,然后刷新队列。minItemsBeforeRefresh指定刷新前当前活动的队列的最小数量。 主要设置你的程序可以容忍的不排序的最小值。

放入队列时需要指定优先级:

queue.put(aMessage, priority);

列子

 public class DistributedPriorityQueueDemo {
 
    private static final String PATH = "/example/queue";
 
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
 
            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();
 
            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }
 
            Thread.sleep(20000);
 
        } catch (Exception ex) {
 
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }
 
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
 
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }
 
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
 
        };
    }
 
    private static QueueConsumer<String> createQueueConsumer() {
 
        return new QueueConsumer<String>() {
 
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }
 
            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }
 
        };
    }
 
}

有时候你可能会有错觉,优先级设置并没有起效。那是因为优先级是对于队列积压的元素而言,如果消费速度过快有可能出现在后一个元素入队操作之前前一个元素已经被消费,这种情况下DistributedPriorityQueue会退化为DistributedQueue。

6.4 分布式延迟队列—DistributedDelayQueue

JDK中也有DelayQueue,不知道你是否熟悉。 DistributedDelayQueue也提供了类似的功能, 元素有个delay值, 消费者隔一段时间才能收到元素。

通过下面的语句创建:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();
//放入元素时可以指定delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

注意delayUntilEpoch不是离现在的一个时间间隔, 比如20毫秒,而是未来的一个时间戳,如 System.currentTimeMillis() + 10秒。 如果delayUntilEpoch的时间已经过去,消息会立刻被消费者接收。

 public class DistributedDelayQueueDemo {
 
    private static final String PATH = "/example/queue";
 
    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
 
            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();
 
            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");
 
 
            Thread.sleep(20000);
 
        } catch (Exception ex) {
 
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }
 
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
 
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }
 
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
 
        };
    }
 
    private static QueueConsumer<String> createQueueConsumer() {
 
        return new QueueConsumer<String>() {
 
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }
 
            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }
 
        };
    }
}

7. 分布式屏障—Barrier

分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点继续进行。

比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。

7.1 DistributedBarrier

DistributedBarrier类实现了栅栏的功能。 它的构造函数如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)

首先你需要设置栅栏,它将阻塞在它上面等待的线程:

setBarrier();

然后需要阻塞的线程调用方法等待放行条件:

public void waitOnBarrier()

当条件满足时,移除栅栏,所有等待的线程将继续执行:

removeBarrier();

异常处理 DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

 public class DistributedBarrierDemo {
 
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";
 
    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();
 
            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
 
            Thread.sleep(20000);
        }
    }
}

这个例子创建了controlBarrier来设置栅栏和移除栅栏。 我们创建了5个线程,在此Barrier上等待。 最后移除栅栏后所有的线程才继续执行。

7.2 双栅栏—DistributedDoubleBarrier

双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是DistributedDoubleBarrier。 构造函数为:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)

memberQty是成员数量,当enter()方法被调用时,成员被阻塞,直到所有的成员都调用了enter()。 当leave()方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave()。 就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。

 
public class DistributedDoubleBarrierDemo {
 
    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";
 
    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {
 
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }
 
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

代码

https://github.com/andanyoung/springboot/tree/master/zookeeper

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐