zookeeper安装>>客户端操作指令>>socket加zookeeper代码实现一步到位!!!!!
0、安装并配置jdk 1.7;1、下载zookeeper;2、同步时间ntp;3、解压zookeeper到/opt;4、配置dataDir;server.x=nodex:2888:3888;server.x=nodex:2888:3888;server.x=nodex:2888:3888;server.x=nodex:2888:3888;server.x=nodex:2888:3888;5、创建...
0、安装并配置jdk 1.7
1、下载zookeeper
2、同步时间ntp
3、解压zookeeper到/opt
4、配置
dataDir
server.x=nodex:2888:3888
server.x=nodex:2888:3888
server.x=nodex:2888:3888
server.x=nodex:2888:3888
server.x=nodex:2888:3888
5、创建myid,写服务器编号
6、启动
zkServer.sh start 启动
zkServer.sh stop 停止
zkServer.sh status 查看zookeeper状态
zkCli.sh 开启zookeeper客户端
7、zookeeper客户端的操作
ls / 查看根节点下有哪些节点
create /huawei "hello" 创建一个节点,数据为"hello";节点中保存的数据不超过1M
get /huawei 获取节点中的数据
set /huawei "good" 修改节点数据 -> mZxid为修改事务ID,mtime为修改的时间
create -e /huawei/huawei01 "" 创建临时节点。临时节点没有子节点;该数据可以在其他节点看到。如果关掉创建临时节点的客户端,则在另一个session中查看时,在超过过期时间之后该临时节点消失
create -s /huawei/god "" 创建顺序节点
Created /huawei/god0000000002 重复创建,则该数字增大
socket加zookeeper代码实现
服务端
-
package com.huawei.zkdemo;
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; /** * @author Lpf. * @version 创建时间:2019年4月16日 下午10:16:54 服务端 */ public class SocketServer { private ServerSocket server; private int port = 10001; // 另起线程记录线程信息 private CountDownLatch lacth = new CountDownLatch(1); // 初始化的时候把server实例化 public SocketServer() throws Exception { this.server = new ServerSocket(port); // 向zookeeper注册信息 registryService("localhost:" + port); Socket client = null; while (true) { // 一直等着接收客户端链接 client = server.accept(); handleRequest(client); } } private void registryService(String address) throws Exception { ZooKeeper zk = new ZooKeeper("node01:2181,node02:2181,node03:2181", 3000, new Watcher() { @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub if (event.getState() == Event.KeeperState.SyncConnected) { lacth.countDown(); } } }); lacth.await(); byte[] data = ("localhost:" + port).getBytes(); zk.create("/myregistry/server", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } public void handleRequest(Socket client) throws IOException { // 获取客户端输入流 InputStream is = client.getInputStream(); // 嵌套输入流 BufferedReader reader = new BufferedReader(new InputStreamReader(is, "utf-8")); String requestString = reader.readLine(); System.out.println("客户端请求的字符串:" + requestString + "-" + System.currentTimeMillis()); OutputStream os = client.getOutputStream(); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(os, "utf-8")); writer.write("你好," + requestString); writer.newLine(); 
writer.flush(); writer.close(); } public static void main(String[] args) throws Exception { new SocketServer(); } }
客户端
-
package com.huawei.zkdemo;
import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; /** * @author Lpf. * @version 创建时间:2019年4月16日 下午9:59:44 客户端 */ public class SocketClient { // 另起线程记录线程信息 private CountDownLatch lacth = new CountDownLatch(1); private ZooKeeper zk; private List<String> addresses; private int index = 0; private void connectZK() throws Exception { zk = new ZooKeeper("node01:2181,node02:2181,node03:2181", 3000, new Watcher() { @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub if (event.getState() == Event.KeeperState.SyncConnected) { lacth.countDown(); } } }); lacth.await(); } private void getAddress() throws Exception { List<String> nodeNames = zk.getChildren("/myregistry", new Watcher() { // 如果子节点发生变化 @Override public void process(WatchedEvent event) { // TODO Auto-generated method stub if (event.getType() == Event.EventType.NodeChildrenChanged) { try { getAddress(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }); addresses = new ArrayList<>(); for (String nodeName : nodeNames) { byte[] byteAddress = zk.getData("/myregistry/" + nodeName, null, null); String address = new String(byteAddress); addresses.add(address); } } public void request(String str) throws Exception { String address = null; if (addresses != null) { if (addresses.size() == 1) { address = addresses.get(0); } else { address = addresses.get(index % addresses.size()); } } String host = address.substring(0, address.indexOf(":")); Integer myport = Integer.parseInt(address.substring(address.indexOf(":") + 1)); // 新开一个socket绑定地址和端口号 
Socket client = new Socket(host, myport); // 输出流 OutputStream os = client.getOutputStream(); // 往外写东西 BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); writer.write(address + "-str:" + System.currentTimeMillis()); writer.newLine(); writer.flush(); // 读取输入流 InputStream is = client.getInputStream(); // 读取输入流的数据指定来源和字符集编码 BufferedReader reader = new BufferedReader(new InputStreamReader(is, "utf-8")); String line = reader.readLine(); System.out.println("服务端响应:" + line); client.close(); index++; } public static void main(String[] args) throws Exception { SocketClient client = new SocketClient(); // 连接ZK集群 client.connectZK(); // 获取指定节点的子节点信息,并放上监视器 client.getAddress(); while (true) { client.request("张三:" + System.currentTimeMillis()); Thread.sleep(3000); } } }
更多推荐
所有评论(0)