Zookeeper-Java客户端API的基本使用
1.java中zk api的简单应用package com.ithzk.zookeeper;import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;import org.junit.Before;import org.junit.Test;import java.io.IOException;i...
·
Zookeeper-Java客户端API的基本使用
1.java中zk api的简单应用
package com.ithzk.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
/**
* @author hzk
* @date 2018/3/26
*/
public class SimpleZkClient {
private static final String CONNECTION_ADDRESS = "10.102.150.65:2181,10.102.151.43:2181";
private static final int SESSION_TIMEOUT = 2000;
ZooKeeper zkClient = null;
@Before
public void init() throws IOException {
zkClient = new ZooKeeper(CONNECTION_ADDRESS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//收到事件通知后的回调函数
System.out.println(watchedEvent.getType()+"->"+watchedEvent.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
//创建节点
@Test
public void createNode() throws KeeperException, InterruptedException {
/**
* 节点路径 节点数据 节点权限 节点类型
*/
zkClient.create("/zk_one","hello_zk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
* 判断节点是否存在
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void isExist() throws KeeperException, InterruptedException {
Stat exists = zkClient.exists("/uu", false);
System.out.println(exists);
}
// 获取子节点
@Test
public void getChildren() throws KeeperException, InterruptedException {
// 路径 监听器
List<String> children = zkClient.getChildren("/", true);
for (String child : children) {
System.out.println(child);
}
Thread.sleep(Long.MAX_VALUE);
}
// 获取数据
@Test
public void getData() throws KeeperException, InterruptedException {
byte[] data = zkClient.getData("/uu", false, null);
System.out.println(new String(data));
}
// 删除数据
@Test
public void deleteData() throws KeeperException, InterruptedException {
//参数2 指定删除版本 -1代表删除所有版本
zkClient.delete("/uu",-1);
}
// 修改数据
@Test
public void setData() throws KeeperException, InterruptedException {
zkClient.setData("/uu1","set data success".getBytes(),-1);
}
}
2.模拟多台服务器上下线监控
服务器
修改HOSTNAME 模拟多台服务器
package com.ithzk.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
/**
* @author hzk
* @date 2018/3/26
*/
public class DistrubutedServer {
private static final String CONNECTION_ADDRESS = "10.102.150.65:2181,10.102.151.43:2181";
private static final String GROUP_NODE = "/servers";
// 模拟服务器1
private static final String HOSTNAME = "myServer1";
// 模拟服务器2
private static final String HOSTNAME = "myServer2";
private static final int SESSION_TIMEOUT = 2000;
ZooKeeper zkClient = null;
/**
* 创建zk客户端连接
* @throws IOException
*/
public void getConnection() throws IOException {
zkClient = new ZooKeeper(CONNECTION_ADDRESS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//收到事件通知后的回调函数
System.out.println(watchedEvent.getType()+"->"+watchedEvent.getPath());
try {
zkClient.getChildren("/", true);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
* 向zk集群注册服务器信息
* @param hostName
* @throws KeeperException
* @throws InterruptedException
*/
public void register(String hostName) throws KeeperException, InterruptedException {
String node = zkClient.create(GROUP_NODE + "/server", hostName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostName+"is register success! node:"+node);
}
/**
* 业务功能
*/
public void handleBussiness(String hostName) throws InterruptedException {
System.out.println(hostName+" is working!");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//1.获取zookeeper连接
DistrubutedServer distrubutedServer = new DistrubutedServer();
distrubutedServer.getConnection();
//2.注册服务器信息
distrubutedServer.register(HOSTNAME);
//3.启动业务功能
distrubutedServer.handleBussiness(HOSTNAME);
}
}
客户端
package com.ithzk.zookeeper;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
/**
* @author hzk
* @date 2018/3/26
*/
public class DistrubutedClient {
private static final String CONNECTION_ADDRESS = "10.102.150.65:2181,10.102.151.43:2181";
private static final String GROUP_NODE = "/servers";
private static final String HOSTNAME = "myServer1";
private static final int SESSION_TIMEOUT = 2000;
//多线程运行时 若不同线程对同一个变量进行操作 实际会将该变量从堆内存中拷贝到线程本身的栈内存 每个线程中都有自身的副本
//volatile使得所有线程不拷贝副本 直接只访问同一个内存空间
private volatile List<String> serverList;
private ZooKeeper zkClient = null;
/**
* 创建zk客户端连接
* @throws IOException
*/
public void getConnection() throws IOException {
zkClient = new ZooKeeper(CONNECTION_ADDRESS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
//收到事件通知后的回调函数
try {
//重新更新服务器列表 并且注册监听
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
*
* @throws KeeperException
* @throws InterruptedException
*/
public void getServerList() throws KeeperException, InterruptedException, UnsupportedEncodingException {
//获取子节点 服务器信息并且对父节点进行监听
List<String> children = zkClient.getChildren(GROUP_NODE, true);
//创建局部list存储服务器信息
ArrayList<String> servers = new ArrayList<>();
for (String child : children) {
//child 是子节点节点名
byte[] data = zkClient.getData(GROUP_NODE + "/" + child, false, null);
servers.add(new String(data,"utf-8"));
}
//利用volatile变量存储服务器信息
serverList = servers;
//打印服务器列表
System.out.println(serverList);
}
/**
* 业务功能
*/
public void handleBussiness() throws InterruptedException {
System.out.println("client is working!");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
//1.获取zookeeper连接
DistrubutedClient distrubutedClient = new DistrubutedClient();
distrubutedClient.getConnection();
//2.获取servers子节点信息,从而获得服务器信息
distrubutedClient.getServerList();
//3.业务逻辑
distrubutedClient.handleBussiness();
}
}
3.zookeeper客户端线程的属性(守护线程)
zookeeper内部将listener设置为守护线程,Java的线程分为两种:User Thread(用户线程)、DaemonThread(守护线程)。只要当前JVM实例中尚存任何一个非守护线程没有结束,守护线程就全部工作;只有当最后一个非守护线程结束是,守护线程随着JVM一同结束工作,Daemon作用是为其他线程提供便利服务,守护线程最典型的应用就是GC(垃圾回收器),他就是一个很称职的守护者。
User和Daemon两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果 User Thread已经全部退出运行了,只剩下Daemon Thread存在了,虚拟机也就退出了。 因为没有了被守护者,Daemon也就没有工作可做了,也就没有继续运行程序的必要了。
package com.ithzk.zookeeper;
/**
* @author hzk
* @date 2018/3/28
*/
public class DefendThread {
public static void main(String[] args){
System.out.println("主线程启动了");
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程开始了");
while (true) {
}
}
});
//设置为守护线程 zookeeper 内部设置listener为守护线程
thread.setDaemon(true);
thread.start();
}
}
更多推荐
已为社区贡献11条内容
所有评论(0)