0、安装并配置jdk 1.7
1、下载zookeeper
2、同步时间ntp
3、解压zookeeper到/opt
4、配置
dataDir
server.x=nodex:2888:3888
server.x=nodex:2888:3888
server.x=nodex:2888:3888
server.x=nodex:2888:3888
server.x=nodex:2888:3888
5、创建myid,写服务器编号
6、启动
zkServer.sh start 启动
zkServer.sh stop 停止
zkServer.sh status 查看zookeeper状态
zkCli.sh 开启zookeeper客户端
7、zookeeper客户端的操作
ls / 查看根节点下有哪些节点
create /huawei "hello" 创建一个节点,节点数据为"hello";节点中保存的数据不超过1M
get /huawei 获取节点中的数据
set /huawei "good" 修改节点数据(修改后 mZxid 为本次修改的事务ID,mtime 为修改时间)
create -e /huawei/huawei01 "" 创建临时节点。临时节点不能有子节点;该节点在其他客户端(其他session)中也可以看到。如果关掉创建该临时节点的客户端,在另一个session中查看,超过会话过期时间之后该临时节点消失
create -s /huawei/god "" 创建顺序节点,节点名后会自动追加递增序号
Created /huawei/god0000000002 重复创建,则该数字增大
socket加zookeeper代码实现

服务端

  • package com.huawei.zkdemo;

    import java.io.BufferedReader; import java.io.BufferedWriter;
    import java.io.IOException; import java.io.InputStream; import
    java.io.InputStreamReader; import java.io.OutputStream; import
    java.io.OutputStreamWriter; import java.net.ServerSocket; import
    java.net.Socket; import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode; import
    org.apache.zookeeper.WatchedEvent; import
    org.apache.zookeeper.Watcher; import
    org.apache.zookeeper.ZooDefs; import
    org.apache.zookeeper.ZooKeeper;
    
    /**  * @author Lpf.  * @version 创建时间:2019年4月16日 下午10:16:54 服务端 
    */ public class SocketServer {
    
    	private ServerSocket server; 	private int port = 10001; 	//
    另起线程记录线程信息 	private CountDownLatch lacth = new CountDownLatch(1);
    
    	// 初始化的时候把server实例化 	public SocketServer() throws Exception {
    		this.server = new ServerSocket(port);
    
    		// 向zookeeper注册信息 		registryService("localhost:" + port);
    
    		Socket client = null; 		while (true) { 			// 一直等着接收客户端链接
    			client = server.accept(); 			handleRequest(client); 		} 	}
    
    	private void registryService(String address) throws Exception {
    
    		ZooKeeper zk = new
    ZooKeeper("node01:2181,node02:2181,node03:2181", 3000, new
    Watcher() { 			@Override 			public void process(WatchedEvent
    event) {
    				// TODO Auto-generated method stub
    				if (event.getState() == Event.KeeperState.SyncConnected) {
    					lacth.countDown();
    				} 			} 		}); 		lacth.await();
    
    		byte[] data = ("localhost:" + port).getBytes();
    		zk.create("/myregistry/server", data,
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 	}
    
    	public void handleRequest(Socket client) throws IOException {
    		// 获取客户端输入流 		InputStream is = client.getInputStream(); 		//
    嵌套输入流 		BufferedReader reader = new BufferedReader(new
    InputStreamReader(is, "utf-8")); 		String requestString =
    reader.readLine();
    
    		System.out.println("客户端请求的字符串:" + requestString + "-" +
    System.currentTimeMillis()); 		OutputStream os =
    client.getOutputStream();
    
    		BufferedWriter writer = new BufferedWriter(new
    OutputStreamWriter(os, "utf-8")); 		writer.write("你好," +
    requestString); 		writer.newLine(); 		writer.flush();
    		writer.close(); 	}
    
    	public static void main(String[] args) throws Exception { 		new
    SocketServer(); 	} }
    

客户端

  • package com.huawei.zkdemo;

    import java.io.BufferedReader; import java.io.BufferedWriter;
    import java.io.InputStream; import java.io.InputStreamReader;
    import java.io.OutputStream; import java.io.OutputStreamWriter;
    import java.net.Socket; import java.util.ArrayList; import
    java.util.List; import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.KeeperException; import
    org.apache.zookeeper.WatchedEvent; import
    org.apache.zookeeper.Watcher; import
    org.apache.zookeeper.ZooKeeper;
    
    /**  * @author Lpf.  * @version 创建时间:2019年4月16日 下午9:59:44 客户端  */
    public class SocketClient {
    
    	// 另起线程记录线程信息 	private CountDownLatch lacth = new
    CountDownLatch(1); 	private ZooKeeper zk; 	private List<String>
    addresses; 	private int index = 0;
    
    	private void connectZK() throws Exception { 		zk = new
    ZooKeeper("node01:2181,node02:2181,node03:2181", 3000, new
    Watcher() { 			@Override 			public void process(WatchedEvent
    event) {
    				// TODO Auto-generated method stub
    				if (event.getState() == Event.KeeperState.SyncConnected) {
    					lacth.countDown();
    				} 			} 		}); 		lacth.await();
    
    	}
    
    	private void getAddress() throws Exception { 		List<String>
    nodeNames = zk.getChildren("/myregistry", new Watcher() { 			//
    如果子节点发生变化 			@Override 			public void process(WatchedEvent event)
    {
    				// TODO Auto-generated method stub
    				if (event.getType() == Event.EventType.NodeChildrenChanged) {
    					try {
    						getAddress();
    					} catch (Exception e) {
    						// TODO Auto-generated catch block
    						e.printStackTrace();
    					}
    				} 			} 		});
    
    		addresses = new ArrayList<>(); 		for (String nodeName :
    nodeNames) { 			byte[] byteAddress = zk.getData("/myregistry/" +
    nodeName, null, null); 			String address = new
    String(byteAddress); 			addresses.add(address); 		} 	}
    
    	public void request(String str) throws Exception { 		String
    address = null; 		if (addresses != null) { 			if
    (addresses.size() == 1) {
    				address = addresses.get(0); 			} else {
    				address = addresses.get(index % addresses.size()); 			}
    
    		} 		String host = address.substring(0, address.indexOf(":"));
    		Integer myport =
    Integer.parseInt(address.substring(address.indexOf(":") + 1));
    
    		// 新开一个socket绑定地址和端口号 		Socket client = new Socket(host,
    myport); 		// 输出流 		OutputStream os = client.getOutputStream();
    		// 往外写东西 		BufferedWriter writer = new BufferedWriter(new
    OutputStreamWriter(os, "UTF-8")); 		writer.write(address +
    "-str:" + System.currentTimeMillis()); 		writer.newLine();
    		writer.flush();
    
    		// 读取输入流 		InputStream is = client.getInputStream(); 		//
    读取输入流的数据指定来源和字符集编码 		BufferedReader reader = new
    BufferedReader(new InputStreamReader(is, "utf-8"));
    
    		String line = reader.readLine(); 		System.out.println("服务端响应:"
    + line); 		client.close(); 		index++; 	}
    
    	public static void main(String[] args) throws Exception {
    		SocketClient client = new SocketClient(); 		// 连接ZK集群
    		client.connectZK(); 		// 获取指定节点的子节点信息,并放上监视器
    		client.getAddress(); 		while (true) { 			client.request("张三:" +
    System.currentTimeMillis()); 			Thread.sleep(3000); 		} 	} }
    
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐