1. javaAPI使用

1.1 原生API

1.1.1 引入pom

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>

1.1.2 建立连接

package com.lulf.javaapi;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * Minimal example of opening a session with the native ZooKeeper client.
 *
 * <p>The program blocks on a {@link CountDownLatch} until the watcher sees the
 * {@code SyncConnected} state, prints the client state, and then closes the session.
 */
public class CreateSessionDemo {

    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    // Released by the watcher once the session reports SyncConnected.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * Connects to the ensemble, waits for the connection event and prints the session state.
     *
     * @param args unused
     * @throws IOException          if the client cannot be created
     * @throws InterruptedException if the thread is interrupted while waiting or closing
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new Watcher() {

            @Override
            public void process(WatchedEvent watchedEvent) {
                // Only a successful connection releases the waiting main thread.
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                    System.out.println(watchedEvent.getState());
                }
            }
        });
        try {
            countDownLatch.await();
            System.out.println("test  " + zooKeeper.getState());
        } finally {
            // Always release the session instead of leaving it to expire on the server.
            zooKeeper.close();
        }
    }
}

1.1.3 增删改查节点

        // Create a node.
        String nodePath = "/lulfZkDemo1";
        // Node type: persistent, persistent-sequential, ephemeral or ephemeral-sequential.
        CreateMode createMode = CreateMode.EPHEMERAL;
        // Access control: ANYONE_ID_UNSAFE is the "world" scheme, an open mode in which
        // access is granted to every user (world:anyone).
        // "super" is the super user, who may operate on the data nodes in ZooKeeper.
        List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
        String msg = "12345";
        byte[] data = msg.getBytes();
        try {
            // create() returns the actual path of the node that was created.
            System.out.println("创建成功" + zooKeeper.create(nodePath, data, acl, createMode));
        } catch (KeeperException e) {
            e.printStackTrace();
        }

增删改查demo 补上watcher机制

package com.hikvision.rabbitmq.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Create / read / update / delete demo for the native ZooKeeper client, with a
 * watcher that reports connection and node events.
 *
 * <p>The class itself is the session watcher: the first {@code SyncConnected}
 * event releases {@link #main}, and later node events are printed as they arrive.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class createNodeDemo implements Watcher {

    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";
    // Released by process() when the session becomes connected.
    private static final CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper = null;
    private static final Stat stat = new Stat();

    /**
     * Runs the demo: connect, create an ephemeral node, watch it, update it, delete it,
     * then create a persistent parent with one child.
     *
     * <p>NOTE: the persistent nodes {@code /LULF} and {@code /LULF/lulf1-1} are not
     * removed, so they must be deleted before the demo is run a second time.
     *
     * @param args unused
     * @throws IOException          if the client cannot be created
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws KeeperException      if the server rejects one of the operations
     */
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper(CONNECTSTRING, 5000, new createNodeDemo());
        try {
            countDownLatch.await();
            System.out.println("test  " + zooKeeper.getState());

            // Create an ephemeral node.
            String path = "/Lulfdemo2";
            byte[] data = "1234".getBytes();
            List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
            CreateMode createMode = CreateMode.EPHEMERAL;
            String result = zooKeeper.create(path, data, acl, createMode);
            // Reading with watch=true registers the session watcher on this node.
            zooKeeper.getData(path, true, stat);
            System.out.println("创建成功  " + result);

            // Update the data; version -1 means "ignore the node version".
            zooKeeper.setData(path, "311".getBytes(), -1);

            // Delete the node, then give the watcher time to report the events.
            zooKeeper.delete(path, -1);
            Thread.sleep(2000);

            // Create a node and a child node. Only persistent nodes may have children.
            zooKeeper.create("/LULF", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            TimeUnit.SECONDS.sleep(1);
            zooKeeper.create("/LULF/lulf1-1", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            TimeUnit.SECONDS.sleep(1);
        } finally {
            // Release the session (and with it any remaining ephemeral nodes).
            zooKeeper.close();
        }
    }

    /**
     * Handles session and node events while the session is connected.
     *
     * @param watchedEvent the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() != Event.KeeperState.SyncConnected) {
            return;
        }
        Event.EventType type = watchedEvent.getType();
        if (type == null || type == Event.EventType.None) {
            // Pure connection-state event: the session is ready, let main() continue.
            countDownLatch.countDown();
            System.out.println(watchedEvent.getState());
            return;
        }
        String path = watchedEvent.getPath();
        switch (type) {
            case NodeDataChanged:
                printCurrentValue("路径 " + path + "改变后的值 ", path);
                break;
            case NodeCreated:
                printCurrentValue("路径 " + path + "节点的值 ", path);
                break;
            case NodeDeleted:
                System.out.println("删除路径 " + path);
                break;
            default:
                // NodeChildrenChanged and any other type are intentionally ignored.
                break;
        }
    }

    /**
     * Reads the node's current data, re-registering the watch, and prints it after {@code label}.
     * The bytes are decoded to text so the value is printed rather than the array identity.
     */
    private static void printCurrentValue(String label, String path) {
        try {
            byte[] data = zooKeeper.getData(path, true, new Stat());
            System.out.println(label + (data == null ? null : new String(data)));
        } catch (KeeperException | InterruptedException e) {
            // The node may already be gone by the time the event is handled.
            e.printStackTrace();
        }
    }
}
1.1.3.1 watcher 连接状态

1)KeeperState.Expired
在一定的时间内客户端没有收到服务器的通知,则认为当前的会话已经过期了
2)KeeperState.Disconnected 断开连接状态
3)KeeperState.SyncConnected 客户端和服务器端在某个节点上建立连接,并且完成一次version、
zxid的同步
4)KeeperState.AuthFailed 授权失败

1.1.3.2 节点状态

NodeCreated 当节点被创建时触发
NodeChildrenChanged 表示子节点被创建或被删除(子节点的数据发生变化不会触发该事件)
NodeDataChanged 节点数据发生改变
NodeDeleted 节点被删除
None 当客户端和服务器连接状态发生变化的时候,事件类型就是None

1.2 zkclient

1.2.1 引入pom

<dependency>
      <groupId>com.101tec</groupId>
      <artifactId>zkclient</artifactId>
      <version>0.10</version>
    </dependency>

1.2.2 增删改查实例及watcher

package com.hikvision.rabbitmq.zookeeper;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;

/**
 * Create / read / update / delete demo for the zkclient library, including a
 * data-change listener.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class zkClientDemo {
    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /** Creates a new client connected to {@link #CONNECTSTRING}. */
    private static ZkClient getInstance() {
        return new ZkClient(CONNECTSTRING, 5000);
    }

    /**
     * Runs the demo and always closes the client afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Open the connection.
        ZkClient zkClient = getInstance();
        try {
            // Create an ephemeral node.
            zkClient.createEphemeral("/zkclient");
            // Persistent nodes can be created together with their missing parents.
            zkClient.createPersistent("/llf1/llf2/llf3", true);

            // Read the child nodes.
            List<String> list = zkClient.getChildren("/llf1");
            System.out.println("children of /llf1: " + list);

            // Watcher mechanism: subscribe to data changes on the node.
            zkClient.subscribeDataChanges("/llf1", new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
                    System.out.println("node" + s + "change :" + o);
                }

                @Override
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println("node delete :" + s);
                }
            });
            // Update the data.
            zkClient.writeData("/llf1", "222");

            // Delete a single node.
            zkClient.delete("/zkclient");
            // Delete a node together with all of its children.
            zkClient.deleteRecursive("/llf1");
        } finally {
            // Release the connection even if one of the operations failed.
            zkClient.close();
        }
    }
}

1.3 curator

Curator本身是Netflix公司开源的zookeeper客户端
curator提供了各种应用场景的实现封装

curator-framework 提供了fluent风格的api
curator-recipes 提供了各种典型应用场景(recipes)的实现封装

1.3.1 引入pom

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.1.0</version>
        </dependency>

1.3.2 创建连接【两种方式】

/**
 * Builds a Curator client with the factory shortcut, stores it in the shared
 * {@code curatorFramework} field, starts it and returns it.
 *
 * @return the started client
 */
public static CuratorFramework getInstance() {
    final int sessionTimeoutMs = 5000;
    final int connectionTimeoutMs = 5000;
    // Retry with exponential back-off: 1000 ms base sleep, at most 3 retries.
    CuratorFramework client = CuratorFrameworkFactory.newClient(
            CONNECTSTRING, sessionTimeoutMs, connectionTimeoutMs,
            new ExponentialBackoffRetry(1000, 3));
    curatorFramework = client;
    client.start();
    return client;
}

fluent风格

/**
 * Builds a Curator client with the fluent builder, stores it in the shared
 * {@code curatorFramework} field, starts it and returns it.
 *
 * <p>All paths used through this client are placed under the {@code curator} namespace.
 *
 * @return the started client
 */
public static CuratorFramework getFluentInstance() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNECTSTRING).sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                // The namespace is given WITHOUT a leading slash; Curator prepends "/"
                // itself and rejects "/curator" as an invalid namespace.
                .namespace("curator").build();
        curatorFramework.start();
        return curatorFramework;
    }

1.3.3 增删改查及异步及事务

package com.hikvision.rabbitmq.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.Collection;
import java.util.concurrent.*;

/**
 * Curator demo covering create, delete, read, update, an asynchronous create and a transaction.
 *
 * <p>Assumes the nodes {@code /llf} and {@code /lll} already exist on the server:
 * the read/update steps and the transaction fail otherwise.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class CuratorDemo {

    private static CuratorFramework curatorFramework;

    private static final String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /**
     * Builds a client with the factory shortcut, stores it in {@link #curatorFramework},
     * starts it and returns it.
     *
     * @return the started client
     */
    public static CuratorFramework getInstance() {
        curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING, 5000,
                5000, new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Builds a client with the fluent builder (namespace {@code curator}), stores it in
     * {@link #curatorFramework}, starts it and returns it.
     *
     * @return the started client
     */
    public static CuratorFramework getFluentInstance() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNECTSTRING).sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                // No leading slash: Curator prepends "/" itself and rejects "/curator".
                .namespace("curator").build();
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Runs the demo; the client and the callback executor are always released at the end.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Executor on which the background callback is delivered.
        ExecutorService service = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try (CuratorFramework client = getInstance()) {
            System.out.println("connect success...");

            // Create a node, creating the missing parents on the way.
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath("/12/32/421", "123456".getBytes());
            // Delete the subtree (version defaults to -1). "/12" still has children,
            // so they must be deleted with it.
            client.delete().deletingChildrenIfNeeded().forPath("/12");

            // Read data together with the node's Stat.
            Stat stat = new Stat();
            byte[] data = client.getData().storingStatIn(stat).forPath("/llf");
            String result = new String(data);
            System.out.println("data of /llf: " + result + ", version " + stat.getVersion());

            // Update data.
            Stat updateStat = client.setData().forPath("/llf", "12".getBytes());
            System.out.println("version of /llf after update: " + updateStat.getVersion());

            // Asynchronous operation: the result is reported to the callback on `service`.
            final CountDownLatch asyncDone = new CountDownLatch(1);
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework arg0, CuratorEvent arg1) throws Exception {
                            System.out.println(Thread.currentThread().getName() + "-->" + arg1.getResultCode());
                            asyncDone.countDown();
                        }
                    }, service).forPath("/Allen", "allen".getBytes());

            // Transaction: both operations are committed together.
            Collection<CuratorTransactionResult> transresult = client.inTransaction()
                    .create().forPath("/trans", "111".getBytes())
                    .and().setData().forPath("/lll", "555".getBytes())
                    .and().commit();
            System.out.println("transaction results: " + transresult.size());

            // Give the background create a chance to report before the client is closed.
            asyncDone.await(5, TimeUnit.SECONDS);
            System.out.println("success");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Without this the pool's worker thread keeps the JVM alive.
            service.shutdown();
        }
    }
}

1.3.4 事件处理

package com.hikvision.rabbitmq.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.util.Collection;
import java.util.concurrent.*;

/**
 * Curator event-handling demo: listens for child events under {@code /event}
 * with a {@link PathChildrenCache}.
 *
 * <p>NOTE: the persistent node {@code /event} is not removed, so it must be
 * deleted before the demo is run a second time.
 *
 * @author lulinfeng
 * @version 1.0 (2020/8/31)
 */
public class CuratorDemo {

    private static CuratorFramework curatorFramework;

    private static final String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /**
     * Builds a client with the factory shortcut, stores it in {@link #curatorFramework},
     * starts it and returns it.
     *
     * @return the started client
     */
    public static CuratorFramework getInstance() {
        curatorFramework = CuratorFrameworkFactory.newClient(CONNECTSTRING, 5000,
                5000, new ExponentialBackoffRetry(1000, 3));
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Builds a client with the fluent builder (namespace {@code curator}), stores it in
     * {@link #curatorFramework}, starts it and returns it.
     *
     * @return the started client
     */
    public static CuratorFramework getFluentInstance() {
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(CONNECTSTRING).sessionTimeoutMs(5000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                // No leading slash: Curator prepends "/" itself and rejects "/curator".
                .namespace("curator").build();
        curatorFramework.start();
        return curatorFramework;
    }

    /**
     * Registers a child listener on {@code /event}, then creates, updates and deletes a
     * child so that each event type is reported. Blocks on standard input at the end so
     * the events can be observed; the cache and the client are closed afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Resources are closed in reverse order: first the cache, then the client.
        try (CuratorFramework client = getInstance();
             PathChildrenCache cache2 = new PathChildrenCache(client, "/event", false)) {
            System.out.println("connect success...");
            // Curator event handling: three kinds of cache can watch nodes.
            // PathChildrenCache watches creation, deletion and data updates of the
            //   children of one path.
            // NodeCache watches creation, update and deletion of a single node.
            // TreeCache combines both (create/update/delete events below a path) and
            //   caches the data of every node under that path.

            // NodeCache variant, kept for reference:
//		NodeCache cache = new NodeCache(curatorFramework, "/lulf", false);
//		cache.start(true);
//		cache.getListenable()
//				.addListener(() -> System.out.println("节点数据发生变化" + new String(cache.getCurrentData().getData())));
//		curatorFramework.setData().forPath("/lulf", "gbcq".getBytes());

            // PathChildrenCache variant.
            cache2.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            cache2.getListenable().addListener((ignoredClient, event) -> {
                switch (event.getType()) {
                    case CHILD_ADDED:
                        System.out.println("增加子节点");
                        break;
                    case CHILD_REMOVED:
                        System.out.println("删除子节点");
                        break;
                    case CHILD_UPDATED:
                        System.out.println("更新子节点");
                        break;
                    default:
                        break;
                }
            });
            client.create().withMode(CreateMode.PERSISTENT).forPath("/event", "1".getBytes());
            TimeUnit.SECONDS.sleep(1);
            client.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1", "1.1".getBytes());
            TimeUnit.SECONDS.sleep(1);
            client.setData().forPath("/event/event1", "1.2".getBytes());
            // Pause so the update is reported before the node disappears.
            TimeUnit.SECONDS.sleep(1);
            client.delete().forPath("/event/event1");
            // Keep the process alive until a key is pressed so the events can be seen.
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. 典型应用场景

2.1 zookeeper能够实现哪些场景

2.1.1 订阅发布/配置中心

watcher机制
统一配置管理(disconf)
实现配置信息的集中式原理和数据的动态更新
实现配置中心有两种模式:push,pull
长轮询
zookeeper采用的是推拉相结合的方式。客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,那么服务器端会向客户端发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据。
a 数据量比较小
b 数据内容在运行时发生动态变更
c 集群中的各个机器共享变量
在这里插入图片描述

2.1.2 分布式锁

分布式锁的三种实现手段
1) 数据库 创建一个表,通过唯一索引的方式
create table(id,methodname…) methodname增加唯一索引
insert 一条数据 xxx delete 删除数据
mysql 有innodb来设置表锁或者行锁
2)redis setNX key已存在则返回0(加锁失败),key不存在则写入成功并返回1(加锁成功)
3)zookeeper 有序节点
在这里插入图片描述
共享锁(读锁)
在这里插入图片描述

2.1.2.1 分布式锁代码实现

首先定义ZK的连接客户端类

package com.hikvision.rabbitmq.zookeeper.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Factory for connected ZooKeeper sessions used by the distributed-lock example.
 *
 * @author lulinfeng
 * @version 1.0 (2020/9/2)
 */
public class ZookeeperClient {

    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";
    private static int sessionTimeOut = 5000;

    /**
     * Opens a new session and blocks until the client reports {@code SyncConnected}.
     *
     * @return the connected client
     * @throws IOException          if the client cannot be created
     * @throws InterruptedException if the thread is interrupted while waiting for the connection
     */
    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        final CountDownLatch connected = new CountDownLatch(1);
        // The watcher's only job is to release the latch on the connection event.
        Watcher connectionWatcher = event -> {
            if (Watcher.Event.KeeperState.SyncConnected == event.getState()) {
                connected.countDown();
            }
        };
        ZooKeeper client = new ZooKeeper(CONNECTSTRING, sessionTimeOut, connectionWatcher);
        connected.await();
        return client;
    }

    /**
     * @return the session timeout, in milliseconds, used for new sessions
     */
    public static int getSessionTimeOut() {
        return sessionTimeOut;
    }
}

定义watcher
通过CountDownLatch来控制

package com.hikvision.rabbitmq.zookeeper.lock;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

/**
 * Watcher that releases a {@link CountDownLatch} when the watched node is deleted.
 *
 * @author lulinfeng
 * @version 1.0 (2020/9/2)
 */
public class LockWatcher implements Watcher {
    private final CountDownLatch latch;

    /**
     * @param latch the latch to count down once a {@code NodeDeleted} event arrives
     */
    public LockWatcher(CountDownLatch latch) {
        this.latch = latch;
    }

    /**
     * Counts the latch down for {@code NodeDeleted} events; every other event is ignored.
     *
     * @param watchedEvent the event delivered by the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        boolean nodeRemoved = Event.EventType.NodeDeleted == watchedEvent.getType();
        if (!nodeRemoved) {
            return;
        }
        latch.countDown();
    }
}

书写lock和unlock方法

package com.hikvision.rabbitmq.zookeeper.lock;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

/**
 * Distributed lock built on ephemeral sequential nodes below {@code /LOCKS}.
 *
 * <p>Each contender creates a sequential child node. The contender with the smallest
 * node owns the lock; every other contender watches the node directly before its own
 * and re-checks once that node disappears.
 *
 * <p>The root node {@code /LOCKS} must already exist. An instance owns its own
 * ZooKeeper session and is meant to be used by a single thread; call {@link #close()}
 * when it is no longer needed.
 *
 * @author lulinfeng
 * @version 1.0 (2020/9/2)
 */
public class DistributeLock {

    private static final String ROOT_LOCKS = "/LOCKS"; // root node of all lock nodes

    private final ZooKeeper zooKeeper;

    private final int sessionTimeOut; // session timeout in milliseconds

    private String lockID; // path of this contender's lock node, null while not contending

    private final static byte[] data = {1, 2};

    /**
     * Opens a dedicated ZooKeeper session for this lock.
     *
     * @throws IOException          if the client cannot be created
     * @throws InterruptedException if the thread is interrupted while connecting
     */
    public DistributeLock() throws IOException, InterruptedException {
        this.zooKeeper = ZookeeperClient.getInstance();
        this.sessionTimeOut = ZookeeperClient.getSessionTimeOut();
    }

    /**
     * Acquires the lock, blocking until every lock node with a smaller sequence number is gone.
     *
     * @return {@code true} once the lock is held; {@code false} if a ZooKeeper error or an
     *         interruption prevented acquisition (the lock node is then removed again)
     */
    public boolean lock() {
        try {
            lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(Thread.currentThread().getName() + "->成功创建lock节点[" + lockID + "]开始去竞争锁");
            boolean waited = false;
            while (true) {
                // No watch on the root: only the direct predecessor is watched below.
                List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, false);
                // Sort ascending by full path.
                SortedSet<String> sortedSet = new TreeSet<String>();
                for (String children : childrenNodes) {
                    sortedSet.add(ROOT_LOCKS + "/" + children);
                }
                SortedSet<String> lessThanLockID = sortedSet.headSet(lockID);
                if (lessThanLockID.isEmpty()) {
                    // Nothing is smaller than our node: the lock is ours.
                    if (waited) {
                        System.out.println(Thread.currentThread().getName() + "成功获取锁,lock节点[" + lockID + "]");
                    } else {
                        System.out.println(Thread.currentThread().getName() + "success get lock,lock节点[" + lockID + "]");
                    }
                    return true;
                }
                // Watch the closest smaller node.
                String preLockId = lessThanLockID.last();
                CountDownLatch predecessorGone = new CountDownLatch(1);
                if (zooKeeper.exists(preLockId, new LockWatcher(predecessorGone)) != null) {
                    // The timeout only bounds one wait. Ownership is decided by the
                    // re-check at the top of the loop, never by the wait ending.
                    predecessorGone.await(sessionTimeOut, TimeUnit.MILLISECONDS);
                }
                waited = true;
            }
        } catch (KeeperException e) {
            e.printStackTrace();
            abandonLockNode();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Clean up first: the delete itself would fail once the flag is set again.
            abandonLockNode();
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /** Removes this contender's lock node after a failed acquisition, if one was created. */
    private void abandonLockNode() {
        if (lockID == null) {
            return;
        }
        try {
            zooKeeper.delete(lockID, -1);
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            lockID = null;
        }
    }

    /**
     * Releases the lock by deleting this contender's lock node.
     *
     * @return {@code true} if the node was deleted; {@code false} if no lock node exists
     *         or the deletion failed
     */
    public boolean unlock() {
        if (lockID == null) {
            // lock() was never called or did not succeed: nothing to release.
            return false;
        }
        System.out.println(Thread.currentThread().getName() + "-->开始释放锁:[" + lockID + "]");
        try {
            zooKeeper.delete(lockID, -1);
            System.out.println("节点[" + lockID + "]被成功删除");
            lockID = null;
            return true;
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return false;
    }

    /** Closes the ZooKeeper session owned by this lock. */
    public void close() {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Starts ten threads that compete for the lock at the same time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(10);
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                DistributeLock lock = null;
                boolean locked = false;
                try {
                    try {
                        lock = new DistributeLock();
                    } finally {
                        // Count down even on failure so the other threads are not stuck.
                        latch.countDown();
                    }
                    // Wait until every thread is ready, then compete simultaneously.
                    latch.await();
                    locked = lock.lock();
                    if (locked) {
                        Thread.sleep(random.nextInt(500));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (lock != null) {
                        if (locked) {
                            lock.unlock();
                        }
                        lock.close();
                    }
                }
            }).start();
        }
    }
}

结果如下:
在这里插入图片描述

2.1.3 负载均衡

请求/数据 分摊到多个计算单元

2.1.4 ID生成器

利用生成的有序节点

2.1.5 分布式队列

2.1.5.1 先进先出队列

getchildren获取指定根节点下面的子节点
确定自己节点在子节点中的顺序
如果自己是最小的子节点,则轮到自己出队处理;否则监听比自己小的上一个子节点并进入等待,收到watcher通知后重复上述流程。

2.1.5.2 Barrier模式

也是阻碍模式,围栏模式,满足条件才会触发
在这里插入图片描述

2.1.6 统一命名服务

2.1.7 master选举

7*24小时可用 99.999%可用
master-slave模式
master出现故障 slave上位作为master 心跳机制去维持状态 脑裂

单机版选主
用户中心

package com.hikvision.rabbitmq.zookeeper.leader;

import java.io.Serializable;

/**
 * Serializable description of one machine taking part in the master election.
 *
 * @author lulinfeng
 * @version 1.0 (2020/9/2)
 */
public class UserCenter implements Serializable {

    private static final long serialVersionUID = -4060228979536051295L;

    /** Machine id. */
    private int m_id;

    /** Machine name. */
    private String mc_name;

    /**
     * @return the machine id
     */
    public int getM_id() {
        return this.m_id;
    }

    /**
     * @param m_id the machine id to store
     */
    public void setM_id(int m_id) {
        this.m_id = m_id;
    }

    /**
     * @return the machine name, or {@code null} if none has been set
     */
    public String getMc_name() {
        return this.mc_name;
    }

    /**
     * @param mc_name the machine name to store
     */
    public void setMc_name(String mc_name) {
        this.mc_name = mc_name;
    }
}

具体选主

package com.hikvision.rabbitmq.zookeeper.leader;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import static java.util.concurrent.Executors.*;

/**
 * Master election service.
 *
 * <p>Every selector tries to create the ephemeral node {@code /master}; the one that
 * succeeds is the master. All selectors listen for deletion of that node and start a
 * new election when it disappears. To simulate a failure, the master gives up its
 * role five seconds after winning.
 *
 * @author lulf
 */
public class MasterSelector {
    private ZkClient client;

    private final static String MASTER_PATH = "/master";// node the candidates compete for

    private IZkDataListener dataListener;// listener for changes of the master node

    private UserCenter server; // the machine this selector represents

    private UserCenter master; // the machine currently known to be master

    // Per-instance flag so that every selector takes part in the election on its own.
    // Volatile because it is read from listener and scheduler threads.
    private volatile boolean isrunning = false;

    ScheduledExecutorService scheduledExecutorService = newScheduledThreadPool(1);

    /**
     * @param server the machine this selector represents
     * @param client the client used for the election; it is not closed by this class
     */
    public MasterSelector(UserCenter server,ZkClient client) {
        this.server = server;
        this.client=client;
        this.dataListener = new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                // The master node was deleted: start a new election.
                chooseMaster();
            }

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
            }
        };
    }

    /** Subscribes to the master node and takes part in the election; no-op if already running. */
    public void start() {
        if(!isrunning){
            isrunning=true;
            client.subscribeDataChanges(MASTER_PATH, dataListener);// register the node listener
            chooseMaster();
        }
    }

    /** Leaves the election: stops the timer, unsubscribes and gives up mastership if held. */
    public void stop() {
        if(isrunning){
            isrunning=false;
            scheduledExecutorService.shutdown();
            client.unsubscribeDataChanges(MASTER_PATH, dataListener);// cancel the subscription
            releaseMaster();
        }

    }

    /**
     * Tries to become master. If another machine already holds the role, records it as
     * the current master. Retries when the master node vanished in between.
     */
    private void chooseMaster() {
        if (!isrunning) {
            System.out.println("当前服务没有启动。。。");
            return;
        }
        while (isrunning) {
            try {
                client.createEphemeral(MASTER_PATH, server);
                master = server;// this machine won the election
                System.out.println(master.getMc_name() + "-->我已经是master,开始领导你们");
                // Simulated failure: give up mastership once, five seconds from now.
                scheduledExecutorService.schedule(() -> {
                    releaseMaster();
                }, 5, TimeUnit.SECONDS);
                return;
            } catch (ZkNodeExistsException e) {
                // A master already exists: read who it is.
                UserCenter userCenter=client.readData(MASTER_PATH, true);
                if(userCenter!=null){
                    master=userCenter;
                    return;
                }
                // The node disappeared between create and read: try again.
            }
        }
    }

    /** Gives up mastership (failure simulation); only the current master deletes the node. */
    private void releaseMaster() {
        if(checkMaster()){
            client.delete(MASTER_PATH, -1);// delete regardless of the node version
        }
    }

    /**
     * @return {@code true} if the master node exists and names this selector's machine
     */
    private boolean checkMaster() {
        // Passing true returns null instead of throwing when the node does not exist.
        UserCenter userCenter=client.readData(MASTER_PATH, true);
        if(userCenter==null || userCenter.getMc_name()==null){
            return false;
        }
        if(userCenter.getMc_name().equals(server.getMc_name())){
            master=userCenter;
            return true;
        }
        return false;
    }
}

测试选主

package com.hikvision.rabbitmq.zookeeper.leader;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;

/**
 * Election test: starts ten selectors, each with its own client, four seconds apart,
 * then stops all selectors and closes all clients.
 */
public class MasterChooseDemo {
    private final static String CONNECTSTRING = "192.168.48.128:2181,192.168.48.130:2181,"
            + "192.168.48.131:2181,192.168.48.132:2181";

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        List<MasterSelector> selectorList = new ArrayList<MasterSelector>();
        // Clients are tracked separately so that each one can be closed at the end.
        List<ZkClient> clientList = new ArrayList<ZkClient>();
        try {
            for (int i = 0; i < 10; i++) {
                ZkClient zkClient = new ZkClient(CONNECTSTRING, 5000, 5000, new SerializableSerializer());
                clientList.add(zkClient);
                UserCenter userCenter = new UserCenter();
                userCenter.setM_id(i);
                userCenter.setMc_name("lulf_" + i);
                MasterSelector selector = new MasterSelector(userCenter, zkClient);
                selectorList.add(selector);
                selector.start();// take part in the election
                TimeUnit.SECONDS.sleep(4);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            for (MasterSelector masterSelector : selectorList) {
                try {
                    masterSelector.stop();
                } catch (RuntimeException e) {
                    // Keep going so the remaining selectors are still stopped.
                    e.printStackTrace();
                }
            }
            for (ZkClient zkClient : clientList) {
                try {
                    zkClient.close();
                } catch (RuntimeException e) {
                    e.printStackTrace();
                }
            }
        }

    }
}


Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐