关于ZooKeeper的介绍,网上很多介绍,我也懒的在多写了..

直接上代码吧.. 长时间不用,会生疏..

关于环境, 我自己在虚拟机上装了CentOs6.8服务器,然后在上面装了ZooKeeper,单机模式吧

用不到集群模式


Java操作ZooKeeper

创建单个节点

	package com.java.module.zookeeper.zk_1;
	
	import org.apache.zookeeper.CreateMode;
	import org.apache.zookeeper.WatchedEvent;
	import org.apache.zookeeper.Watcher;
	import org.apache.zookeeper.ZooDefs;
	import org.apache.zookeeper.ZooKeeper;
	
	import java.util.concurrent.CountDownLatch;
	
	/**
	 *
	 */
	public class CreateZNode implements Watcher {
	
	    //超时时间
	    private static final int SESSION_TIMEOUT = 5000;
	
	    private ZooKeeper zk;
	    private CountDownLatch connectedSignal = new CountDownLatch(1);
	
	    /**
	     * 连接zookeeper
	     *
	     * @param host
	     * @throws Exception
	     */
	    public void connect(String host) throws Exception {
	        zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
	        connectedSignal.await();
	    }
	
	    /**
	     * 关闭zk连接
	     *
	     * @throws Exception
	     */
	    public void closeZk() throws Exception {
	        zk.close();
	    }
	
	
	    @Override
	    public void process(WatchedEvent watchedEvent) {
	        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
	            connectedSignal.countDown();
	        }
	    }
	
	
	    /**
	     * 创建zk节点
	     *
	     * @param zNode
	     * @throws Exception
	     */
	    public void createZNode(String zNode) throws Exception {
	        String path = "/" + zNode;
	        String createPath = zk.create(path, null/*data*/, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
	        System.out.println(createPath);
	    }
	
	
	    public static void main(String[] args) {
	        try {
	
	            CreateZNode create = new CreateZNode();
	            create.connect("192.168.125.125:2181");
	            create.createZNode("testZNode");
	            create.closeZk();
	
	        } catch (Exception e) {
	            e.printStackTrace();
	        }
	    }
	
	}


在connect方法中,实例化了一个ZooKeeper类的对象,这个类是客户端中API中的主要类,用于维护客户端和ZooKeeper服务之间得连接.

ZooKeeper类的构造函数主要有三个参数 : 
1. ZooKeeper服务的主机地址
2. 以毫秒为单位的会话超时时间.
3. Watcher对象的实例.Watcher对象接收来自于ZooKeeper的回调,以获得各种事件的通知.

当一个ZooKeeper的实例被创建时,会启动一个线程连接到ZooKeeper服务.
由于构造函数的调用是立即返回的,因此在使用新建的ZooKeeper对象之前一定要等待其与ZooKeeper服务之间成功建立连接.
我们使用Java的CountDownLatch类来阻止使用新建的ZooKeeper对象,直到这个ZooKeeper对象已经准备就绪.
Watcher累被用于获取ZooKeeper对象是否准备就绪的信息,在它的接口中只有一个方法 : 
public void process(WatchedEvent watchedEvent)

当客户端已经与ZooKeeper服务建立连接后,Watcher的process()方法会被调用,参数是一个用于表示该链接的事件.
在接收到一个连接事件(以Watcher.Event.KeeperState的枚举型值SyncConnected来表示)时,我们通过调用CountDownLatch的countDown()方法来递减它的计数器.

锁存器(latch)被创建时带有一个值为1的计数器,用于表示在他释放所有等待线程之前需要发生的事件数.
在调用countDown()方法之后,计数器的值变为0,则await()方法返回.

现在connect()方法已返回,然后执行创建节点的方法.所需的参数 : 
1. 路径(字符串表示)
2. znode的内容(字节数组)
3. 访问控制列表
4. 节点类型(暂时/持久)
a) 暂时(临时) : 当创建znode的客户端断开连接时,临时节点都会被删除.
b) 持久(永久) : 当创建znode的客户端断开连接时,永久节点不会被删除.


多节点创建

注意 : 创建多级zNode节点,必须是父node已经创建的前提下,否则会报错.
也就是说,一下子无法创建多级node节点.
Demo代码 创建的子节点是临时节点,在客户端断开连接时(程序休眠结束)可以去服务器上验证下.

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

/**
 * Created by CYX on 2017/3/14.
 */
public class ConnectionWatcher implements Watcher {

	//超时时间
	private static final int SESSION_TIMEOUT = 5000;

	protected ZooKeeper zk;
	private CountDownLatch connectedSignal = new CountDownLatch(1);

	/**
	 * 连接zookeeper
	 *
	 * @param host
	 * @throws Exception
	 */
	public void connect(String host) throws Exception {
		zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
		connectedSignal.await();
	}

	/**
	 * 关闭zk连接
	 *
	 * @throws Exception
	 */
	public void closeZk() throws Exception {
		zk.close();
	}

	@Override
	public void process(WatchedEvent watchedEvent) {
		if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
			connectedSignal.countDown();
		}
	}

}



import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;

/**
 * Created by CYX on 2017/3/14.
 */
public class JoinGroup extends ConnectionWatcher {

	@Override
	public void connect(String host) throws Exception {
		super.connect(host);
	}

	@Override
	public void closeZk() throws Exception {
		super.closeZk();
	}

	@Override
	public void process(WatchedEvent watchedEvent) {
		super.process(watchedEvent);
	}

	public void join(String groupName , String memberName)throws Exception {
		String path = "/" + groupName + "/" + memberName;
		String createPath = zk.create(path, null/*data*/, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
		System.out.println(createPath);
	}

	public static void main(String[] args) throws Exception{
		JoinGroup joinGroup = new JoinGroup();
		joinGroup.connect("192.168.125.125:2181");
		joinGroup.join("testZNode", "twoPath");
		Thread.sleep(30000);
	}

}
注意 :ConnectionWatcher这个类后面的例子中是公有的,写Demo的时候注意.

客户端还连接着,子节点是存在的.

客户端断开连接之后,子节点消失



获取父节点下的子节点

package com.java.module.zookeeper.zk_3;

import com.java.module.zookeeper.util.ConnectionWatcher;
import org.apache.zookeeper.WatchedEvent;

import java.util.List;

/**
 * 获取节点下的子节点
 */
public class ListGroup extends ConnectionWatcher {

	@Override
	public void connect(String host) throws Exception {
		super.connect(host);
	}

	@Override
	public void closeZk() throws Exception {
		super.closeZk();
	}

	@Override
	public void process(WatchedEvent watchedEvent) {
		super.process(watchedEvent);
	}


	public void listGroup(String groupName) throws Exception {
		String path = "/" + groupName;

		List<String> nodes = zk.getChildren(path, false);
		if (nodes.isEmpty())
			System.out.println("节点 : " + path + " 下没有子节点 ");

		for (String node : nodes) {
			System.out.println(node);
		}

	}


	public static void main(String[] args) throws  Exception{
		ListGroup listGroup = new ListGroup();
		listGroup.connect("192.168.125.125:2181");
		listGroup.listGroup("testZNode");
	}

}



删除节点

注意 : zk不支持递归删除节点,所以必须先将子节点删除才能删除父节点.

package com.java.module.zookeeper.zk_4;

import com.java.module.zookeeper.util.ConnectionWatcher;
import org.apache.zookeeper.WatchedEvent;

import java.util.List;

/**
 * 删除zk上的节点.
 * zk不支持递归删除节点,所以必须先将子节点删除才能删除父节点.
 */
public class DeleteGroup extends ConnectionWatcher {

	@Override
	public void connect(String host) throws Exception {
		super.connect(host);
	}

	@Override
	public void closeZk() throws Exception {
		super.closeZk();
	}

	@Override
	public void process(WatchedEvent watchedEvent) {
		super.process(watchedEvent);
	}


	public void delete(String groupName) throws Exception {
		String path = "/" + groupName;
		try {
			List<String> nodes = zk.getChildren(path, false);
			for (String node : nodes) {
				zk.delete(path + "/" + node, -1);
			}
			zk.delete(path, -1);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}


	public static void main(String[] args) throws Exception {
		DeleteGroup deleteGroup = new DeleteGroup();
		deleteGroup.connect("192.168.125.125:2181");
		deleteGroup.delete("testZNode");
	}

}


好了,今天到此为止,其与的后面有时间再搞..


Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐