zookeeper的客户端比较常用的有zkClient和apache的curator,curator的fluent风格很是令人讨喜,用的比较多,查看了curator实现分布式锁的原理之后,希望根据自己的理解,通过zkClient api的方式实现分布式锁。

大概算了下,175行代码,算上api的练习,一共用了三个小时,感觉自己好像个憨憨~

代码

ZkClientDistribute:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * @description:
 * @author: Leesin Dong
 * @date: Created in 2020/4/14 21:49
 * @modified By:
 */
public class ZkClientDistribute implements Lock {
    //根节点,命名空间
    private String basePath;

    ZkClient zkClient;

    //第几个进来的?
    private static AtomicInteger count = new AtomicInteger(-1);

    Map<Thread, LockData> chm = new ConcurrentHashMap<>();

    public void lock() {
        //根据顺序命名
        int currentTime = count.incrementAndGet();
		//省去了可重入锁的逻辑
        String childPath = basePath + "/" + String.valueOf(currentTime);
        LockData lockData = new LockData(Thread.currentThread(), currentTime);
        chm.put(Thread.currentThread(), lockData);
        //创建节点
        String sucessPath = getsLockLoop(lockData, childPath);
        System.out.println(basePath+"/"+sucessPath + "获得锁成功了!");
    }

    private String getsLockLoop(LockData lockData, String childPath) {
        Boolean hasLock = false;
        createsTheLock(childPath);
        String successPath;
        while (!hasLock) {
            Result result = getsTheLock(lockData, 1, basePath);
            if (result.getsTheLock) {
                hasLock = true;
                // System.out.println(lockData.currentTime + "获得了锁");
                break;
            } else {
                //添加监听
                //监听节点
                Object lock = new Object();
                watcher(result.pathToWatch, lock);
                synchronized (lock) {
                    try {
                        //阻塞
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        successPath = String.valueOf(lockData.currentTime);
        return successPath;
    }

    private void watcher(String pathToWatch, Object o) {
        zkClient.subscribeDataChanges(pathToWatch, new IZkDataListener() {
            // 得到删除节点的path
            @Override
            public void handleDataDeleted(String path) throws Exception {
                System.out.println(">>>监听到删除的节点为:" + path);
                new Thread(() -> {
                    synchronized (o) {
                        o.notify();
                    }
                }).start();
            }

            // 得到变更的节点和变更的内容
            @Override
            public void handleDataChange(String path, Object data) throws Exception {
                // notifyAll();
                System.out.println(">>>监听到变更的节点为:" + path + ", 变更内容为:" + data);
            }
        });

    }

    //创建临时节点
    private void createsTheLock(String path) {
        //递归创建子节点(此时父节点并不存在)
        zkClient.createPersistent(path, true);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(path + "创建成功了");
        // System.out.println("目前子节点:" + zkClient.getChildren(basePath));
    }

    public ZkClientDistribute(String basePath, ZkClient zkClient) {
        this.basePath = basePath;
        this.zkClient = zkClient;
        zkClient.createPersistent(basePath, true);
    }

    public void lockInterruptibly() throws InterruptedException {

    }

    public boolean tryLock() {
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void unlock() {
        LockData lockData = chm.get(Thread.currentThread());
        //递归删除
        System.out.println("准备删除" + basePath+"/"+lockData.currentTime);
        //删除第一个
        zkClient.delete(basePath + "/" + lockData.currentTime);
    }

    public Condition newCondition() {
        return null;
    }

    public Result getsTheLock(LockData lockData, int maxLeases, String path) {
        Boolean getsTheLock = getCurrentIndex(lockData) < maxLeases;
        //获得全部子节点
        //pathToWatch 是前一个节点
        String pathToWatch = "";
        pathToWatch = getsTheLock ? null : basePath + "/" + (getCurrentIndex(lockData) - 1);
        // System.out.println("getCurrentIndex(lockData) "+getCurrentIndex(lockData)+" pathToWatch "+pathToWatch);
        return new Result(getsTheLock, pathToWatch);
    }

    private Integer getCurrentIndex(LockData lockData) {
        List<String> children = zkClient.getChildren(basePath);
        if (children.contains(String.valueOf(lockData.currentTime))) {
            int index = children.indexOf(String.valueOf(lockData.currentTime));
            return index;
        }
        return 100;
    }

    private static class LockData {
        final Thread owningThread;
        final int currentTime;

        private LockData(Thread owningThread, int currentTime) {
            this.owningThread = owningThread;
            this.currentTime = currentTime;
        }
    }

    private class Result {
        final Boolean getsTheLock;
        final String pathToWatch;

        private Result(Boolean getsTheLock, String pathToWatch) {
            this.getsTheLock = getsTheLock;
            this.pathToWatch = pathToWatch;
        }
    }
}

测试类:

import org.I0Itec.zkclient.ZkClient;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: Leesin Dong
 * @date: Created in 2020/4/14 21:48
 * @modified By:
 */
public class App {
    private static String CONNECTION_STR = "192.168.8.111:2181,192.168.8.112:2181,192.168.8.113:2181";

    public static void main(String[] args)  {
        //建立连接
        ZkClient zkClient = new ZkClient(CONNECTION_STR, 50000);
        ZkClientDistribute zcd = new ZkClientDistribute("/leesin", zkClient);
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                zcd.lock();
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                zcd.unlock();
            },"Thread-"+i).start();
        }
    }
}

结果

在这里插入图片描述

zkClient API练习

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;

import java.util.List;

/**
 * @description:先简单操作一把,感受一下API,给下面做准备
 * @author: Leesin Dong
 * @date: Created in 2020/4/14 21:53
 * @modified By:
 */
public class ZKClientTest {
    private static String CONNECTION_STR = "192.168.8.111:2181,192.168.8.112:2181,192.168.8.113:2181";

    public static void main(String[] args) throws InterruptedException {
        //建立连接
        ZkClient zkClient = new ZkClient(CONNECTION_STR, 50000);

        String path = "/zkclient";

        //子节点监听
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                System.out.println("路径"+parentPath+"下面的子节点变更,子节点为:"+currentChilds);
            }
        });

        //监听节点
        zkClient.subscribeDataChanges(path, new IZkDataListener() {
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.println("路径"+dataPath+"数据变成了"+data);
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println("路径"+dataPath+"已经被删除了");
            }

        });




        //递归创建子节点(此时父节点并不存在)
        zkClient.createPersistent("/zkclient/al",true);
        Thread.sleep(5000);
        System.out.println(zkClient.getChildren(path));

        //递归删除
        zkClient.delete("/zkclient");
        System.out.println("删除成功");

        //获得子节点
        List<String> children = zkClient.getChildren("/leesin");
        for (int i = 0; i < children.size(); i++) {
            System.out.println(children.get(i));
        }

    }
}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐