搜了一些Zookeeper的相关书籍和博客,但好多基本都是讲Zookeeper的架构、用途,尤其分布式中的应用讲的真的是天花乱坠,但看完还是不会写代码,搞得自己理论丰富的一批,实践完全懵逼。对于Zookeeper的Java客户端API使用,基本没有涉及或者讲清楚,要么就是讲的很模糊。果然还是得自己来,通过Zookeeper的API来学习一下Zookeeper的功能,然后依据这些功能去思考怎么用?为什么用?逐渐摸索Zookeeper的应用。

1. 环境准备

1. 安装jdk,这就不细说了。

2.安装Zookeeper集群,网上很多详细教程,但还是简单贴出来自己的配置文件。可以直接复制。只是其中一个Zookeeper节点的配置文件,其余两个仅仅改动一下端口、dataDir以及dataLogDir的路径即可。

# 服务器与客户端之间交互的基本时间单元(ms) 
tickTime=2000   
#zookeeper集群中的包含多台server, 其中一台为leader, 集群中其余的server为follower. initLimit参数配置初始化连接时, follower和leader之间的最长心跳时间. 此时该参数设置为10, #说明时间限制为10倍tickTime.
initLimit=10  
# 该参数配置leader和follower之间发送消息, 请求和应答的最大时间长度. 此时该参数设置为5, 说明时间限制为5倍tickTime. 
syncLimit=5
# zookeeper中使用的基本时间单位, 毫秒值.
tickTime=2000
#存储内存中数据库快照的位置
dataDir=/usr/local/zookeeper-cluster/zookeeper-2181/data
#存储日志的目录,如果没有设置该参数, 将使用和#dataDir相同的设置.
dataLogDir=/usr/local/zookeeper-cluster/zookeeper-2181/logs 
# 用于监听客户端连接的端口,或者说客户端请求连接的端口
clientPort=2181

maxClientCnxns=60
#这一段很重要,集群配置的关键,server.后面跟的数字是标识某台Zookeeper的关键,后续配置集群会使用
server.1=127.0.0.1:2222:2225
server.2=127.0.0.1:3333:3335
server.3=127.0.0.1:4444:4445

#autopurge.snapRetainCount=3

#autopurge.purgeInterval=1

3. 引入jar包,这里推荐使用Apache的java客户端。

		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.5.7</version>
		</dependency>

2.API使用

1. 首先,创建Zookeeper客户端对象。很简单,直接new一个就行,但主要是注意构造方法的参数。Zookeeper构造方法重载版本比较多,必须了解其中的每个参数代表的含义。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly, HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException 

(1)String connectString :Zookeeper集群的每个节点的IP地址和端口号,用逗号分隔。例如

String zkNodes = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

(2)int sessionTimeout:会话超时时间,单位是毫秒,在sessionTimeout时间之内,客户端会和服务端直接通过server发送PING请求来保持会话的有效性,俗称“心跳检测”,同时server重新激活client对应的会话。

Session是指当Client创建一个同Server的连接时产生的会话。连接Connected之后Session状态就开启,Zookeeper服务器和Client采用长连接方式(Client会不停地向Server发送心跳)保证session在不出现网络问题、服务器宕机或Client宕机情况下可以一直存在。因此,在正常情况下,session会一直有效,并且ZK集群上所有机器都会保存这个Session信息。

如果超出sessionTimeout的时间服务端仍未接收到客户端的心跳检测请求,那么服务端就会将客户端看做下线状态,会将存储的session给删除。在ZK中,很多数据和状态都是和会话绑定的,一旦会话失效,那么ZK就开始清除和这个会话有关的信息,包括这个会话创建的临时节点和注册的所有Watcher。

但还有另一种情况就是,客户端正常,但是当前会话所连接的Zookeeper集群节点宕机或者其他原因心跳检测失败,也就是无法ping,ZK Client会马上捕获到这个异常,封装为一个ConnectionLoss的事件,然后启动自动重连机制在地址列表中选择新的地址进行重连。重连会有三种结果:

  • 在session timeout时间内重连成功,client会重新收到一个syncconnected的event,并将连接重新持久化为connected状态
  • 超过session timeout时间段后重连成功,client会收到一个expired的event,并将连接持久化为closed状态
  • 一直重连不上,client将不会收到任何event

(3)Watcher watcher:这是Zookeeper中非常重要的一个特征,由于客户端与Zookeeper之间的连接是采用长连接,所以,可以通过客户端在Zookeeper上注册一个事件监听器,也就是Watcher对象,当Zookeeper中发生某一个事件时就会回调该Watcher对象中的方法。这个Watcher对象的作用非常重要,这个事件监听器将会一直存在,直到Zookeeper客户端关闭连接。

(4)long sessionId:每一个会话session的建立,Zookeeper都会为自动该会话分配一个全局唯一的会话id,所以一般该id不会由我们指定,一般都不会指定传递该参数。

(5)boolean canBeReadOnly:是否提供只读服务(不提供写服务)。

(6)HostProvider aHostProvider:随机提供host进行连接。没啥用,默认的即可。

(7)ZKClientConfig clientConfig:连接参数配置,

(8)byte[] sessionPasswd:提供连接zookeeper的sessionId和密码,通过这两个确定唯一一台客户端,目的是可以提供重复会话。

实际上,真正必须的参数也就是connectString、sessionTimeout和watcher,其余的默认即可。所以Zookeeper也提供了这三个参数的重载版本,也是最常用的构造方法。

注意:zookeeper客户端和服务器端会话的建立是一个异步的过程,也就是说在程序中,我们程序方法在处理完客户端初始化后,立即返回(程序往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的声明周期处于"CONNECTING"时才算真正建立完毕) 

创建Zookeeper客户端并建立连接的示例代码如下:

    public static void createZookeeperClient() throws IOException, InterruptedException {
        /*
        由于Zookeeper客户端对象的建立和连接是异步执行,所以很有可能会因为Zookeeper对象尚未建立连接,
        主线程就继续执行导致的执行报错,这一点可以通过一些线程同步类辅助,保证Zookeeper连接完全建立成功后在进行后续操作
         */
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(zkNodes, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //已建立连接
                if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
                    countDownLatch.countDown();
                }
            }
        });
        countDownLatch.await();
        //do something...
    }

2. Watcher对象的使用:Watcher可以实现对Zookeeper的监控,这个特性是来自于Zookeeper客户端和服务端采用TCP长连接的方式进行通信。当Zookeeper发生某个事件时,会通过Watcher对象进行回调通知。Watcher是一个接口,所以必须要实现一个具体实现类,仅有一个接口方法process(),该方法的参数WatchedEvent就是Zookeeper服务端发生的事件,其中包含了该事件的所有信息。客户端在向zk服务器注册Watcher的同时,会将Watcher对象存储在客户端的WatchManager中。当zk服务端触发Watcher事件后,会向客户端发送通知,客户端线程从WatcherManager中取出对应的Watcher对象。来执行回调逻辑。

public class WatcherTest1 implements Watcher {
    @Override
    public void process(WatchedEvent watchedEvent) {
        
    }
}

对于WatchedEvent,其源码比较简单,其中只包含了三个主要的域变量,分别是EventType eventType, KeeperState keeperState, String path。eventType表示事件类型,keeperState表示连接状态,path就表示事件的发生在哪个数据节点路径上发生的。

public class WatchedEvent {
    private final KeeperState keeperState;
    private final EventType eventType;
    private String path;

   ····
}

EventType和KeeperState都是枚举类型

        public static enum EventType {
            None(-1),//无此节点
            NodeCreated(1),//节点已创建成功
            NodeDeleted(2),//节点已删除成功
            NodeDataChanged(3),//节点数据改变
            NodeChildrenChanged(4),//子节点被创建、被删除会发生事件触发
            DataWatchRemoved(5),//数据监视已被移除
            ChildWatchRemoved(6);//子节点监视已被移除

            private final int intValue;
          ....
        }
        public static enum KeeperState {
            
            
            Unknown(-1),//从3.1.0版本开始被废弃
            Disconnected(0),//客户端和服务器处于断开连接状态
            NoSyncConnected(1),//从3.1.0版本开始被废弃
            SyncConnected(3),//客户端和服务器处于连接状态
            AuthFailed(4),//权限验证失败状态,通常同时也会收到AuthFailedException
            ConnectedReadOnly(5),//只读连接
            SaslAuthenticated(6),//权限验证通过
            Expired(-112),//此时客户端会话失效,通常同时也会收到SessionExpiredException
            Closed(7);//连接资源关闭

            ....
        }

Watcher特性

 一次性

无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都将其从相应的存储中移除。因此,开发人员在 Watcher 的使用上要记住的一点是需要反复注册。例如,如果客户端执行 getData("/znode1",true),后面对 /znode1 的更改或删除,客户端都会获得 /znode1 的监控事件通知。如果 /znode1 再次更改,如果客户端没有执行新一次设置新监视点的读取,是不会发送监视事件通知的。但是,有一个监听器对象是例外,在创建Zookeeper对象时,调用的构造方法中必须传递的Watcher对象就是和Zookeeper对象具有相同的生命周期,直到Zookeeper客户端关闭连接,该Watcher才会关闭对服务端的监听。

客户端串行执行

客户端Watcher回调的过程是一个串行同步的过程,这为我们保证了顺序,同时,需要开发人员注意的一点是,千万不要因为一个Watcher的处理逻辑影响了整个客户端的Watcher回调。

轻量

WatchedEvent 是 ZooKeeper 整个 Watcher 通知机制的最小通知单元,这个数据结构中只包含三部分内容:通知状态、事件类型和节点路径。也就是说,Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。

另外,客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象传递给服务端,仅仅只是在客户端请求中使用 boolean 类型属性进行了标记,同时服务端也仅仅只是保存了当前连接的 ServerCnxn 对象。如此轻量的Watcher机制设计,在网络开销和服务端内存开销上都是非常廉价的。

3. 创建节点:Zookeeper的Java客户端对于数据操作(增删改查)都有同步和异步两种实现方式。同步操作一般会有返回值,并且会抛出相应的异常。异步操作没有返回值,也不会抛出异常。此外异步方法参数在同步方法参数的基础上,会增加Callback和context两个参数。

同步创建节点:全参数版本

    public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode, Stat stat, long ttl)

(1)path指的就是数据节点的路径。

(2)data就是要存储的字符串的字节数组形式。(不支持序列化方式,如果需要实现序列化,可使用java相关的序列化框架,如Hession)

(3)acl指节点权限,统一使用Ids.OPEN_ACL_UNSAFE权限即可(一般在权限没有太高要求的场景下,没必要关注)

(4)createMode节点类型,创建节点的类型:CreateMode.* 提供四种节点类型,分别是PERSISTENT (持久节点)、PERSISTENT_SEQUENTIAL(持久顺序节点)、EPHEMRAL(临时节点)、EPHEMRAL_SEQUENTIAL(临时顺序节点)

(5)stat节点状态信息:在创建节点时可以手动指定节点的状态信息,但一般无需传入该参数。(非必须参数)

(6)ttl:过期时间,如果该节点在ttl时间之内未发生变动,就会被删除。(非必须参数)

所以常用的同步创建节点方法的重载版本如下

public String create(String path, byte[] data, List<ACL> acl, CreateMode createMode)

示例代码如下

public static void createNode() throws IOException, KeeperException, InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ZooKeeper zooKeeper = new ZooKeeper(zkNodes, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //已建立连接
                if (watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
                    countDownLatch.countDown();
                }
            }
        });
        countDownLatch.await();
        //节点权限设置
        ACL acl = new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.ANYONE_ID_UNSAFE);
        List<ACL> acls = new ArrayList<ACL>();
        acls.add(acl);
        //创建节点
        zooKeeper.create("/demo", "helloworld".getBytes(), acls, CreateMode.PERSISTENT);

        System.out.println("over");
    }

异步创建节点:


public void create(String path, byte[] data, List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx) 

除了create同步方法中的四个必须参数以外,异步模式的create方法还增加了callback和context两个参数。这个callback中的processResult方法会在节点创建好之后被调用,它有四个参数。第一个是int类型的resultCode,为服务端响应码, 0表示调用成功,-4表示端口连接,-110表示指定节点存在,-112表示会话已经过期。第二个参数是创建节点的路径。第三个参数是context,当一个StringCallback类型对象作为多个create方法的参数时,这个参数就很有用。第四个参数是创建节点的名字,其实与path参数相同。异步的方法不会抛出异常,而是会在回调StringCallback中处理所有的事件。

    public interface StringCallback extends AsyncCallback {
        void processResult(int var1, String var2, Object var3, String var4);
    }

示例代码

    public static void createNode() throws IOException, KeeperException, InterruptedException {

        //创建连接对象省略


        final CountDownLatch countDownLatch = new CountDownLatch(1);
        //节点权限设置
        ACL acl = new ACL(ZooDefs.Perms.ALL,ZooDefs.Ids.ANYONE_ID_UNSAFE);
        List<ACL> acls = new ArrayList<ACL>();
        acls.add(acl);

        zk.create("/demo", "helloworld".getBytes(), acls, CreateMode.PERSISTENT,new IStringCallBack(), countDownLatch);
        countDownLatch.await();
        System.out.println("over");
    }

    static class IStringCallBack implements AsyncCallback.StringCallback {

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            switch (KeeperException.Code.get(rc)) {
                case CONNECTIONLOSS:
                    System.out.println("CONNECTIONLOSS");
                    break;
                case OK:
                    System.out.println("OK - {" + path + ", " + name + ", " + ctx + "}");
                    CountDownLatch countDownLatch = (CountDownLatch) ctx;
                    countDownLatch.countDown();
                    break;
                case NODEEXISTS:
                    System.out.println(path + "exists");
                    break;
                default:
                    System.out.println("DEFAULT");
                    break;
            }
        }

    }

4. 删除节点:和创建节点类似,分同步和异步。

(1)同步

public void delete(String path, int version)

参数path,表示节点路径

参数version,表示版本号,即表示本次删除操作是针对该数据的某个版本进行操作。只有当version参数的值与节点状态信息中的dataVersion值相等时,数据修改才能成功,否则会抛出BadVersion异常。这是为了防止丢失数据的更新,在ZooKeeper提供的API中,所有的对已有节点的写数据操作都有version参数。

(2)异步

public void delete(String path, int version, VoidCallback cb, Object ctx)

5. 修改节点数据

(1)同步

public Stat setData(String path, byte[] data, int version)

(2)异步

public void setData(String path, byte[] data, int version, StatCallback cb, Object ctx)

参数path:表示数据节点路径

参数data:表示要设置的数据

参数version:表示修改数据版本

6. 获取节点的数据

(1)同步:

public byte[] getData(String path, boolean watch, Stat stat)

zooKeeper.getData方法的返回值就是节点中存储的数据值,它有三个参数,第一个参数是节点的路径,用于表示要获取哪个节点中的数据。第三个参数stat用于存储节点的状态信息,在调用getData方法前,会先构造一个空的Stat类型对象作为参数传给getData方法,当getData方法调用返回后,节点的状态信息会被填充到stat对象中。

private void getDataSync() throws KeeperException, InterruptedException {
    Stat stat = new Stat();
    // getData的返回值是该节点的数据值,节点的状态信息会赋值给stat对象
    byte[] data = zooKeeper.getData("/node_1",true, stat);
    System.out.println(new String(data));
    System.out.println(stat);
}

第二个参数是一个bool类型的watch,这个参数比较重要。当watch为true时,表示我们想要监控这个节点的数据变化,而使用的监听器对象就是我们在创建Zookeeper客户端时指定的那个监听器对象,这个boolean参数时,false表示不对该节点监控。当节点的数据发生变化时,我们就可以拿到zk服务器推送给我们的通知。

第二个参数,我们还可以使用自定义的Watcher对象,但这种监控只能生效一次,这是getData方法的另一个重载版本

public byte[] getData(String path, Watcher watcher, Stat stat)

(2)异步

public void getData(String path, Watcher watcher, DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
private void getDataAsync() {
        zooKeeper.getData("/node", true, new AsyncCallback.DataCallback() {
            public void processResult(int resultCode, String path, Object ctx, byte[] data, Stat stat) {
                System.out.println(resultCode);
                System.out.println(path);
                System.out.println(ctx);
                System.out.println(new String(data));//data就是获取到的数据
                System.out.println(stat);
            }
        }, "异步获取节点的数据");
    }

 

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐