1、导入依赖

<!-- ZkClient (com.101tec:zkclient) dependency; provides the org.I0Itec.zkclient classes imported by the Java examples in this document -->
<dependency>    
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>

2.编写序列化类(解决读取数据时因无法反序列化而报错的问题)

import org.I0Itec.zkclient.exception.ZkMarshallingError;

public class ZkSerializer implements org.I0Itec.zkclient.serialize.ZkSerializer {
        //序列化,数据--》byte[]
        public byte[] serialize(Object o) throws ZkMarshallingError {
            return String.valueOf(o).getBytes();
        }
        //反序列化,byte[]--->数据
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            return new String(bytes);
        }
}

3、常规操作(增删查改)

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.zookeeper.CreateMode;

import java.util.List;

/**
 * Demonstrates the basic ZkClient operations: create, delete, list children,
 * read data and write data.
 */
public class ZkTest {
    /** ZooKeeper ensemble address (comma-separated host:port list). */
    public static final String URL = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181";
    /** Connection timeout passed to the ZkClient constructor. */
    public static final int TIME_OUT = 5000;

    /**
     * Runs the create/delete/query/update demo against the cluster at {@link #URL}.
     *
     * @param args unused
     * @throws InterruptedException kept for the optional {@code Thread.sleep} pause below
     */
    public static void main(String[] args) throws InterruptedException {
        ZkClient client = new ZkClient(new ZkConnection(URL), TIME_OUT);
        // Close the client even when one of the operations below throws,
        // so the ZooKeeper session is not leaked.
        try {
            // Install the serializer used for all reads and writes.
            client.setZkSerializer(new ZkSerializer());
            // Create an ephemeral node.
            client.createEphemeral("/a1", 123);
            // Create a persistent node.
            client.create("/a2", 123, CreateMode.PERSISTENT);
            // Thread.sleep(10000);  // optional pause before /a2 is deleted
            // Delete the node.
            client.delete("/a2");
            // List the children of the root node.
            List<String> children = client.getChildren("/");
            System.out.println(children);
            // Read node data. NOTE: /a3 is not created by this program; it must
            // already exist on the cluster, otherwise readData/writeData fail.
            Object data = client.readData("/a3");
            System.out.println(data);
            // Update node data.
            client.writeData("/a3", 456);
        } finally {
            client.close();
        }
    }
}

4.监听操作

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.zookeeper.CreateMode;

import java.util.List;

/**
 * Demonstrates ZkClient listeners: subscribes to child changes on "/" and to
 * data changes on "/a4", then triggers a data change from a second thread.
 */
public class ZkWatch {
    /** ZooKeeper ensemble address (comma-separated host:port list). */
    public static final String URL = "192.168.112.101:2181,192.168.112.102:2181,192.168.112.103:2181";
    /** Connection timeout passed to the ZkClient constructor. */
    public static final int TIME_OUT = 5000;

    /**
     * Registers the listeners, writes to "/a4" after two seconds and keeps the
     * process alive for one minute so that notifications can be printed.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ZkClient client = new ZkClient(new ZkConnection(URL), TIME_OUT);
        // Close the client even if a subscription fails or a sleep is interrupted,
        // so the ZooKeeper session is not leaked.
        try {
            // Install the serializer used for all reads and writes.
            client.setZkSerializer(new SerializableSerializer());
            // Watch for changes to the children of the root node.
            client.subscribeChildChanges("/", new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    System.out.println(parentPath);
                    System.out.println(currentChilds);
                }
            });
            // Watch for data changes and deletion of /a4.
            client.subscribeDataChanges("/a4", new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.out.println("dataPath:" + dataPath);
                    System.out.println("data:" + data);
                }

                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println("dataPath:" + dataPath);
                }
            });
            Thread.sleep(2000);
            // Modify /a4 from another thread to trigger the data listener.
            new Thread(new Runnable() {
                @Override
                public void run() {
                    client.writeData("/a4", 56);
                }
            }).start();
            // Keep the process alive long enough to receive notifications.
            Thread.sleep(60000);
        } finally {
            client.close();
        }
    }
}

5.自定义zkCli.sh

5.1编写MyCli

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

import java.util.List;
import java.util.Scanner;

/**
 * Minimal interactive ZooKeeper shell built on ZkClient.
 *
 * <p>Supported commands, read line by line from standard input:
 * <ul>
 *   <li>{@code list <path>} - print the children of {@code path} on one line</li>
 *   <li>{@code quit} - close the connection and exit</li>
 * </ul>
 * Any other input is ignored.
 */
public class MyCli {

    /**
     * Starts the shell.
     *
     * @param args {@code args[0]} is the ZooKeeper connection string,
     *             {@code args[1]} is the connection timeout as an integer
     * @throws InterruptedException declared for compatibility with existing callers
     */
    public static void main(String[] args) throws InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: java -jar <jar> <zkServers> <connectionTimeout>");
            System.exit(1);
        }
        // The original used the octal literal 01 as the index; 1 is what was meant.
        ZkClient client = new ZkClient(new ZkConnection(args[0]), Integer.parseInt(args[1]));
        Scanner sc = new Scanner(System.in);
        try {
            // Install the serializer used for all reads and writes.
            client.setZkSerializer(new SerializableSerializer());
            // Stop cleanly on end of input instead of failing in nextLine().
            loop:
            while (sc.hasNextLine()) {
                String[] cmds = sc.nextLine().trim().split("\\s+");
                switch (cmds[0]) {
                    case "list":
                        if (cmds.length < 2) {
                            System.out.println("usage: list <path>");
                            break;
                        }
                        List<String> children = client.getChildren(cmds[1]);
                        for (String str : children) {
                            System.out.print(str + " ");
                        }
                        System.out.println();
                        // Without this break, "list" fell through into "quit"
                        // and terminated the shell after the first listing.
                        break;
                    case "quit":
                        break loop;
                    default:
                        // Unknown or empty command: ignore and read the next line.
                        break;
                }
            }
        } finally {
            // Release the input scanner and the ZooKeeper session on every exit path.
            sc.close();
            client.close();
        }
    }
}

5.2修改pom.xml文件(添加打包插件)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!-- Project descriptor: declares the zookeeper and zkclient dependencies and, during the
         package phase, builds a jar-with-dependencies whose manifest main class is MyCli. -->
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.bigdata</groupId>
    <artifactId>bigdata_zookeeper</artifactId>
    <version>1.0</version>
    <dependencies>
        <!-- zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
        <!-- zkClient: client connection library -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Maven compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <testExcludes>
                        <testExclude>/src/test/**</testExclude>
                    </testExcludes>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
            <!-- Maven project packaging (assembly) plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>MyCli</mainClass>  <!-- main entry-point class of the jar -->
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- run the jar-merging (assembly) step during the package phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

5.3.1打包(IDEA)

5.3.2打包(eclipse):右键点击项目名

5.4执行

上传jar包至服务器

java  -jar  *.jar  参数1   参数2  ...(对 MyCli 而言:参数1 为 ZooKeeper 连接地址,参数2 为连接超时时间,单位毫秒)

 

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐