由于ZK的watch一次性注册原因,以及client断开连接到重新连接上这一段时间差,可能导致zookeeper客户端不能够接收到完所有的ZK事件
不要强依赖于ZK的事件,要知道ZK事件可能丢失,也可能多个事件收到的数据是相同的(注意等幂性)。ZK能做到的是分布式数据一致性服务,可以保证最终一致性。在开发的时候要谨慎处理。
下面是测试以及原因分析

测试结果

  • 监听器
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher.Event.KeeperState;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ZKClientListener {

    public static void main(String[] args) throws InterruptedException {

        ZkClient zkClient = new ZkClient("192.168.215.129:2181,192.168.215.130:2181,192.168.215.131:2181");
        final AtomicInteger count = new AtomicInteger(0);

        String testRootPath = "/lbl";

        zkClient.subscribeChildChanges(testRootPath, new IZkChildListener() {

            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("org.I0Itec.zkclient.IZkChildListener.handleChildChange");
                System.out.println(parentPath);
                for (String string : currentChilds) {
                    System.out.println(string);
                }
            }
        });

        zkClient.subscribeDataChanges(testRootPath, new IZkDataListener() {

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("org.I0Itec.zkclient.IZkDataListener.handleDataDeleted");
                System.out.println(dataPath);
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                int i = count.getAndIncrement();
                System.out.println(i + "  " + data);
            }
        });

        while (true) {
            TimeUnit.SECONDS.sleep(5);
        }
    }
}
  • 事件生产者
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;

import java.util.concurrent.atomic.AtomicInteger;

public class EventProducer {

    public static void main(String[] args) {
        ZkClient zkClient = new ZkClient("192.168.215.129:2181,192.168.215.130:2181,192.168.215.131:2181");

        String testRootPath = "/lbl";

        try {
            zkClient.create(testRootPath, 1, CreateMode.PERSISTENT);

            //写入、读取数据
//          Object readData = zkClient.readData(testRootPath);
//          System.out.println(readData);
//          zkClient.writeData(testRootPath, 2);

            for (int i = 0 ; i < 100; i ++){
                zkClient.writeData(testRootPath, i);
            }
        } finally {
            zkClient.close();
        }

    }
}

这里写图片描述
变更前后数据对比
这里写图片描述

可见1,2两次数据变更的事件丢失了,而且最后收到的几次事件数据都是相同的
这里写图片描述
这里写图片描述

原因分析

所 有的Zookeeper读操作,包括getData()、getChildren()和exists(),都有一个开关,可以在操作的同时再设置一个 watch。在ZooKeeper中,Watch是一个一次性触发器,会在被设置watch的数据发生变化的时候,发送给设置watch的客户端。 watch的定义中有三个关键点:

  • 一次性触发器

    一 个watch事件将会在数据发生变更时发送给客户端。例如,如果客户端执行操作getData(“/znode1″, true),而后 /znode1 发生变更或是删除了,客户端都会得到一个 /znode1 的watch事件。如果 /znode1 再次发生变更,则在客户端没有设置新的watch的情况下,是不会再给这个客户端发送watch事件的。

  • 发送给客户端

    这 就是说,一个事件会发送向客户端,但可能在在操作成功的返回值到达发起变动的客户端之前,这个事件还没有送达watch的客户端。Watch是异步发送 的。但ZooKeeper保证了一个顺序:一个客户端在收到watch事件之前,一定不会看到它设置过watch的值的变动。网络时延和其他因素可能会导 致不同的客户端看到watch和更新返回值的时间不同。但关键点是,每个客户端所看到的每件事都是有顺序的。

  • 被设置了watch的数据

    这 是指节点发生变动的不同方式。你可以认为ZooKeeper维护了两个watch列表:data watch和child watch。getData()和exists()设置data watch,而getChildren()设置child watch。或者,可以认为watch是根据返回值设置的。getData()和exists()返回节点本身的信息,而getChildren()返回 子节点的列表。因此,setData()会触发znode上设置的data watch(如果set成功的话)。一个成功的 create() 操作会触发被创建的znode上的数据watch,以及其父节点上的child watch。而一个成功的 delete()操作将会同时触发一个znode的data watch和child watch(因为这样就没有子节点了),同时也会触发其父节点的child watch。

Watch 由client连接上的ZooKeeper服务器在本地维护。这样可以减小设置、维护和分发watch的开销。当一个客户端连接到一个新的服务器上 时,watch将会被以任意会话事件触发。当与一个服务器失去连接的时候,是无法接收到watch的。而当client重新连接时,如果需要的话,所有先 前注册过的watch,都会被重新注册。通常这是完全透明的。只有在一个特殊情况下,watch可能会丢失:对于一个未创建的znode的exist watch,如果在客户端断开连接期间被创建了,并且随后在客户端连接上之前又删除了,这种情况下,这个watch事件可能会被丢失。

ZooKeeper对Watch提供了什么保障

对于watch,ZooKeeper提供了这些保障:

  • Watch与其他事件、其他watch以及异步回复都是有序的。 ZooKeeper客户端库保证所有事件都会按顺序分发。

  • 客户端会保障它在看到相应的znode的新数据之前接收到watch事件。//这保证了在process()再次利用zk client访问时数据是存在的

  • 从ZooKeeper接收到的watch事件顺序一定和ZooKeeper服务所看到的事件顺序是一致的。

关于Watch的一些值得注意的事情

  • Watch是一次性触发器,如果你得到了一个watch事件,而你希望在以后发生变更时继续得到通知,你应该再设置一个watch,当然ZKClient已经帮你实现了这个事情。

  • 因 为watch是一次性触发器,而获得事件再发送一个新的设置watch的请求这一过程会有延时,所以你无法确保你看到了所有发生在ZooKeeper上的 一个节点上的事件。所以请处理好在这个时间窗口中可能会发生多次znode变更的这种情况。(你可以不处理,但至少请认识到这一点)。//也就是说,在process()中如果处理得慢而没有注册new watch时,在这期间有其它事件出现时是不会通知!!之前可能就是没有意识到这点所以才引出本话题*

  • 一个watch对象或一个函数/上下文对,为一个事件只会被通知一次。比如,如果同一个watch对象在同一个文件上分别通过exists和getData注册了两次,而这个文件之后被删除了,这时这个watch对象将只会收到一次该文件的deletion通知。//同一个watch注册同一个节点多次只会生成一个event.这里我想到如果一个watch注册不同的node,也应当出现多个event?

  • 当你从一个服务器上断开时(比如服务器出故障了),在再次连接上之前,你将无法获得任何watch。请使用这些会话事件来进入安全模式:在disconnected状态下你将不会收到事件,所以你的程序在此期间应该谨慎行事

  • 可以配合定时刷新机制来尽可能保证获取到ZK最新的数据

参考资料

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐