zookeeper的java API使用及典型应用场景
目录:1. javaAPI使用(原生API、zkclient、curator);2. 典型应用场景(分布式锁、选主服务)
1. javaAPI使用
1.1 原生API
1.1.1 引入pom
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
1.1.2 建立连接
package com.lulf.javaapi;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
 * Minimal example of opening a ZooKeeper session with the native client.
 *
 * <p>The main thread blocks on a latch that the connection watcher releases once the
 * session reports {@code SyncConnected}, then prints the client state and closes the session.
 */
public class CreateSessionDemo {
    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Connects to the ensemble, waits for the session to be established and prints its state.
     *
     * @param args unused
     * @throws IOException if the client cannot be created
     * @throws InterruptedException if interrupted while waiting for the connection or closing
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                // Release the waiting main thread once the session is connected.
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                    System.out.println(watchedEvent.getState());
                }
            }
        });
        try {
            countDownLatch.await();
            System.out.println("test " + zooKeeper.getState());
        } finally {
            // Close explicitly so the server-side session does not linger until it times out.
            zooKeeper.close();
        }
    }
}
1.1.3 增删改查节点
// Create a node.
String nodePath = "/lulfZkDemo1";
String msg = "12345";
byte[] data = msg.getBytes();
// Access control: ANYONE_ID_UNSAFE is the "world" scheme (world:anyone), an open access-control
// mode in which access is granted to every user.
// "super" is the super user, which may operate on the data nodes in ZooKeeper.
List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
// Node type: persistent, persistent-sequential, ephemeral or ephemeral-sequential.
CreateMode createMode = CreateMode.EPHEMERAL;
try {
String result = zooKeeper.create(nodePath, data, acl, createMode);
System.out.println("创建成功" + result);
} catch (KeeperException e) {
e.printStackTrace();
}
增删改查demo 补上watcher机制
package com.hikvision.rabbitmq.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
 * Create / read / update / delete example for the native ZooKeeper client, with a watcher
 * that reports connection and node events.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class createNodeDemo implements Watcher {
    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper = null;
    private static Stat stat = new Stat();

    /**
     * Runs the demo: connect, create an ephemeral node, watch it, update it, delete it,
     * then create a persistent parent with one child.
     *
     * @param args unused
     * @throws IOException if the client cannot be created
     * @throws InterruptedException if interrupted while waiting or sleeping
     * @throws KeeperException if a ZooKeeper operation fails
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new createNodeDemo());
        try {
            countDownLatch.await();
            System.out.println("test " + zooKeeper.getState());

            // Create a node.
            String path = "/Lulfdemo2";
            byte[] data = "1234".getBytes();
            List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
            CreateMode createMode = CreateMode.EPHEMERAL;
            String result = zooKeeper.create(path, data, acl, createMode);
            zooKeeper.getData(path, true, stat); // registers the default watcher on the node
            System.out.println("创建成功 " + result);

            // Update the data; version -1 means "ignore the version check".
            zooKeeper.setData(path, "311".getBytes(), -1);
            // Delete the node.
            zooKeeper.delete(path, -1);
            Thread.sleep(2000);

            // Create a node and a child node; only persistent nodes can have children.
            zooKeeper.create("/LULF", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            TimeUnit.SECONDS.sleep(1);
            zooKeeper.create("/LULF/lulf1-1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            TimeUnit.SECONDS.sleep(1);
        } finally {
            // Release the session instead of leaving it to expire on the server.
            zooKeeper.close();
        }
    }

    /**
     * Handles connection and node events delivered while the session is connected.
     *
     * @param watchedEvent the event reported by ZooKeeper
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
            return;
        }
        Event.EventType type = watchedEvent.getType();
        if (type == null || type == Event.EventType.None) {
            // Connection established: let main() proceed.
            countDownLatch.countDown();
            System.out.println(watchedEvent.getState());
        } else if (type == Event.EventType.NodeDataChanged) {
            printNodeData(watchedEvent.getPath(), "改变后的值 ");
        } else if (type == Event.EventType.NodeCreated) {
            printNodeData(watchedEvent.getPath(), "节点的值 ");
        } else if (type == Event.EventType.NodeDeleted) {
            System.out.println("删除路径 " + watchedEvent.getPath());
        }
        // NodeChildrenChanged is intentionally ignored.
    }

    /**
     * Reads the node's data (re-registering the default watcher) and prints it as text.
     * The bytes are decoded into a String; concatenating the raw array would print only
     * its identity ("[B@...").
     */
    private void printNodeData(String path, String label) {
        try {
            byte[] value = zooKeeper.getData(path, true, new Stat());
            String text = (value == null) ? "null" : new String(value);
            System.out.println("路径 " + path + label + text);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
1.1.3.1 watcher 连接状态
1)KeeperState.Expired 会话过期
在一定的时间内客户端没有收到服务器的通知,则认为当前的会话已经过期了,
2)disconnected 断开连接状态
3)KeeperState.SyncConnected 客户端和服务器端在某个节点上建立连接,并且完成一次version、zxid的同步
4)KeeperState.AuthFailed 授权失败
1.1.3.2 节点状态
NodeCreated 当节点被创建时触发
NodeChildrenChanged 表示子节点被创建或被删除(子节点的数据变化不会触发该事件)
NodeDataChanged 节点数据发生改变
NodeDeleted 节点被删除
None 当客户端和服务器连接状态发生变化的时候,事件类型就是None
1.2 zkclient
1.2.1 引入pom
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
1.2.2 增删改查实例及watcher
package com.hikvision.rabbitmq.zookeeper;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
/**
 * Create / read / update / delete walkthrough for the zkclient library, including a
 * data-change subscription.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class zkClientDemo {
    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /** Listener that prints data-change and delete notifications for a node. */
    private static final class PrintingDataListener implements IZkDataListener {
        @Override
        public void handleDataChange(String s, Object o) throws Exception {
            System.out.println("node" + s + "change :" + o);
        }

        @Override
        public void handleDataDeleted(String s) throws Exception {
            System.out.println("node delete :" + s);
        }
    }

    /** Opens a new client with a 5000 ms connection timeout. */
    private static ZkClient getInstance() {
        ZkClient client = new ZkClient(CONNECTSTRING, 5000);
        return client;
    }

    /**
     * Runs the walkthrough against the configured ensemble.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Connect.
        final ZkClient zkClient = getInstance();

        // An ephemeral node, then a persistent path whose missing parents are created too.
        zkClient.createEphemeral("/zkclient");
        final boolean createParents = true;
        zkClient.createPersistent("/llf1/llf2/llf3", createParents);

        // Read the children of the top-level node.
        List<String> children = zkClient.getChildren("/llf1");

        // Subscribe to data changes (the library's wrapper around watchers).
        IZkDataListener listener = new PrintingDataListener();
        zkClient.subscribeDataChanges("/llf1", listener);

        // Update, then delete the single node and the whole subtree.
        zkClient.writeData("/llf1", "222");
        zkClient.delete("/zkclient");
        zkClient.deleteRecursive("/llf1");
    }
}
1.3 curator
Curator本身是Netflix公司开源的zookeeper客户端
curator提供了各种应用场景的实现封装
curator-framework 提供了fluent风格的api
curator-recipes 提供了各种典型场景(recipes)的实现封装
1.3.1 引入pom
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
1.3.2 创建连接【两种方式】
/**
 * Creates and starts a Curator client through the {@code newClient} factory method and
 * stores it in the {@code curatorFramework} field.
 *
 * @return the started client
 */
public static CuratorFramework getInstance() {
    final int sessionTimeoutMs = 5000;
    final int connectionTimeoutMs = 5000;
    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
    curatorFramework = CuratorFrameworkFactory.newClient(
            CONNECTSTRING, sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
    curatorFramework.start();
    return curatorFramework;
}
fluent风格
/**
 * Creates and starts a Curator client through the fluent builder and stores it in the
 * {@code curatorFramework} field. All paths used through this client are placed under
 * the {@code curator} namespace.
 *
 * @return the started client
 */
public static CuratorFramework getFluentInstance() {
    curatorFramework = CuratorFrameworkFactory.builder()
            .connectString(CONNECTSTRING)
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            // The namespace is given without a leading '/': Curator prefixes it itself and
            // rejects "/curator" with an IllegalArgumentException ("Invalid namespace").
            .namespace("curator")
            .build();
    curatorFramework.start();
    return curatorFramework;
}
1.3.3 增删改查及异步及事务
package com.hikvision.rabbitmq.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.concurrent.*;
/**
 * Curator walkthrough: create, delete, read, update, an asynchronous create and a
 * transaction.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class CuratorDemo {
    private static CuratorFramework curatorFramework;
    private static final String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /**
     * Creates and starts a client through the {@code newClient} factory method.
     *
     * @return the started client
     */
    public static CuratorFramework getInstance() {
        curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING, 5000,
                5000, new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Creates and starts a client through the fluent builder, scoped to the
     * {@code curator} namespace.
     *
     * @return the started client
     */
    public static CuratorFramework getFluentInstance() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNECTSTRING).sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                // No leading '/': Curator rejects "/curator" as an invalid namespace.
                .namespace("curator").build();
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Runs the walkthrough. The nodes {@code /llf} and {@code /lll} are read and updated
     * without being created here, so they are presumably expected to exist already.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CuratorFramework curatorFramework = getInstance();
        System.out.println("connect success...");
        // Executor on which the background callback is run.
        ExecutorService service = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            // Create a node, creating missing parents on the way.
            curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath("/12/32/421", "123456".getBytes());
            // Delete the subtree. "/12" still has children at this point, so a plain
            // delete would fail with NotEmptyException. The version defaults to -1.
            curatorFramework.delete().deletingChildrenIfNeeded().forPath("/12");

            // Read data, capturing the node's Stat.
            Stat stat = new Stat();
            byte[] data = curatorFramework.getData().storingStatIn(stat).forPath("/llf");
            String result = new String(data);

            // Update.
            Stat updateStat = curatorFramework.setData().forPath("/llf", "12".getBytes());

            // Asynchronous create; the latch lets us wait for the callback before shutdown.
            final CountDownLatch asyncDone = new CountDownLatch(1);
            curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework arg0, CuratorEvent arg1) throws Exception {
                            System.out.println(Thread.currentThread().getName() + "-->" + arg1.getResultCode());
                            asyncDone.countDown();
                        }
                    }, service).forPath("/Allen", "allen".getBytes());

            // Transaction: both operations are committed together.
            // NOTE(review): inTransaction() is marked deprecated in recent Curator releases
            // in favour of transaction()/transactionOp(); kept to match the article.
            Collection<CuratorTransactionResult> transresult = curatorFramework.inTransaction()
                    .create().forPath("/trans", "111".getBytes())
                    .and().setData().forPath("/lll", "555".getBytes())
                    .and().commit();
            System.out.println("success");

            // Give the background callback a bounded amount of time to arrive.
            asyncDone.await(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Without this the executor's worker thread keeps the JVM alive.
            service.shutdown();
            curatorFramework.close();
        }
    }
}
1.3.4 事件处理
package com.hikvision.rabbitmq.zookeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.concurrent.*;
/**
 * Curator event-handling walkthrough based on {@code PathChildrenCache}.
 *
 * <p>Curator offers three cache-style watchers for node monitoring:
 * <ul>
 *   <li>PathChildrenCache - watches creation, deletion and data updates of the children of a path;</li>
 *   <li>NodeCache - watches creation, update and deletion of a single node;</li>
 *   <li>TreeCache - a combination of the two (create/update/delete events under a path),
 *       caching the data of all descendants.</li>
 * </ul>
 * NOTE(review): these cache classes are marked deprecated in Curator 5.x in favour of
 * CuratorCache; kept here to match the article.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class CuratorDemo {
    private static CuratorFramework curatorFramework;
    private static final String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /**
     * Creates and starts a client through the {@code newClient} factory method.
     *
     * @return the started client
     */
    public static CuratorFramework getInstance() {
        curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING, 5000,
                5000, new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Creates and starts a client through the fluent builder, scoped to the
     * {@code curator} namespace.
     *
     * @return the started client
     */
    public static CuratorFramework getFluentInstance() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNECTSTRING).sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                // No leading '/': Curator rejects "/curator" as an invalid namespace.
                .namespace("curator").build();
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Registers a child listener on {@code /event}, then adds, updates and removes a child
     * so that each event type is printed. Blocks on standard input before exiting.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CuratorFramework curatorFramework = getInstance();
        System.out.println("connect success...");
        // NodeCache variant, for reference:
        // NodeCache cache = new NodeCache(curatorFramework, "/lulf", false);
        // cache.start(true);
        // cache.getListenable()
        //         .addListener(() -> System.out.println("节点数据发生变化" + new String(cache.getCurrentData().getData())));
        // curatorFramework.setData().forPath("/lulf", "gbcq".getBytes());
        try (PathChildrenCache cache2 = new PathChildrenCache(curatorFramework, "/event", false)) {
            cache2.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache2.getListenable().addListener((client, event) -> {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("增加子节点");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点");
                        break;
                    case CHILD_UPDATED:
                        System.out.println("更新子节点");
                        break;
                    default:
                        break;
                }
            });
            // "/event" is persistent and is never removed, so it already exists on a re-run
            // (and the cache may create the path itself); orSetData() turns the create into
            // an update in that case instead of failing with NodeExistsException.
            curatorFramework.create().orSetData().withMode(CreateMode.PERSISTENT)
                    .forPath("/event", "1".getBytes());
            TimeUnit.SECONDS.sleep(1);
            curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1", "1.1".getBytes());
            TimeUnit.SECONDS.sleep(1);
            curatorFramework.setData().forPath("/event/event1", "1.2".getBytes());
            curatorFramework.delete().forPath("/event/event1");
            // Keep the process alive so the asynchronous events can be printed.
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            curatorFramework.close();
        }
    }
}
2. 典型应用场景
2.1 zookeeper能够实现哪些场景
2.1.1 订阅发布/配置中心
watcher机制
统一配置管理(disconf)
实现配置信息的集中式原理和数据的动态更新
实现配置中心有两种模式:push,pull
长轮询
zookeeper采用的是推拉相结合的方式。客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,那么服务器端会向客户端发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据。
a 数据量比较小
b 数据内容在运行时发生动态变更
c 集群中的各个机器共享变量
2.1.2 分布式锁
分布式锁的三种实现手段
1) 数据库 创建一个表,通过唯一索引的方式
create table(id,methodname…) methodname增加唯一索引
insert 一条数据 xxx delete 删除数据
mysql 有innodb来设置表锁或者行锁
2)redis setNX key已存在则返回0(加锁失败),key不存在则设置成功并返回1(加锁成功)
3)zookeeper 有序节点
共享锁(读锁)
2.1.2.1 分布式锁代码实现
首先定义ZK的连接客户端类
package com.hikvision.rabbitmq.zookeeper.lock;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
 * Factory for connected ZooKeeper sessions used by the distributed-lock example.
 *
 * @author lulinfeng
 * @version 1.0 (2020/9/2)
 */
public class ZookeeperClient {
    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";
    private static int sessionTimeOut = 5000;

    /**
     * Opens a new session and blocks until it reports {@code SyncConnected}.
     *
     * @return the connected client
     * @throws IOException if the client cannot be created
     * @throws InterruptedException if interrupted while waiting for the connection
     */
    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        final CountDownLatch connected = new CountDownLatch(1);
        Watcher connectionWatcher = event -> {
            if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {
                connected.countDown();
            }
        };
        ZooKeeper client = new ZooKeeper(CONNECTSTRING, sessionTimeOut, connectionWatcher);
        connected.await();
        return client;
    }

    /**
     * @return the session timeout, in milliseconds, used for new sessions
     */
    public static int getSessionTimeOut() {
        return sessionTimeOut;
    }
}
定义watcher
通过CountDownLatch来控制
package com.hikvision.rabbitmq.zookeeper.lock;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.concurrent.CountDownLatch;
/**
 * Watcher that counts down a latch when the watched node is deleted.
 *
 * @author lulinfeng
 * @version 1.0 (2020/9/2)
 */
public class LockWatcher implements Watcher {
    private final CountDownLatch latch;

    /**
     * @param latch latch to count down on a {@code NodeDeleted} event
     */
    public LockWatcher(CountDownLatch latch) {
        this.latch = latch;
    }

    /** Counts the latch down for {@code NodeDeleted}; every other event type is ignored. */
    @Override
    public void process(WatchedEvent watchedEvent) {
        boolean nodeDeleted = Event.EventType.NodeDeleted == watchedEvent.getType();
        if (!nodeDeleted) {
            return;
        }
        latch.countDown();
    }
}
书写lock和unlock方法
package com.hikvision.rabbitmq.zookeeper.lock;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
/**
* @ClassName DistributeLock
* @Description TODO
* @Autuor lulinfeng
* @Date 2020/9/2
* @Version 1.0
*/
public class DistributeLock {
private static final String ROOT_LOCKS = "/LOCKS"; // 根节点
private ZooKeeper zooKeeper;
private int sessionTimeOut = 5000;// 会话超时时间
private String lockID;// 记录锁节点ID
private CountDownLatch countDownLatch = new CountDownLatch(1);
private final static byte[] data = {1, 2};
public DistributeLock() throws IOException, InterruptedException {
this.zooKeeper = ZookeeperClient.getInstance();
this.sessionTimeOut = ZookeeperClient.getSessionTimeOut();
}
// 获取锁的方法
public boolean lock() {
try {
lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "->成功创建lock节点[" + lockID + "]开始去竞争锁");
List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);
// 排序从小到大
SortedSet<String> sortedSet = new TreeSet<String>();
for (String children : childrenNodes) {
sortedSet.add(ROOT_LOCKS + "/" + children);
}
String first = sortedSet.first();// 拿到最小的节点
if (lockID.equals(first)) {
// 表示当前就是最小的
System.out.println(Thread.currentThread().getName() + "success get lock,lock节点[" + lockID + "]");
return true;
}
SortedSet<String> lessThanLockID = sortedSet.headSet(lockID);
if (!lessThanLockID.isEmpty()) {
String preLockId = lessThanLockID.last(); // 拿到比当前lockid这个节点更小的上一个节点
zooKeeper.exists(preLockId, new LockWatcher(countDownLatch));
countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);
// 上面这段代码意味着如果会话超时或者节点被删除
System.out.println(Thread.currentThread().getName() + "成功获取锁,lock节点[" + lockID + "]");
}
return true;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
// 释放锁的方法
public boolean unlock() {
System.out.println(Thread.currentThread().getName() + "-->开始释放锁:[" + lockID + "]");
try {
zooKeeper.delete(lockID, -1);
System.out.println("节点[" + lockID + "]被成功删除");
return true;
} catch (Exception e) {
e.getStackTrace().toString();
}
return false;
}
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(10);
Random random = new Random();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
DistributeLock lock = null;
try {
lock = new DistributeLock();
latch.countDown();
latch.await();
lock.lock();
Thread.sleep(random.nextInt(500));
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}).start();
}
}
}
结果如下:
2.1.3 负载均衡
请求/数据 分摊到多个计算单元
2.1.4 ID生成器
利用生成的有序节点
2.1.5 分布式队列
2.1.5.1 先进先出队列
getchildren获取指定根节点下面的子节点
确定自己节点在子节点中的顺序
如果自己是最小的子节点,则出队处理;如果自己不是最小的子节点,则监听比自己小的上一个子节点并进入等待,接收到watcher通知后重复上述流程。
2.1.5.2 Barrier模式
也是阻碍模式,围栏模式,满足条件才会触发
2.1.6 统一命名服务
2.1.7 master选举
7*24小时可用 99.999%可用
master-slave模式
master出现故障 slave上位作为master 心跳机制去维持状态 脑裂
单机版选主
用户中心
package com.hikvision.rabbitmq.zookeeper.leader;
import java.io.Serializable;
/**
 * Serializable description of one machine taking part in the master election.
 *
 * @author lulinfeng
 * @version 1.0 (2020/9/2)
 */
public class UserCenter implements Serializable {

    private static final long serialVersionUID = -4060228979536051295L;

    /** Machine information (numeric id). */
    private int m_id;

    /** Machine name. */
    private String mc_name;

    /**
     * @return the machine name, or {@code null} if none has been set
     */
    public String getMc_name() {
        return this.mc_name;
    }

    /**
     * @param mc_name the machine name
     */
    public void setMc_name(String mc_name) {
        this.mc_name = mc_name;
    }

    /**
     * @return the machine id
     */
    public int getM_id() {
        return this.m_id;
    }

    /**
     * @param m_id the machine id
     */
    public void setM_id(int m_id) {
        this.m_id = m_id;
    }
}
具体选主
package com.hikvision.rabbitmq.zookeeper.leader;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import static java.util.concurrent.Executors.*;
/**
 * Master election on top of zkclient: every participant tries to create the ephemeral
 * node {@code /master}; the one that succeeds is the master, and the others re-run the
 * election when that node is deleted.
 *
 * @author lulf
 */
public class MasterSelector {
    private ZkClient client;
    private final static String MASTER_PATH = "/master"; // node the participants compete for
    private IZkDataListener dataListener; // listens for changes of the master node
    private UserCenter server; // this participant
    private UserCenter master; // the current master as last observed
    // Per-instance flag: a static flag would be shared by all selectors in the JVM, so only
    // the first one would ever start. volatile because it is also read from callback and
    // scheduler threads.
    private volatile boolean isrunning = false;
    ScheduledExecutorService scheduledExecutorService = newScheduledThreadPool(1);

    /**
     * @param server the participant represented by this selector
     * @param client the zkclient connection used for the election
     */
    public MasterSelector(UserCenter server, ZkClient client) {
        this.server = server;
        this.client = client;
        this.dataListener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // The master node was deleted: start a new election.
                chooseMaster();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
    }

    /** Subscribes to the master node and takes part in the election. No-op if already started. */
    public void start() {
        if (!isrunning) {
            isrunning = true;
            client.subscribeDataChanges(MASTER_PATH, dataListener); // register for node events
            chooseMaster();
        }
    }

    /** Stops taking part: unsubscribes and gives up mastership if held. No-op if not started. */
    public void stop() {
        if (isrunning) {
            isrunning = false;
            scheduledExecutorService.shutdown();
            client.unsubscribeDataChanges(MASTER_PATH, dataListener); // cancel the subscription
            releaseMaster();
        }
    }

    /** Runs one election round: become master if the node can be created, otherwise record the current master. */
    private void chooseMaster() {
        if (!isrunning) {
            System.out.println("当前服务没有启动。。。");
            return;
        }
        try {
            client.createEphemeral(MASTER_PATH, server);
            master = server;
            System.out.println(master.getMc_name() + "-->我已经是master,开始领导你们");
            // Failure simulation: give up mastership once, 5 seconds from now.
            scheduledExecutorService.schedule(this::releaseMaster, 5, TimeUnit.SECONDS);
        } catch (ZkNodeExistsException e) {
            // A master already exists; read it (null if it disappeared in the meantime).
            UserCenter userCenter = client.readData(MASTER_PATH, true);
            if (userCenter == null) {
                chooseMaster(); // node vanished again: retry the election
            } else {
                master = userCenter;
            }
        }
    }

    /** Deletes the master node, but only when this participant is the master. */
    private void releaseMaster() {
        if (checkMaster()) {
            client.delete(MASTER_PATH, -1);
        }
    }

    /**
     * @return {@code true} if the master node exists and carries this participant's name
     */
    private boolean checkMaster() {
        // Ask for null instead of an exception when the master node does not exist.
        UserCenter userCenter = client.readData(MASTER_PATH, true);
        if (userCenter == null) {
            return false;
        }
        String masterName = userCenter.getMc_name();
        if (masterName != null && masterName.equals(server.getMc_name())) {
            master = userCenter;
            return true;
        }
        return false;
    }
}
测试选主
package com.hikvision.rabbitmq.zookeeper.leader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
/**
 * Driver for the master-election example: starts ten participants, each with its own
 * zkclient connection, four seconds apart, then stops them all.
 */
public class MasterChooseDemo {
    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        List<MasterSelector> selectorList = new ArrayList<MasterSelector>();
        List<ZkClient> clientList = new ArrayList<ZkClient>();
        try {
            for (int i = 0; i < 10; i++) {
                ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000, 5000, new SerializableSerializer());
                clientList.add(zkClient);
                UserCenter userCenter = new UserCenter();
                userCenter.setM_id(i);
                userCenter.setMc_name("lulf_" + i);
                MasterSelector selector = new MasterSelector(userCenter, zkClient);
                selectorList.add(selector);
                selector.start(); // take part in the election
                TimeUnit.SECONDS.sleep(4);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            // Report the failure instead of discarding it.
            e.printStackTrace();
        } finally {
            for (MasterSelector masterSelector : selectorList) {
                try {
                    masterSelector.stop();
                } catch (RuntimeException e) {
                    // Keep stopping the remaining selectors.
                    e.printStackTrace();
                }
            }
            for (ZkClient zkClient : clientList) {
                try {
                    zkClient.close();
                } catch (RuntimeException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
更多推荐
所有评论(0)