zk集群搭建以及java客户端连接zk
项目终于不忙了.闲来无事,想起上次面试被人问了一个zk把我给问住了.看来要好好了解一下zk了.于是开始了zk的学习之路.首先带大家搭建一下zk集群环境,这个很简单,我就不说那么详细了,首先准备三台机器,来安装zk集群 要有root权限,因为要改hosts下载zk安装包上传到liunx服务器 我用的zookeeper.3.4.10.tar.gz 你们用其他
项目终于不忙了.闲来无事,想起上次面试被人问了一个zk把我给问住了.看来要好好了解一下zk了.于是开始了zk的学习之路.
首先带大家搭建一下zk集群环境,这个很简单,我就不说那么详细了,
首先准备三台机器,来安装zk集群 要有root权限,因为要改hosts
下载zk安装包上传到liunx服务器 我用的zookeeper.3.4.10.tar.gz 你们用其他版本也行
解压 tar -zxvf .....
个人习惯重命名 为zookeeper 在当前用户的家目录
然后cd到conf这个目录下面
重命名zoo.cfg 然后打开这个文件 重点两点 一个是datadir 我是直接在zookeeper下创建了data目录 所以我的配置是 datadir=/home/gang/zookeeper/data
然后配置 server.0=uamserver:2888:3888
server.1=uamserver1:2888:3888
server.2=uamserver2:2888:3888
uamserver 和uamserver1 ,uamserver2分别对应三个机器.三个机器都配置一样
然后在data目录下创建myid文件 内容分别是0 1 2 这三个数server0对应的myid为0 切换到root用户修改hosts文件
然后启动切换到zookeeper/bin目录下./zkServer.sh start 启动三台后 验证是否成功 ./zkServer.sh status
这时候
[was@uamserver bin]$ ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/was/zookeeper1/bin/../conf/zoo.cfg
Mode: follower
[was@uamserver bin]$ ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/was/zookeeper3/bin/../conf/zoo.cfg
Mode: leader
这时候说明集群已经搭建成功
现在我们通过zk原生api来操作下zk,java客户端
public class Zookeeper {
private static CountDownLatch countDownLatch=new CountDownLatch(1);
private final static String zk_url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";
private final static int time_out=5000;
public static void main(String[] arges) throws IOException, InterruptedException {
//初始化zk
ZooKeeper zooKeeper=new ZooKeeper(zk_url, time_out, new Watcher() {
public void process(WatchedEvent watchedEvent) {
Event.KeeperState state = watchedEvent.getState();
Event.EventType type = watchedEvent.getType();
if(Event.KeeperState.SyncConnected==state){
if(Event.EventType.None==type){
//调用此方法测计数减一
countDownLatch.countDown();
}
}
}
});
//阻碍当前线程进行,除非计数归零
countDownLatch.await();
try {
//创建持久化节点
zooKeeper.create("/gang","你好".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//获取节点数据
byte[] data = zooKeeper.getData("/gang", false, null);
System.out.println(new String(data));
//修改节点数据
zooKeeper.setData("/gang","吕金刚".getBytes(),0);
//删除节点数据
zooKeeper.delete("/gang",-1);
//创建临时节点 异步创建
zooKeeper.create("/jingang", "临时节点".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int i, String s, Object o, String s1) {
System.out.println(o);
System.out.println(i);
System.out.println(s1);
System.out.println(s);
}
},"a");
//获取临时节点数据
byte[] jingangs = zooKeeper.getData("/jingang", false, null);
System.out.println(new String(jingangs));
//验证节点是否存在
Stat exists = zooKeeper.exists("/jingang", false);
System.out.println(exists);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
zooKeeper.close();
}
}
咱们把他简单的封装一下
public class ZookWatcher implements Watcher {
//默认从0开始,原子性
private AtomicInteger count = new AtomicInteger();
private static ZooKeeper zk;
private static final int time_out=10000;
private static CountDownLatch countDownLatch=new CountDownLatch(1);
private static final String url="1.11.11.1:2181,1.11.11.2:2182,1.11.11.3:2183";
/**
* 重写process用来监控watcher事件
* @param watchedEvent
*/
@Override
public void process(WatchedEvent watchedEvent) {
Event.EventType type = watchedEvent.getType();
Event.KeeperState state = watchedEvent.getState();
//是否连接成功
if(Event.KeeperState.SyncConnected==state){
if(Event.EventType.None==type){
System.out.println("成功连接到zk服务器");
//zk连接成功将计数器清零
countDownLatch.countDown();
}
//如果是创建节点事件
else if(Event.EventType.NodeCreated==type){
System.out.println("创建节点成功");
}
//如果节点中的数据被改变
else if(Event.EventType.NodeDataChanged==type){
System.out.println("节点数据发生变化");
}
//如果子节点的数据发生改变
else if(Event.EventType.NodeChildrenChanged==type){
System.out.println("子节点数据发生改变");
}
//节点被删除事件
else if(Event.EventType.NodeDeleted==type){
System.out.println("节点被删除");
}else;
}else if(Event.KeeperState.Disconnected==state){
//连接失败直接退出
return;
}
try {
//阻塞当前线程,直到zk初始化完毕.
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取zk连接
* @return
* @throws InterruptedException
*/
public ZooKeeper createConntection() throws InterruptedException {
try {
zk=new ZooKeeper(url, time_out, this);
//阻塞当前线程,当创好连接以后再继续执行当前线程
countDownLatch.await();
} catch (IOException e) {
e.printStackTrace();
}
return zk;
}
/**
* 关闭zk连接
* @param zooKeeper
*/
public void close(ZooKeeper zooKeeper){
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 验证此节点是否存在
* @param path
* @param watcher
* @return
*/
public Stat extis(String path,boolean watcher){
try {
Stat exists = zk.exists(path, watcher);
return exists;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
/**
* 创建节点
* @param path 节点路径
* @param data 数据
* @return
*/
public String createNode(String path,String data,boolean watcher){
String result=null;
try {
Stat exists = zk.exists(path, watcher);
if(null==exists){
result= zk.create(path,data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
}else{
System.out.println("节点已经存在");
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
/**
* 修改节点的数据
* @param path 节点路径
* @param data 数据
* @param version 版本号
* @return
*/
public Stat setNodeData(String path,String data,int version,boolean watcher){
Stat stat =null;
try {
Stat exists = zk.exists(path, watcher);
stat = zk.setData(path, data.getBytes(), version);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return stat;
}
/**
* 删除节点
* @param path 节点的路径
* @param version 版本号,当输入-1时候是指所有数据
*/
public void deleteNode(String path,int version,boolean watcher){
try {
Stat exists = zk.exists(path, watcher);
zk.delete(path,version);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
/**
* 获取节点数据
* @param path
* @return
*/
public String getNodeData(String path,boolean watcher){
byte[] data=null;
try {
data = zk.getData(path, watcher, null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return new String(data);
}
}
下篇重点介绍一下zk的特性,以及watcher和watcher事件
更多推荐
所有评论(0)