curator是Netflix公司开源的一个ZooKeeper客户端封装。

ZooKeeper可以被用来实现分布式锁,具体是使用“临时顺序节点”实现。

获取锁

一个分布式锁对应ZooKeeper的一个文件夹,每个需要获取这个分布式锁的客户端线程在这个文件夹下创建一个临时顺序节点,此时有两种情况:

1)创建的临时顺序节点是文件夹下的第一个节点,则认为是获取分布式锁成功。

2)创建的临时顺序节点不是文件夹下的第一个节点,则认为当前锁已经被另一个客户端线程获取,此时需要进入阻塞状态,等待节点顺序中的前一个节点释放锁的时候唤醒当前线程。

阻塞-唤醒逻辑:把文件夹下的节点顺序排列,找到当前节点的前一个节点,在前一个节点添加Watch,当前一个节点被删除时会触发Watch事件,进而唤醒当前阻塞线程。

如果前一个节点对应的客户端崩溃了,则节点对应的Watch事件也会触发,也会唤醒后一个节点对应的客户端线程,此时仍需要判断当前节点是第一个节点之后才能获取锁,否则继续进入阻塞并Watch前一个节点。

重入性

只考虑同一个客户端、同一个线程获取同一个分布式锁的可重入性,第一次获取锁成功之后,在JVM内存中的一个ConcurrentMap中存储当前线程对应的锁路径及重入次数,后面同一个线程再次获取锁时,先检查该Map中当前锁是否已被当前线程占用即可,如果已占用,则只需要递增重入次数即可。

因为重入性只考虑同一个客户端、同一个JVM、同一个线程,所以可以不用考虑判断ConcurrentMap中的Owner线程的并发问题。

释放锁

释放锁时,对应可重入分布式锁,首先重入次数减一,然后判断重入次数是否已经为0:

1)如果重入次数为0,则删除当前客户端线程对应的临时顺序节点,删除操作会触发次节点的Watch事件,如果有别的客户端线程正在阻塞等待,则会通过Watch机制唤醒。

2)如果重入次数非0,则说明还未完全释放锁,直接返回即可。

可重入分布式锁测试代码:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorDistributeLockTest {
    public static void main(String[] args) {
        String zkAddr = "127.0.0.1:2181";
        String lockPath = "/distribute-lock";
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework cf = CuratorFrameworkFactory.builder()
                .connectString(zkAddr)
                .sessionTimeoutMs(2000)
                .retryPolicy(retryPolicy)
                .build();
        cf.start();

        InterProcessMutex lock = new InterProcessMutex(cf, lockPath);
        new Thread("thread-1"){
            @Override
            public void run() {
                process(lock);
            }
        }.start();
        new Thread("thread-2"){
            @Override
            public void run() {
                process(lock);
            }
        }.start();
    }

    private static void process(InterProcessLock lock) {
        System.out.println(Thread.currentThread().getName() + " acquire");
        try {
            lock.acquire();
            System.out.println(Thread.currentThread().getName() + " acquire success");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName() + " release");
            try {
                lock.release();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println(Thread.currentThread().getName() + " release success");
    }
}




延伸阅读:http://www.hollischuang.com/archives/1716

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐