1 ZK的原生API

1.1 java操作原生API

1.1.1 引入依赖

使用java操作zookeeper,引入maven坐标

<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.5.5</version>
</dependency>

1.1.2 创建会话

创建会话方法:客户端通过创建一个zookeeper实例来链接zookeeper服务器
Zookeeper提供了4个构造方法,根据所提供的参数不同
参数说明如下:

  • connectString:连接服务器列表,如果有多个则用分隔
  • sessionTimeout:心跳检测时间周期(毫秒
  • wather:事件处理通知器
  • canBeReadOnly:标识当前会话是否支持只读
  • sessionId和sessionPassword:提供连接zookeepersession和密码,通过这两个确定唯一一台客户端,目的可以提供重复会话

注意zookeeper客户端和服务器端会话的建立是一个异步过程,也就是说在程序中,我们程序方法在处理完客户端初始化后立即返回(也就是说程序往下执行代码,这样,大多数情况下我们并没有真正构建一个可用会话,在会话的生命周期处于CONNECTING时才算真正建立完毕,所以我们需要使用多线程中学习的一个小工具类)

public class ZooKeeperDemo{
	//zk 连接地址
	private static final String CONNECT_ADDR="192.168.126.130:2181";
	//session 超时时间
	private static final Integer SESSION_OUTTIME=5000;
	//阻塞程序执行,用于等待zookeeper连接成功,发送成功信号
	private static CountDownLatch countDown = new CountDownLatch(1);
	public static void main(String[] args) throws Exception{
		//zk 连接
		ZooKeeper zk = new ZooKeeper(CONNECT_ADDR,SESSION_OUTTIME,new Watcher(){
			@Override
			public void process(WatchedEvent event){				
					Event.KeeperState state = event.getState();
					Event.EventType type = event.getType();
					//如果建立了安全连接
					if(Event.KeeperState.SyncConnected==state){
						if(Event.EventType.None==type){
							//如果连接建立成功,则发送信号
							countDown.countDown();
							System.out.println("zk 建立连接");
						}
					}			
			}
		});
		//进行阻塞
		countDown.await();
		System.out.println("zk 执行了");
		//关闭会话
		zk.close();
	}
}

1.1.3 节点的CRUD

1.1.3.1 创建节点

创建节点方法:create
有两种创建节点的方法:同步异步创建节点方式
同步方式:

  • 参数1,节点路径(名称):/nodeName(不允许递归创建节点,也就是说在父节点不存在的情况下,不允许创建子节点)
  • 参数2,节点内容: 要求类型是字节数组
  • 参数3,节点权限:使用Ids.OPEN_ACL_UNSAFE开发权限即可。(这个参数一般在权限没有太高要求的场景下,没必要关注)
  • 参数4,节点类型:创建节点的类型:CreateMode.*,提供以下几种节点类型:
    PERSISENT:持久节点
    PERSISENT_SEQUENTIAL:持久顺序节点
    EPHEMERAL:临时节点
    EPHEMERAL_SEQUENTIAL:临时顺序节点
    CONTAINER:容器节点,用于LeaderLock等特殊用途,当容器节点不存在任何子节点时,容器将成为服务器在将来某个时候删除的候选节点
    PERSISTENT_WITH_TTL:带TTLtime-to-live,存活时间)的永久节点,节点在TTL时间之内没有得到更新并且没有孩子节点,就会被自动删除
    PERSISTENT_SEQUENTIAL_WITH_TTL:带TTLtime-to-live,存活时间)和单调递增序号的永久节点,节点在TTL时间之内没有得到更新并且没有孩子节点,就会被自动删除
//创建父节点,此处的zk是上一步的会话产生的
String ret = zk.create("/testRoot","testRoot".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISENT);
System.out.println(ret);

注意:临时节点效率很高,主要用来作为分布式锁

异步方式:(在同步方式参数基础上增加俩参数)

  • 参数5,注册一个异步回调函数,要实现AsynCallBack.VoidCallBack接口,重写processResult(int rc,String path,Obejct ctx,String name)方法,当节点创建完毕后执行此方法
    rc:为服务端响应码,0表示调用成功,-4表示端口连接,-110表示指定节点存在,-112表示会话已经过期
    path:接口调用时传入API的数据节点的路径参数
    ctx:为调用接口传入APIctx的值
    name:实际在服务器端创建节点的名称
  • 参数6,传递给回调函数的参数,一般为上下文(context)信息
//此处的 -1是跳过版本限制 dataVersion ,在删除时如果传入的版本号不是-1 与现有版本号一致,那么可以删除
zk.delete("/testRoot",-1,new AsynCallBack.VoidCallBack(){
	@Override
	public void processResult(int rc,String path,Object ctx){
		System.out.println(rc);
		System.out.println(path);
		System.out.println(ctx);	
	}
},"a");
1.1.3.2 修改节点
//此处的-1 也是跳过版本号的检查
zk.setData("/testRoot","modify data root".getBytes(),-1);
1.1.3.3 判断是否存在

判断节点是否存在

//  参数一:路径,参数二:是否watch监听
zk.exists("/testRoot",false);
1.1.3.4 获取节点值
byte[] data = zk.getData("/testRoot", false, null);
System.out.println(new String(data));
System.out.println(zk.getChildren("/testRoot", false));
1.1.3.5 删除节点
//删除节点
zk.delete("/testRoot/children", -1);
System.out.println(zk.exists("/testRoot/children", false));

2 zkClient的API

2.1 引入pom依赖

<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>

2.2 操作客户端

2.2.1 创建客户端

/** zookeeper地址 */
	static final String CONNECT_ADDR = "192.168.1.171:2181,192.168.1.172:2181,192.168.1.173:2181";
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
		
	public static void main(String[] args) throws Exception {
		ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);
		
	}

2.2.2 增加和删除节点

可以递归增加和删除节点

创建节点时,那个true是否创建父节点

//1. create and delete方法 
		zkc.createEphemeral("/temp");
		//第二个参数createParents==true,表示父节点不存在时创建父节点(递归创建)
		zkc.createPersistent("/super/c1", true);
		Thread.sleep(10000);
		zkc.delete("/temp");
		zkc.deleteRecursive("/super");

2.2.3 读取节点

//2. 设置path和data 并且读取子节点和每个节点的内容
	zkc.createPersistent("/super", "1234");
	zkc.createPersistent("/super/c1", "c1内容");
	zkc.createPersistent("/super/c2", "c2内容");
	List<String> list = zkc.getChildren("/super");
	for(String p : list){
		System.out.println(p);
		String rp = "/super/" + p;
		//此处的readData是原生API
			String data = zkc.readData(rp);
			System.out.println("节点为:" + rp + ",内容为: " + data);
	}

2.2.4 判断是否存在

		//3. 更新和判断节点是否存在
		zkc.writeData("/super/c1", "新内容");
		System.out.println(zkc.readData("/super/c1"));
		System.out.println(zkc.exists("/super/c1"));

2.3 zkClinet不需要watcher

zkClient不用watcherwatcher参数,也就是说我们开发人员无需关心反复注册Watcher的问题,zkclient给我们提供了一套监听方式,可以使用监听节点的方式进行操作,剔除了繁琐的反复watcher操作,简化了代码的复杂程度

2.3.1 subscribeChildChanges方法

只监听父子节点的新增和删除,但是节点变化不监听

  • 参数一:path路径
  • 参数二:实现了IZkChildListener接口的类
public static void main(String[] args) throws Exception {
		ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);		
		//对父节点添加监听子节点变化。
		zkc.subscribeChildChanges("/super", new IZkChildListener() {
			@Override
			public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
				System.out.println("parentPath: " + parentPath);
				System.out.println("currentChilds: " + currentChilds);
			}
		});
		Thread.sleep(3000);		
		zkc.createPersistent("/super");
		Thread.sleep(1000);		
		zkc.createPersistent("/super" + "/" + "c1", "c1内容");
		Thread.sleep(1000);		
		zkc.createPersistent("/super" + "/" + "c2", "c2内容");
		Thread.sleep(1000);		
		zkc.delete("/super/c2");
		Thread.sleep(1000);			
		zkc.deleteRecursive("/super");
		Thread.sleep(Integer.MAX_VALUE);
	}

在这里插入图片描述

2.3.2 subscribeDataChanges方法

主要监听节点的删除和改变

public static void main(String[] args) throws Exception {
		ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), 5000);		
		zkc.createPersistent("/super", "1234");		
		//对父节点添加监听子节点变化。
		zkc.subscribeDataChanges("/super", new IZkDataListener() {
			@Override
			public void handleDataDeleted(String path) throws Exception {
				System.out.println("删除的节点为:" + path);
			}			
			@Override
			public void handleDataChange(String path, Object data) throws Exception {
				System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);
			}
		});		
		Thread.sleep(3000);
		zkc.writeData("/super", "456", -1);
		Thread.sleep(1000);

		zkc.delete("/super");
		Thread.sleep(Integer.MAX_VALUE);
	}

在这里插入图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐