摘要

Dubbo Provider 在启动时会将自身的服务信息整理成 URL 注册到注册中心,Dubbo Consumer 在启动时会向注册中心订阅感兴趣的 Provider 信息,之后 Provider 和 Consumer 才能建立连接,进行后续的交互。可见,一个稳定、高效的注册中心对基于 Dubbo 的微服务来说是至关重要的

Dubbo 目前支持 Consul、etcd、Nacos、ZooKeeper、Redis 等多种开源组件作为注册中心,并且在 Dubbo 源码也有相应的接入模块,如下图所示:

Dubbo 官方推荐使用 ZooKeeper 作为注册中心,它是在实际生产中最常用的注册中心实现,这也是我们本课时要介绍 ZooKeeper 核心原理的原因。

要与 ZooKeeper 集群进行交互,我们可以使用 ZooKeeper 原生客户端或是 ZkClient、Apache Curator 等第三方开源客户端。在后面介绍 dubbo-registry-zookeeper 模块的具体实现时你会看到,Dubbo 底层使用的是 Apache Curator(GitHub - apache/curator: Apache CuratorApache Curator 是实践中最常用的 ZooKeeper 客户端。

一、ZooKeeper 核心概念

Apache ZooKeeper 是一个针对分布式系统的、可靠的、可扩展的协调服务,它通常作为统一命名服务、统一配置管理、注册中心(分布式集群管理)、分布式锁服务、Leader 选举服务等角色出现。很多分布式系统都依赖与 ZooKeeper 集群实现分布式系统间的协调调度,例如:Dubbo、HDFS 2.x、HBase、Kafka 等。ZooKeeper 已经成为现代分布式系统的标配。ZooKeeper 本身也是一个分布式应用程序,下图展示了 ZooKeeper 集群的核心架构。

1.1 ZooKeeper集群的核心架构

  • Client 节点:从业务角度来看,这是分布式应用中的一个节点,通过 ZkClient 或是其他 ZooKeeper 客户端与 ZooKeeper 集群中的一个 Server 实例维持长连接,并定时发送心跳。从 ZooKeeper 集群的角度来看,它是 ZooKeeper 集群的一个客户端,可以主动查询或操作 ZooKeeper 集群中的数据,也可以在某些 ZooKeeper 节点(ZNode)上添加监听。当被监听的 ZNode 节点发生变化时,例如,该 ZNode 节点被删除、新增子节点或是其中数据被修改等,ZooKeeper 集群都会立即通过长连接通知 Client。
  • Leader 节点:ZooKeeper 集群的主节点,负责整个 ZooKeeper 集群的写操作,保证集群内事务处理的顺序性。同时,还要负责整个集群中所有 Follower 节点与 Observer 节点的数据同步。
  • Follower 节点:ZooKeeper 集群中的从节点,可以接收 Client 读请求并向 Client 返回结果,并不处理写请求,而是转发到 Leader 节点完成写入操作。另外,Follower 节点还会参与 Leader 节点的选举。
  • Observer 节点:ZooKeeper 集群中特殊的从节点,不会参与 Leader 节点的选举,其他功能与 Follower 节点相同。引入 Observer 角色的目的是增加 ZooKeeper 集群读操作的吞吐量,如果单纯依靠增加 Follower 节点来提高 ZooKeeper 的读吞吐量,那么有一个很严重的副作用,就是 ZooKeeper 集群的写能力会大大降低,因为 ZooKeeper 写数据时需要 Leader 将写操作同步给半数以上的 Follower 节点。引入 Observer 节点使得 ZooKeeper 集群在写能力不降低的情况下,大大提升了读操作的吞吐量。

1.2 ZooKeeper数据结构

ZooKeeper 树型存储结构

ZNode 节点类型有如下四种:

  • 持久节点。 持久节点创建后,会一直存在,不会因创建该节点的 Client 会话失效而删除。
  • 持久顺序节点。 持久顺序节点的基本特性与持久节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名。
  • 临时节点。 创建临时节点的 ZooKeeper Client 会话失效之后,其创建的临时节点会被 ZooKeeper 集群自动删除。与持久节点的另一点区别是,临时节点下面不能再创建子节点。
  • 临时顺序节点。 基本特性与临时节点一致,创建节点的过程中,ZooKeeper 会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名

在每个 ZNode 中都维护着一个 stat 结构,记录了该 ZNode 的元数据,其中包括版本号、操作控制列表(ACL)、时间戳和数据长度等信息,如下表所示:

我们除了可以通过 ZooKeeper Client 对 ZNode 进行增删改查等基本操作,还可以注册 Watcher 监听 ZNode 节点、其中的数据以及子节点的变化。一旦监听到变化,则相应的 Watcher 即被触发,相应的 ZooKeeper Client 会立即得到通知。Watcher 有如下特点:

  • 主动推送。 Watcher 被触发时,由 ZooKeeper 集群主动将更新推送给客户端,而不需要客户端轮询。
  • 一次性。 数据变化时,Watcher 只会被触发一次。如果客户端想得到后续更新的通知,必须要在 Watcher 被触发后重新注册一个 Watcher。
  • 可见性。 如果一个客户端在读请求中附带 Watcher,Watcher 被触发的同时再次读取数据,客户端在得到 Watcher 消息之前肯定不可能看到更新后的数据。换句话说,更新通知先于更新结果。
  • 顺序性。 如果多个更新触发了多个 Watcher ,那 Watcher 被触发的顺序与更新顺序一致。

1.3 Zookeeper消息广播流程

ZooKeeper 集群中三种角色的节点(Leader、Follower 和 Observer)都可以处理 Client 的读请求,因为每个节点都保存了相同的数据副本,直接进行读取即可返回给 Client。

对于写请求,如果 Client 连接的是 Follower 节点(或 Observer 节点),则在 Follower 节点(或 Observer 节点)收到写请求将会被转发到 Leader 节点。下面是 Leader 处理写请求的核心流程:

  1. Leader 节点接收写请求后,会为写请求赋予一个全局唯一的 zxid(64 位自增 id),通过 zxid 的大小比较就可以实现写操作的顺序一致性。
  2. Leader 通过先进先出队列(会给每个 Follower 节点都创建一个队列,保证发送的顺序性),将带有 zxid 的消息作为一个 proposal(提案)分发给所有 Follower 节点。
  3. 当 Follower 节点接收到 proposal 之后,会先将 proposal 写到本地事务日志,写事务成功后再向 Leader 节点回一个 ACK 响应。
  4. 当 Leader 节点接收到过半 Follower 的 ACK 响应之后,Leader 节点就向所有 Follower 节点发送 COMMIT 命令,并在本地执行提交。
  5. 当 Follower 收到消息的 COMMIT 命令之后也会提交操作,写操作到此完成。
  6. 最后,Follower 节点会返回 Client 写请求相应的响应。

1.4 zooKeeper崩溃恢复

上面写请求处理流程中,如果发生 Leader 节点宕机,整个 ZooKeeper 集群可能处于两种状态:

  1. 当 Leader 节点收到半数以上 Follower 节点的 ACK 响应之后,会向各个 Follower 节点广播 COMMIT 命令,同时也会在本地执行 COMMIT 并向连接的客户端进行响应。如果在各个 Follower 收到 COMMIT 命令前 Leader 就宕机了,就会导致剩下的服务器没法执行这条消息。
  2. 当 Leader 节点生成 proposal 之后就宕机了,而其他 Follower 并没有收到此 proposal(或者只有一小部分 Follower 节点收到了这条 proposal),那么此次写操作就是执行失败的。

在 Leader 宕机后,ZooKeeper 会进入崩溃恢复模式,重新进行 Leader 节点的选举。

ZooKeeper 对新 Leader 有如下两个要求:

  1. 对于原 Leader 已经提交了的 proposal,新 Leader 必须能够广播并提交,这样就需要选择拥有最大 zxid 值的节点作为 Leader。
  2. 对于原 Leader 还未广播或只部分广播成功的 proposal,新 Leader 能够通知原 Leader 和已经同步了的 Follower 删除,从而保证集群数据的一致性。

ZooKeeper 选主使用的是 ZAB 协议,如果展开介绍的话内容会非常多,这里我们就通过一个示例简单介绍 ZooKeeper 选主的大致流程。

比如,当前集群中有 5 个 ZooKeeper 节点构成,sid 分别为 1、2、3、4 和 5,zxid 分别为 10、10、9、9 和 8,此时,sid 为 1 的节点是 Leader 节点。实际上,zxid 包含了 epoch(高 32 位)和自增计数器(低 32 位) 两部分。其中,epoch 是“纪元”的意思,标识当前 Leader 周期,每次选举时 epoch 部分都会递增,这就防止了网络隔离之后,上一周期的旧 Leader 重新连入集群造成不必要的重新选举。该示例中我们假设各个节点的 epoch 都相同。

某一时刻,节点 1 的服务器宕机了,ZooKeeper 集群开始进行选主。由于无法检测到集群中其他节点的状态信息(处于 Looking 状态),因此每个节点都将自己作为被选举的对象来进行投票。于是 sid 为 2、3、4、5 的节点,投票情况分别为(2,10)、(3,9)、(4,9)、(5,8),同时各个节点也会接收到来自其他节点的投票(这里以(sid, zxid)的形式来标识一次投票信息)。

  • 对于节点 2 来说,接收到(3,9)、(4,9)、(5,8)的投票,对比后发现自己的 zxid 最大,因此不需要做任何投票变更。
  • 对于节点 3 来说,接收到(2,10)、(4,9)、(5,8)的投票,对比后由于 2 的 zxid 比自己的 zxid 要大,因此需要更改投票,改投(2,10),并将改投后的票发给其他节点。
  • 对于节点 4 来说,接收到(2,10)、(3,9)、(5,8)的投票,对比后由于 2 的 zxid 比自己的 zxid 要大,因此需要更改投票,改投(2,10),并将改投后的票发给其他节点。
  • 对于节点 5 来说,也是一样,最终改投(2,10)。

经过第二轮投票后,集群中的每个节点都会再次收到其他机器的投票,然后开始统计投票,如果有过半的节点投了同一个节点,则该节点成为新的 Leader,这里显然节点 2 成了新 Leader节点

Leader 节点此时会将 epoch 值加 1,并将新生成的 epoch 分发给各个 Follower 节点。各个 Follower 节点收到全新的 epoch 后,返回 ACK 给 Leader 节点,并带上各自最大的 zxid 和历史事务日志信息。Leader 选出最大的 zxid,并更新自身历史事务日志,示例中的节点 2 无须更新。Leader 节点紧接着会将最新的事务日志同步给集群中所有的 Follower 节点,只有当半数 Follower 同步成功,这个准 Leader 节点才能成为正式的 Leader 节点并开始工作。

二、Apache Curator相关原理

2.1 ZKClinet客户端不足

  • ZooKeeper 的 Watcher 是一次性的,每次触发之后都需要重新进行注册。
  • 会话超时之后,没有实现自动重连的机制。
  • ZooKeeper 提供了非常详细的异常,异常处理显得非常烦琐,对开发新手来说,非常不友好。
  • 只提供了简单的 byte[] 数组的接口,没有提供基本类型以及对象级别的序列化。
  • 创建节点时,如果节点存在抛出异常,需要自行检查节点是否存在。
  • 删除节点就无法实现级联删除。

Apache Curator 是 Apache 基金会提供的一款 ZooKeeper 客户端,它提供了一套易用性和可读性非常强的 Fluent 风格的客户端 API ,可以帮助我们快速搭建稳定可靠的 ZooKeeper 客户端程序。

2.2 Curator基础 API 

public class Main { 

    public static void main(String[] args) throws Exception { 

        // Zookeeper集群地址,多个节点地址可以用逗号分隔 

        String zkAddress = "127.0.0.1:2181"; 

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增 

        RetryPolicy retryPolicy =

              new ExponentialBackoffRetry(1000, 3); 

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互了 

        CuratorFramework client =

            CuratorFrameworkFactory.newClient(zkAddress, retryPolicy); 

        client.start(); 

        // 下面简单说明Curator中常用的API 

        // create()方法创建ZNode,可以调用额外方法来设置节点类型、添加Watcher 

        // 下面是创建一个名为"user"的持久节点,其中会存储一个test字符串 

        String path = client.create().withMode(CreateMode.PERSISTENT) 

            .forPath("/user", "test".getBytes()); 

        System.out.println(path); 

        // 输出:/user 

        // checkExists()方法可以检查一个节点是否存在 

        Stat stat = client.checkExists().forPath("/user"); 

        System.out.println(stat!=null); 

        // 输出:true,返回的Stat不为null,即表示节点存在 

        // getData()方法可以获取一个节点中的数据 

        byte[] data = client.getData().forPath("/user"); 

        System.out.println(new String(data)); 

        // 输出:test 

        // setData()方法可以设置一个节点中的数据 

        stat = client.setData().forPath("/user","data".getBytes()); 

        data = client.getData().forPath("/user"); 

        System.out.println(new String(data)); 

        // 输出:data 

        // 在/user节点下,创建多个临时顺序节点 

        for (int i = 0; i < 3; i++) { 

            client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) 

                .forPath("/user/child-"; 

        } 

        // 获取所有子节点 

        List<String> children = client.getChildren().forPath("/user"); 

        System.out.println(children); 

        // 输出:[child-0000000002, child-0000000001, child-0000000000] 

        // delete()方法可以删除指定节点,deletingChildrenIfNeeded()方法 

        // 会级联删除子节点 

        client.delete().deletingChildrenIfNeeded().forPath("/user"); 

    } 

}

2.3 Background原理

上面介绍的创建、删除、更新、读取等方法都是同步的,Curator 提供异步接口,引入了BackgroundCallback 这个回调接口以及 CuratorListener 这个监听器,用于处理 Background 调用之后服务端返回的结果信息。BackgroundCallback 接口和 CuratorListener 监听器中接收一个 CuratorEvent 的参数,里面包含事件类型、响应码、节点路径等详细信息。

public class Main2 { 

    public static void main(String[] args) throws Exception { 

        // Zookeeper集群地址,多个节点地址可以用逗号分隔 

        String zkAddress = "127.0.0.1:2181"; 

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增 

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互了 

        CuratorFramework client = CuratorFrameworkFactory 

            .newClient(zkAddress, retryPolicy); 

        client.start(); 

        // 添加CuratorListener监听器,针对不同的事件进行处理 

        client.getCuratorListenable().addListener( 

          new CuratorListener() { 

            public void eventReceived(CuratorFramework client,

                  CuratorEvent event) throws Exception { 

                switch (event.getType()) { 

                    case CREATE: 

                        System.out.println("CREATE:" +

                              event.getPath()); 

                        break; 

                    case DELETE: 

                        System.out.println("DELETE:" +

                               event.getPath()); 

                        break; 

                    case EXISTS: 

                        System.out.println("EXISTS:" +

                                event.getPath()); 

                        break; 

                    case GET_DATA: 

                        System.out.println("GET_DATA:" +

                          event.getPath() + ","

                              + new String(event.getData())); 

                        break; 

                    case SET_DATA: 

                        System.out.println("SET_DATA:" + 

                                 new String(event.getData())); 

                        break; 

                    case CHILDREN: 

                        System.out.println("CHILDREN:" +

                                event.getPath()); 

                        break; 

                    default: 

                } 

            } 

        }); 

        // 注意:下面所有的操作都添加了inBackground()方法,转换为后台操作 

        client.create().withMode(CreateMode.PERSISTENT) 

            .inBackground().forPath("/user", "test".getBytes()); 

        client.checkExists().inBackground().forPath("/user"); 

        client.setData().inBackground().forPath("/user",

              "setData-Test".getBytes()); 

        client.getData().inBackground().forPath("/user"); 

        for (int i = 0; i < 3; i++) { 

            client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL) 

                .inBackground().forPath("/user/child-"); 

        } 

        client.getChildren().inBackground().forPath("/user"); 

        // 添加BackgroundCallback 

        client.getChildren().inBackground(new BackgroundCallback() { 

            public void processResult(CuratorFramework client,

                CuratorEvent event) throws Exception { 

                System.out.println("in background:"

                      + event.getType() + "," + event.getPath()); 

            } 

        }).forPath("/user"); 

        client.delete().deletingChildrenIfNeeded().inBackground() 

              .forPath("/user"); 

        System.in.read(); 

    } 

} 

// 输出: 

// CREATE:/user 

// EXISTS:/user 

// GET_DATA:/user,setData-Test 

// CREATE:/user/child- 

// CREATE:/user/child- 

// CREATE:/user/child- 

// CHILDREN:/user 

// DELETE:/user

2.4 连接状态监听原理

除了基础的数据操作,Curator 还提供了监听连接状态的监听器——ConnectionStateListener,它主要是处理 Curator 客户端和 ZooKeeper 服务器间连接的异常情况,例如, 短暂或者长时间断开连接。

短暂断开连接时,ZooKeeper 客户端会检测到与服务端的连接已经断开,但是服务端维护的客户端 Session 尚未过期,之后客户端和服务端重新建立了连接;当客户端重新连接后,由于 Session 没有过期,ZooKeeper 能够保证连接恢复后保持正常服务。

而长时间断开连接时,Session 已过期,与先前 Session 相关的 Watcher 和临时节点都会丢失。当 Curator 重新创建了与 ZooKeeper 的连接时,会获取到 Session 过期的相关异常,Curator 会销毁老 Session,并且创建一个新的 Session。由于老 Session 关联的数据不存在了,在 ConnectionStateListener 监听到 LOST 事件时,就可以依靠本地存储的数据恢复 Session 了。

这里 Session 指的是 ZooKeeper 服务器与客户端的会话。客户端启动的时候会与服务器建立一个 TCP 连接,从第一次连接建立开始,客户端会话的生命周期也开始了。客户端能够通过心跳检测与服务器保持有效的会话,也能够向 ZooKeeper 服务器发送请求并接受响应,同时还能够通过该连接接收来自服务器的 Watch 事件通知。

我们可以设置客户端会话的超时时间(sessionTimeout),当服务器压力太大、网络故障或是客户端主动断开连接等原因导致连接断开时,只要客户端在 sessionTimeout 规定的时间内能够重新连接到 ZooKeeper 集群中任意一个实例,那么之前创建的会话仍然有效。ZooKeeper 通过 sessionID 唯一标识 Session,所以在 ZooKeeper 集群中,sessionID 需要保证全局唯一。 由于 ZooKeeper 会将 Session 信息存放到硬盘中,即使节点重启,之前未过期的 Session 仍然会存在。

public class Main3 { 

    public static void main(String[] args) throws Exception { 

        // Zookeeper集群地址,多个节点地址可以用逗号分隔 

        String zkAddress = "127.0.0.1:2181"; 

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增 

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互了 

        CuratorFramework client = CuratorFrameworkFactory 

            .newClient(zkAddress, retryPolicy); 

        client.start(); 

        // 添加ConnectionStateListener监听器 

        client.getConnectionStateListenable().addListener( 

          new ConnectionStateListener() { 

            public void stateChanged(CuratorFramework client,

                    ConnectionState newState) { 

                // 这里我们可以针对不同的连接状态进行特殊的处理 

                switch (newState) {

                    case CONNECTED: 

                        // 第一次成功连接到ZooKeeper之后会进入该状态。 

                        // 对于每个CuratorFramework对象,此状态仅出现一次 

                        break; 

                    case SUSPENDED: //   ZooKeeper的连接丢失 

                        break; 

                    case RECONNECTED: // 丢失的连接被重新建立 

                        break; 

                    case LOST:

                        // 当Curator认为会话已经过期时,则进入此状态 

                        break; 

                    case READ_ONLY: // 连接进入只读模式 

                        break; 

                } 

            } 

        }); 

   } 

}

2.5 Watcher原理

Watcher 监听机制是 ZooKeeper 中非常重要的特性,可以监听某个节点上发生的特定事件,例如,监听节点数据变更、节点删除、子节点状态变更等事件。当相应事件发生时,ZooKeeper 会产生一个 Watcher 事件,并且发送到客户端。通过 Watcher 机制,就可以使用 ZooKeeper 实现分布式锁、集群管理等功能。

在 Curator 客户端中,我们可以使用 usingWatcher() 方法添加 Watcher,前面示例中,能够添加 Watcher 的有 checkExists()、getData()以及 getChildren() 三个方法,下面我们来看一个具体的示例:

public class Main4 { 

    public static void main(String[] args) throws Exception { 

        // Zookeeper集群地址,多个节点地址可以用逗号分隔 

        String zkAddress = "127.0.0.1:2181"; 

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增 

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互了 

        CuratorFramework client = CuratorFrameworkFactory 

              .newClient(zkAddress, retryPolicy); 

        client.start(); 

        try { 

           client.create().withMode(CreateMode.PERSISTENT) 

                 .forPath("/user", "test".getBytes()); 

        } catch (Exception e) { 

        } 

        // 这里通过usingWatcher()方法添加一个Watcher 

        List<String> children = client.getChildren().usingWatcher( 

          new CuratorWatcher() { 

            public void process(WatchedEvent event) throws Exception { 

                System.out.println(event.getType() + "," +

                    event.getPath()); 

            } 

        }).forPath("/user"); 

        System.out.println(children); 

        System.in.read(); 

    } 

}

2.6 Curator Cache原理

Apache Curator 引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。Cache 是 Curator 中对事件监听的包装,其对事件的监听其实可以近似看作是一个本地缓存视图和远程ZooKeeper 视图的对比过程。同时,Curator 能够自动为开发人员处理反复注册监听,从而大大简化了代码的复杂程度。

实践中常用的 Cache 有三大类:

  • NodeCache。 对一个节点进行监听,监听事件包括指定节点的增删改操作。注意哦,NodeCache 不仅可以监听数据节点的内容变更,也能监听指定节点是否存在,如果原本节点不存在,那么 Cache 就会在节点被创建后触发 NodeCacheListener,删除操作亦然。
  • PathChildrenCache。 对指定节点的一级子节点进行监听,监听事件包括子节点的增删改操作,但是不对该节点的操作监听。
  • TreeCache。 综合 NodeCache 和 PathChildrenCache 的功能,是对指定节点以及其子节点进行监听,同时还可以设置监听的深度。

2.7 Cache 的基本使用

public class Main5 { 

    public static void main(String[] args) throws Exception { 

        // Zookeeper集群地址,多个节点地址可以用逗号分隔 

        String zkAddress = "127.0.0.1:2181"; 

        // 重试策略,如果连接不上ZooKeeper集群,会重试三次,重试间隔会递增 

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3); 

        // 创建Curator Client并启动,启动成功之后,就可以与Zookeeper进行交互了 

        CuratorFramework client = CuratorFrameworkFactory 

           .newClient(zkAddress, retryPolicy); 

        client.start(); 

        // 创建NodeCache,监听的是"/user"这个节点 

        NodeCache nodeCache = new NodeCache(client, "/user"); 

        // start()方法有个boolean类型的参数,默认是false。如果设置为true, 

        // 那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的 

        // 数据内容,并保存在Cache中。 

        nodeCache.start(true); 

        if (nodeCache.getCurrentData() != null) { 

            System.out.println("NodeCache节点初始化数据为:"

                + new String(nodeCache.getCurrentData().getData())); 

        } else { 

            System.out.println("NodeCache节点数据为空"); 

        } 

        // 添加监听器 

        nodeCache.getListenable().addListener(() -> { 

            String data = new String(nodeCache.getCurrentData().getData()); 

            System.out.println("NodeCache节点路径:" + nodeCache.getCurrentData().getPath() 

                    + ",节点数据为:" + data); 

        }); 

        // 创建PathChildrenCache实例,监听的是"user"这个节点 

        PathChildrenCache childrenCache = new PathChildrenCache(client, "/user", true); 

        // StartMode指定的初始化的模式 

        // NORMAL:普通异步初始化 

        // BUILD_INITIAL_CACHE:同步初始化 

        // POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件 

        childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); 

        // childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); 

        // childrenCache.start(PathChildrenCache.StartMode.NORMAL); 

        List<ChildData> children = childrenCache.getCurrentData(); 

        System.out.println("获取子节点列表:"); 

        // 如果是BUILD_INITIAL_CACHE可以获取这个数据,如果不是就不行 

        children.forEach(childData -> { 

            System.out.println(new String(childData.getData())); 

        }); 

        childrenCache.getListenable().addListener(((client1, event) -> { 

            System.out.println(LocalDateTime.now() + "  " + event.getType()); 

            if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { 

                System.out.println("PathChildrenCache:子节点初始化成功..."); 

            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { 

                String path = event.getData().getPath(); 

                System.out.println("PathChildrenCache添加子节点:" + event.getData().getPath()); 

                System.out.println("PathChildrenCache子节点数据:" + new String(event.getData().getData())); 

            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { 

                System.out.println("PathChildrenCache删除子节点:" + event.getData().getPath()); 

            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { 

                System.out.println("PathChildrenCache修改子节点路径:" + event.getData().getPath()); 

                System.out.println("PathChildrenCache修改子节点数据:" + new String(event.getData().getData())); 

            } 

        })); 

        // 创建TreeCache实例监听"user"节点 

        TreeCache cache = TreeCache.newBuilder(client, "/user").setCacheData(false).build(); 

        cache.getListenable().addListener((c, event) -> { 

            if (event.getData() != null) { 

                System.out.println("TreeCache,type=" + event.getType() + " path=" + event.getData().getPath()); 

            } else { 

                System.out.println("TreeCache,type=" + event.getType()); 

            } 

        }); 

        cache.start(); 

        System.in.read(); 

    } 

}

2.8 curator-x-discovery 扩展库

为了避免 curator-framework 包过于膨胀,Curator 将很多其他解决方案都拆出来了,作为单独的一个包,例如:curator-recipes、curator-x-discovery、curator-x-rpc 等。

在后面我们会使用到 curator-x-discovery 来完成一个简易 RPC 框架的注册中心模块。curator-x-discovery 扩展包是一个服务发现的解决方案。在 ZooKeeper 中,我们可以使用临时节点实现一个服务注册机制。当服务启动后在 ZooKeeper 的指定 Path 下创建临时节点,服务断掉与 ZooKeeper 的会话之后,其相应的临时节点就会被删除。这个 curator-x-discovery 扩展包抽象了这种功能,并提供了一套简单的 API 来实现服务发现机制。curator-x-discovery 扩展包的核心概念如下:

  • ServiceInstance。 这是 curator-x-discovery 扩展包对服务实例的抽象,由 name、id、address、port 以及一个可选的 payload 属性构成。其存储在 ZooKeeper 中的方式如下图展示的这样。

  • ServiceProvider。 这是 curator-x-discovery 扩展包的核心组件之一,提供了多种不同策略的服务发现方式,具体策略有轮询调度、随机和黏性(总是选择相同的一个)。得到 ServiceProvider 对象之后,我们可以调用其 getInstance() 方法,按照指定策略获取 ServiceInstance 对象(即发现可用服务实例);还可以调用 getAllInstances() 方法,获取所有 ServiceInstance 对象(即获取全部可用服务实例)。
  • ServiceDiscovery。 这是 curator-x-discovery 扩展包的入口类。开始必须调用 start() 方法,当使用完成应该调用 close() 方法进行销毁。
  • ServiceCache。 如果程序中会频繁地查询 ServiceInstance 对象,我们可以添加 ServiceCache 缓存,ServiceCache 会在内存中缓存 ServiceInstance 实例的列表,并且添加相应的 Watcher 来同步更新缓存。查询 ServiceCache 的方式也是 getInstances() 方法。另外,ServiceCache 上还可以添加 Listener 来监听缓存变化。

 2.10 curator-x-discovery 包的使用

public class ZookeeperCoordinator { 

    private ServiceDiscovery<ServerInfo> serviceDiscovery; 

    private ServiceCache<ServerInfo> serviceCache; 

    private CuratorFramework client; 

    private String root; 

    // 这里的JsonInstanceSerializer是将ServerInfo序列化成Json 

    private InstanceSerializer serializer =

        new JsonInstanceSerializer<>(ServerInfo.class); 

    ZookeeperCoordinator(Config config) throws Exception { 

        this.root = config.getPath(); 

        // 创建Curator客户端 

        client = CuratorFrameworkFactory.newClient( 

            config.getHostPort(),  new ExponentialBackoffRetry(...)); 

        client.start(); // 启动Curator客户端

        client.blockUntilConnected();  // 阻塞当前线程,等待连接成功 

        // 创建ServiceDiscovery 

        serviceDiscovery = ServiceDiscoveryBuilder 

                .builder(ServerInfo.class) 

                .client(client) // 依赖Curator客户端 

                .basePath(root) // 管理的Zk路径 

                .watchInstances(true) // 当ServiceInstance加载 

                .serializer(serializer) 

                .build(); 

         serviceDiscovery.start(); // 启动ServiceDiscovery 

        // 创建ServiceCache,监Zookeeper相应节点的变化,也方便后续的读取 

        serviceCache = serviceDiscovery.serviceCacheBuilder() 

                .name(root) 

                .build();

         serviceCache.start(); // 启动ServiceCache 

    } 

    public void registerRemote(ServerInfo serverInfo)throws Exception{ 

         // 将ServerInfo对象转换成ServiceInstance对象 

         ServiceInstance<ServerInfo> thisInstance =

            ServiceInstance.<ServerInfo>builder() 

                    .name(root) 

                    .id(UUID.randomUUID().toString()) // 随机生成的UUID 

                    .address(serverInfo.getHost()) // host 

                    .port(serverInfo.getPort()) // port 

                    .payload(serverInfo) // payload 

                    .build(); 

         // 将ServiceInstance写入到Zookeeper中 

         serviceDiscovery.registerService(thisInstance); 

    } 

    public List<ServerInfo> queryRemoteNodes() { 

        List<ServerInfo> ServerInfoDetails = new ArrayList<>(); 

        // 查询 ServiceCache 获取全部的 ServiceInstance 对象 

        List<ServiceInstance<ServerInfo>> serviceInstances =

            serviceCache.getInstances(); 

        serviceInstances.forEach(serviceInstance -> { 

            // 从每个ServiceInstance对象的playload字段中反序列化得 

            // 到ServerInfo实例 

            ServerInfo instance = serviceInstance.getPayload(); 

            ServerInfoDetails.add(instance); 

        }); 

        return ServerInfoDetails; 

    } 

}

2.11 curator-recipes 简介

Recipes 是 Curator 对常见分布式场景的解决方案,这里我们只是简单介绍一下,具体的使用和原理,就先不做深入分析了。

  • Queues。提供了多种的分布式队列解决方法,比如:权重队列、延迟队列等。在生产环境中,很少将 ZooKeeper 用作分布式队列,只适合在压力非常小的情况下,才使用该解决方案,所以建议你要适度使用。
  • Counters。全局计数器是分布式系统中很常用的工具,curator-recipes 提供了 SharedCount、DistributedAtomicLong 等组件,帮助开发人员实现分布式计数器功能。
  • Locks。java.util.concurrent.locks 中提供的各种锁相信你已经有所了解了,在微服务架构中,分布式锁也是一项非常基础的服务组件,curator-recipes 提供了多种基于 ZooKeeper 实现的分布式锁,满足日常工作中对分布式锁的需求。
  • Barries。curator-recipes 提供的分布式栅栏可以实现多个服务之间协同工作,具体实现有 DistributedBarrier 和 DistributedDoubleBarrier。
  • Elections。实现的主要功能是在多个参与者中选举出 Leader,然后由 Leader 节点作为操作调度、任务监控或是队列消费的执行者。curator-recipes 给出的实现是 LeaderLatch。

博文参考

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐