zookeeper集群节点上注册监听器watch详解(二)
1.前面一篇文章讲到了在Linux环境上搭建集群 建立4个节点,zookeeper有一个十分重要的功能是注册监听器,通过注册监听器,当zookeeper节点发生变化时,zookeeper会主动通知客户端,从而实现一些功能。好比如当一台服务器启动的时候,我们在zookeeper上创建一个临时节点。通过监听这些临时节点,我们就可以知道目前有多少台服务器在线。当服务器关掉时zookeeper也会主动通知
1.前面一篇文章讲到了在Linux环境上搭建集群 建立4个节点,zookeeper有一个十分重要的功能是注册监听器,通过注册监听器,当zookeeper节点发生变化时,zookeeper会主动通知客户端,从而实现一些功能。好比如当一台服务器启动的时候,我们在zookeeper上创建一个临时节点。通过监听这些临时节点,我们就可以知道目前有多少台服务器在线。当服务器关掉时zookeeper也会主动通知我们,这样我们就相当于实时了解当前服务器在线情况,方便协调服务器访问,监听节点的数据变化事件包括:
1.1、节点被创建;
1.2、节点上写入数据;
1.3、节点数据变化;
1.4、节点数据被删除;
1.5、节点本身被删除
2.下面看代码示例,首先添加maven依赖包,并启动好zookeeper:
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
3.连接zk的ClusterCoordinate.java方法如下:
import Cloud.Base.Error; import Cloud.Base.Loger; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.util.*; /** * zk集群协调 */ final public class ClusterCoordinate { /** 校验码,防止节点被人为篡改 **/ private static final String Data = "7fd9d6ff5695959e7f7690017892ee06"; /** 持久性父节点*/ protected String ClusterName; /** 临时节点*/ protected String MyNodeName; /** 集群连接IP端口*/ protected String ConnectionString; /** 连接时间*/ protected int ConnectTimeout = 5000; /** 节点连接集合对象*/ protected ClusterCoordinateListener Listener; private ZooKeeper zk; private List<String> Children; public ClusterCoordinate(String ClusterName, String MyNodeName, String ConnectionString, int ConnectTimeout, ClusterCoordinateListener Listener) { this.ClusterName = ClusterName; this.MyNodeName = MyNodeName; this.ConnectionString = ConnectionString; //保证最低超时时间为默认值 if(ConnectTimeout > this.ConnectTimeout) { this.ConnectTimeout = ConnectTimeout; } this.Listener = Listener; } /** * 连接 * * 获取事件状态(与客户端连接状态相关) * KeeperState:Disconneced 连接失败 * KeeperState:SyncConnected 连接成功 * KeeperState:AuthFailed 认证失败 * KeeperState:Expired 会话过期 * 获取事件类型(与zknode相关) * EventType:NodeCreated 节点创建 * EventType:NodeDataChanged 节点的数据变更 * EventType:NodeChildrentChanged 子节点下的数据变更 * EventType:NodeDeleted 节点删除 * EventType:None 刚连接什么都没做 * @return */ public int Connect() { try { //连接 zk = new ZooKeeper(ConnectionString, ConnectTimeout, new Watcher() { @Override public void process(WatchedEvent Event) { Event.KeeperState State = Event.getState(); //System.out.println("B = " + JSON.toJSONString(Event)); //连接断开(无法选举) if(State == Watcher.Event.KeeperState.Disconnected) { //连接异常,0节点通知 Listener.OnClusterChange(new ArrayList<>()); } try { List<String> children = zk.getChildren("/"+ClusterName, true); //System.out.println("A = " + JSON.toJSONString(Event)); byte[] NodeDataByte = zk.getData("/" + ClusterName + "/" + MyNodeName, true, new Stat()); String NodeData = new String(NodeDataByte); if(Data.equals(NodeData) && IsSame(children) == false) { Listener.OnClusterChange(children); //System.out.println("当前节点发生改变======================"+children+"=======变化节点================="+path); } Children = children; } catch (KeeperException | InterruptedException e) { //此处不做处理,否则通知可能重复 } } }); //判断父节点是否存在 try { Stat ExistsParents = this.zk.exists("/" + ClusterName, true); if(null == ExistsParents) { //PERSISTENT 持久节点 this.zk.create("/"+ClusterName, Data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //判断子节点是否存在 Stat ExistsChild = this.zk.exists("/" + ClusterName + "/" + MyNodeName, true); //如果子节点存在抛异常 if(null != ExistsChild) { //尽可能等待存在的节点被删除后再做处理,否则zk节点状态和通知会出问题 Thread.sleep(4000); ExistsChild = this.zk.exists("/" + ClusterName + "/" + MyNodeName, true); if(ExistsChild != null) { Loger.Log.error("Zookeeper子节点已存在 node = " + "/" + ClusterName + "/" + MyNodeName); return Error.ZK_CHILD_NODE_ALREADY_EXISTS; } } //EPHEMERAL 临时节点 this.zk.create("/"+ClusterName+"/"+MyNodeName, Data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } catch (Exception Ex) { //连接异常,0节点通知 Listener.OnClusterChange(new ArrayList<>()); } } catch (Exception Ex) { //启动时异常,0节点通知 Listener.OnClusterChange(new ArrayList<>()); } return Error.SUCCESS; } private boolean IsSame(List<String> Children) { if(this.Children == null) { return false; } if(Children == null || Children.size() == 0) { return false; } for (String Child1 : Children) { if(this.Children.contains(Child1) == false) { return false; } } for (String Child2 : this.Children) { if(Children.contains(Child2) == false) { return false; } } return true; } }
以上代码是基于ZooKeeper的事件监听----Watch机制 Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件。比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以实现:基于 zookeeper 实现分布式锁、集群管理等功能
Watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息(watcher 是一次性的操作),可以通过循环监听去达到永久监听效果
以上代码回调中zk.getChildren("/"+ClusterName, true);方法起到持续监听节点的作用,如果此处不查询只能监听一次
4.最后写一个main方法进行测试:
public static void main(String[] args) throws Exception
{
ClusterCoordinate c = new ClusterCoordinate("Test", "node1", "192.168.71.39:2881,192.168.71.39:2882,192.168.71.39:2883,192.168.71.39:2884", 3000, new ClusterCoordinateListener()
{
@Override
public void OnClusterChange(List<String> AliveNode)
{
//TODO 此处是监听后的节点 具体的处理业务可以在这里写 故以上写了个实现ClusterCoordinateListener 接口
System.out.println(AliveNode);
}
});
c. Connect();
Thread.sleep(20000000);
}
5.ClusterCoordinateListener.java接口
import java.util.List;
/**
* @author 何志鹏
* @version 1.0
* @date 2021/11/13 14:37
*/
public interface ClusterCoordinateListener
{
public int OnClusterChange(List<String> AliveNode);
}
6.最后看一下测试结果 当全部把服务器集群全部停掉后 立马就会返回DisConnected状态,然后会持续的进行重试连接 当服务器重启之后 就会连接恢复数据 当干掉其中一台时 首先会抛出异常无法连接当前父节点,随后会进行重新选举 然后监听其它的节点:
更多推荐
所有评论(0)