011 zkClient操作zookeeper集群
1、导入依赖<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.10</version></dependency>2.编写序列化类(解决获取数据时候无法序列化而报错)import org
·
1、导入依赖
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
2.编写序列化类(解决获取数据时候无法序列化而报错)
import org.I0Itec.zkclient.exception.ZkMarshallingError;
public class ZkSerializer implements org.I0Itec.zkclient.serialize.ZkSerializer {
//序列化,数据--》byte[]
public byte[] serialize(Object o) throws ZkMarshallingError {
return String.valueOf(o).getBytes();
}
//反序列化,byte[]--->数据
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return new String(bytes);
}
}
3、常规操作(增删查改)
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.CreateMode;
import java.util.List;
public class ZkTest {
//zookeeper集群地址
public static final String URL = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181";
//连接超时时间
public static final int TIME_OUT = 5000;
public static void main(String[] args) throws InterruptedException {
ZkClient client = new ZkClient(new ZkConnection(URL),TIME_OUT);
//设置序列化
client.setZkSerializer(new ZkSerializer());
//增加临时节点
client.createEphemeral("/a1",123);
//增加永久节点
client.create("/a2",123, CreateMode.PERSISTENT);
//Thread.sleep(10000);
//删除节点
client.delete("/a2");
//查询节点有哪些子节点
List<String> children = client.getChildren("/");
System.out.println(children);
//获取节点数据
Object data = client.readData("/a3");
System.out.println(data);
//修改
client.writeData("/a3",456);
client.close();
}
}
4.监听操作
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;
import java.util.List;
public class ZkWatch {
//zookeeper集群地址
public static final String URL = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181";
//连接超时时间
public static final int TIME_OUT = 5000;
public static void main(String[] args) throws InterruptedException {
ZkClient client = new ZkClient(new ZkConnection(URL),TIME_OUT);
//设置序列化
client.setZkSerializer(new SerializableSerializer());
//开始监听节点变化
client.subscribeChildChanges("/", new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath);
System.out.println(currentChilds);
}
});
//开始监听节点数据变化
client.subscribeDataChanges("/a4", new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("dataPath:" + dataPath);
System.out.println("data:" + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("dataPath:" + dataPath);
}
});
Thread.sleep(2000);
new Thread(new Runnable() {
@Override
public void run() {
client.writeData("/a4",56);
}
}).start();
Thread.sleep(60000);
client.close();
}
}
5.自定义zkCli.sh
5.1编写MyCli
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import java.util.List;
import java.util.Scanner;
public class MyCli {
public static void main(String[] args) throws InterruptedException {
ZkClient client = new ZkClient(new ZkConnection(args[0]),Integer.parseInt(args[01]));
//设置序列化
client.setZkSerializer(new SerializableSerializer());
Scanner sc = new Scanner(System.in);
loop:while(true){
String cmd = sc.nextLine();
String[] cmds = cmd.split(" ");
switch (cmds[0]){
case "list":
List<String> children = client.getChildren(cmds[1]);
for (String str:children) {
System.out.print(str + " ");
}
System.out.println();
case "quit":
client.close();
break loop;
}
}
sc.close();
}
}
5.2修改pom.xml文件(添加打包插件)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bigdata</groupId>
<artifactId>bigdata_zookeeper</artifactId>
<version>1.0</version>
<dependencies>
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- zkClient 客户端连接-->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- maven编译插件-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<testExcludes>
<testExclude>/src/test/**</testExclude>
</testExcludes>
<encoding>utf-8</encoding>
</configuration>
</plugin>
<!-- maven项目打包插件-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>MyCli</mainClass> <!-- jar包程序的入口主类-->
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id> <!-- this is used for inheritance merges -->
<phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 -->
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
5.3.1打包(IDEA)
5.3.1打包(eclipse) 右键点击项目名
5.4执行
上传jar包至服务器
java -jar *.jar 参数1 参数2 ...
更多推荐
已为社区贡献1条内容
所有评论(0)