InterProcessReadWriteLock

JVM工作的可重入读/写互斥锁,使用Zookeeper来持有锁,所有JVM中使用相同锁路径的所有进程都将实现进程间临界区。这个互斥锁是公平的,每个用户都会按照请求的顺序获得互斥锁(从ZK的角度来看)。

读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。

读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。

测试

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>

CuratorFrameworkProperties类(提供CuratorFramework需要的一些配置信息,以及创建CuratorFramework实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() {
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    }
}

InterProcessReadWriteLockRunnable类(实现了Runnable接口,模拟分布式节点获取分布式锁):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;

import java.nio.charset.StandardCharsets;
import java.util.Random;

public class InterProcessReadWriteLockRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 分布式锁的路径
        String basePath = "/kaven";

        // 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath,
                "分布式读写锁".getBytes(StandardCharsets.UTF_8));

        // 根据随机数来决定获取写锁还是读锁
        Random random = new Random();
        if(random.nextInt(10000) > 5000) {
            // 获取写锁
            readWriteLock.writeLock().acquire();
            System.out.println(Thread.currentThread().getName() + "获取写锁");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + "释放写锁");
            // 释放写锁
            readWriteLock.writeLock().release();
        }
        else {
            // 获取读锁
            readWriteLock.readLock().acquire();
            System.out.println(Thread.currentThread().getName() + "获取读锁");
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + "释放读锁");
            // 释放读锁
            readWriteLock.readLock().release();
        }
    }
}

启动类:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式节点处理业务
        for (int i = 0; i < 15; i++) {
            EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable());
        }
    }
}

模拟15个分布式节点获取分布式锁,输出如下所示:

pool-1-thread-8获取写锁
pool-1-thread-8释放写锁
pool-1-thread-13获取写锁
pool-1-thread-13释放写锁
pool-1-thread-12获取读锁
pool-1-thread-15获取读锁
pool-1-thread-12释放读锁
pool-1-thread-15释放读锁
pool-1-thread-1获取写锁
pool-1-thread-1释放写锁
pool-1-thread-6获取读锁
pool-1-thread-2获取读锁
pool-1-thread-14获取读锁
pool-1-thread-9获取读锁
pool-1-thread-2释放读锁
pool-1-thread-6释放读锁
pool-1-thread-14释放读锁
pool-1-thread-9释放读锁
pool-1-thread-4获取写锁
pool-1-thread-4释放写锁
pool-1-thread-11获取写锁
pool-1-thread-11释放写锁
pool-1-thread-5获取读锁
pool-1-thread-5释放读锁
pool-1-thread-10获取写锁
pool-1-thread-10释放写锁
pool-1-thread-7获取写锁
pool-1-thread-7释放写锁
pool-1-thread-3获取读锁
pool-1-thread-3释放读锁

为了验证输出是否符合预期,可以通过ZooKeeper提供的客户端获取锁路径下的所有节点,如下图所示:
在这里插入图片描述
排序后如下所示,和输出是对应的(读锁可以被多个用户同时持有,而写锁是独占的)。

/MyNamespace/kaven/_c_cd949fbc-c779-46e6-a7d8-4c4c90f13f23-__WRIT__0000000000
/MyNamespace/kaven/_c_a4934270-e9c4-40c0-8b89-5fb369a7cfaa-__WRIT__0000000001
/MyNamespace/kaven/_c_5a03751b-7f45-4813-99f6-49671c473367-__READ__0000000002
/MyNamespace/kaven/_c_babe2eec-7259-484f-9f76-51a5fc45d607-__READ__0000000003
/MyNamespace/kaven/_c_405e6cc8-481b-4d9a-8cf1-a83ea69912dd-__WRIT__0000000004
/MyNamespace/kaven/_c_a870b175-c082-4661-9383-4a82a021e283-__READ__0000000005
/MyNamespace/kaven/_c_3fa721b5-1015-46e8-8da3-20b333b76ca4-__READ__0000000006
/MyNamespace/kaven/_c_03e0208c-fb25-4229-9534-4dc6c295d6a8-__READ__0000000007
/MyNamespace/kaven/_c_5428e1af-c9fc-4afd-92f7-76fbbb545dcc-__READ__0000000008
/MyNamespace/kaven/_c_395407f3-c402-40b4-abe2-5750f43ddd9d-__WRIT__0000000009
/MyNamespace/kaven/_c_c7a1a90c-8b76-47fc-8c9e-1dc15c43a8bd-__WRIT__0000000010
/MyNamespace/kaven/_c_800861ea-8da2-4ff5-b237-19d8b939393e-__READ__0000000011
/MyNamespace/kaven/_c_3d357ec6-5288-4719-ba3b-7f98eceb32d6-__WRIT__0000000012
/MyNamespace/kaven/_c_e8067da5-bf08-4876-8200-1746dbc2ce77-__WRIT__0000000013
/MyNamespace/kaven/_c_8d4e07a2-08b9-4c1d-91d4-3b0d76844b33-__READ__0000000014

锁降级

public class InterProcessReadWriteLockRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 分布式锁的路径
        String basePath = "/kaven";

        // 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath,
                "分布式读写锁".getBytes(StandardCharsets.UTF_8));

        // 获取写锁
        readWriteLock.writeLock().acquire();
        System.out.println(Thread.currentThread().getName() + "获取写锁");
        Thread.sleep(2000);
        // 锁降级
        readWriteLock.readLock().acquire();
        System.out.println(Thread.currentThread().getName() + "获取读锁,锁降级成功");
        Thread.sleep(2000);
        // 释放读锁
        System.out.println(Thread.currentThread().getName() + "释放读锁");
        readWriteLock.readLock().release();
        // 释放写锁
        System.out.println(Thread.currentThread().getName() + "释放写锁");
        readWriteLock.writeLock().release();
    }
}
public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式节点处理业务
        for (int i = 0; i < 5; i++) {
            EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable());
        }
    }
}

输出如下所示:

pool-1-thread-3获取写锁
pool-1-thread-3获取读锁,锁降级成功
pool-1-thread-3释放读锁
pool-1-thread-3释放写锁
pool-1-thread-1获取写锁
pool-1-thread-1获取读锁,锁降级成功
pool-1-thread-1释放读锁
pool-1-thread-1释放写锁
pool-1-thread-4获取写锁
pool-1-thread-4获取读锁,锁降级成功
pool-1-thread-4释放读锁
pool-1-thread-4释放写锁
pool-1-thread-5获取写锁
pool-1-thread-5获取读锁,锁降级成功
pool-1-thread-5释放读锁
pool-1-thread-5释放写锁
pool-1-thread-2获取写锁
pool-1-thread-2获取读锁,锁降级成功
pool-1-thread-2释放读锁
pool-1-thread-2释放写锁

InterProcessReadWriteLock类除了构造方法外,就只有readLockwriteLock这两个方法可以调用,而这两个方法的返回值是InterProcessMutex实例,因此分布式读写锁InterProcessReadWriteLock的实现是基于分布式锁InterProcessMutex

Curator框架的分布式锁InterProcessReadWriteLock就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐