介绍

实现分布式锁的方式很多,zookeeper是其中一种。企业生产中一般选择Redis,业务简单可以选择基于表的分布式锁,这篇文章介绍zookeeper。

zookeeper实现分布式锁原理:
基于临时顺序节点:
  1.客户端调用create()方法创建节点,需要注意的是,这里节点的创建类型需要设置为EPHEMERAL_SEQUENTIAL。
  2.客户端调用getChildren(“locknode”)方法来获取所有已经创建的子节点。
  3.客户端获取到所有子节点path之后,如果发现自己在步骤1中创建的节点是所有节点中序号最小的,那么就认为这个客户端获得了锁。
  4.如果创建的节点不是所有节点中序号最小的,那么则监视比自己创建节点的序列号小的最大的节点,进入等待。直到下次监视的子节点变更的时候,再进行子节点的获取,判断是否获取锁。
  释放锁的过程相对比较简单,就是删除自己创建的那个子节点即可

简单代码实现

mian方法:

public class Main {
    private static int n = 500;

    private static void process() {
        System.out.println(--n);
    }

    public static void main(String[] args) {
        Runnable runnable = () -> {
            DistributedLock distributedLock = null;
            try {
                distributedLock = new DistributedLock("127.0.0.1:2181", "test1");
                distributedLock.lock();
                process();
                System.out.println(Thread.currentThread().getName() + " is running");
            } finally {
                if (distributedLock != null) {
                    distributedLock.unlock();
                }
            }
        };

        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
    }
}

分布式锁实现类:

public class DistributedLock implements Lock, Watcher {
    private ZooKeeper zkConn = null;
    private String ROOT_LOCK = "/locks";
    private String lockName;
    private String WAIT_LOCK;
    private String CURRENT_LOCK;
    private CountDownLatch countDownLatch;
    private int sessionTimeout = 3000000;
    private List<Exception> exceptionList = new ArrayList<>();

    DistributedLock(String url, String lockName) {
        this.lockName = lockName;
        try {
            zkConn = new ZooKeeper(url, sessionTimeout, this);
            if (zkConn.exists(ROOT_LOCK, false) == null)
                zkConn.create(ROOT_LOCK, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (IOException | InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public void lock() {
        if (exceptionList.size() > 0) throw new LockException(exceptionList.get(0));
        try {
            if (this.tryLock()) {
                System.out.println(Thread.currentThread().getName() + " " + lockName + " get the lock!");
            } else {
                waitForLock(WAIT_LOCK, sessionTimeout);
            }
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            CURRENT_LOCK = zkConn.create(ROOT_LOCK + "/" + lockName + splitStr, new byte[0],
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(CURRENT_LOCK + " is created");

            List<String> subNodes = zkConn.getChildren(ROOT_LOCK, false);

            List<String> lockObjects = new ArrayList<>();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if (_node.equals(lockName)) lockObjects.add(node);
            }
            Collections.sort(lockObjects);
            System.out.println(Thread.currentThread().getName() + "'lock is " + CURRENT_LOCK);
            if (CURRENT_LOCK.equals(ROOT_LOCK + "/" + lockObjects.get(0))) {
                return true;
            }
            String prevNode = CURRENT_LOCK.substring(CURRENT_LOCK.lastIndexOf("/") + 1);
            int prevNodePosition = Collections.binarySearch(lockObjects, prevNode);
            WAIT_LOCK = lockObjects.get(prevNodePosition - 1);

        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitForLock(String waitLock, long waitTime) throws KeeperException, InterruptedException {
        Stat stat = zkConn.exists(ROOT_LOCK + "/" + waitLock, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        });
        if (stat != null) {
            System.out.println(Thread.currentThread().getName() + " is waiting for " + ROOT_LOCK + "/" + waitLock);
            this.countDownLatch = new CountDownLatch(1);
            this.countDownLatch.await(waitTime, TimeUnit.MILLISECONDS);
            this.countDownLatch = null;
            System.out.println(Thread.currentThread().getName() + " get the lock ");
        }
        return true;
    }

    public void unlock() {
        try {
            System.out.println(CURRENT_LOCK+" has release the lock ");
            zkConn.delete(CURRENT_LOCK, -1);
            CURRENT_LOCK = null;
            zkConn.close();
        } catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
        }
    }

    public Condition newCondition() {
        return null;
    }

    public void lockInterruptibly() {
        this.lock();
    }

    @Override
    public void process(WatchedEvent watchedEvent) {

    }


    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        LockException(String e) {
            super(e);
        }

        LockException(Exception e) {
            super(e);
        }
    }


    public boolean tryLock(long timeout, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(WAIT_LOCK, timeout);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
}

到此简单的一个分布式锁已经实现,以后在讲解Redis,表实现分布式锁。谢谢观看

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐