ZooKeeper : Curator框架之分布式锁InterProcessReadWriteLock
InterProcessReadWriteLock跨JVM工作的可重入读/写互斥锁,使用Zookeeper来持有锁,所有JVM中使用相同锁路径的所有进程都将实现进程间临界区。这个互斥锁是公平的,每个用户都会按照请求的顺序获得互斥锁(从ZK的角度来看)。读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。读写锁允许从写锁降级为读锁,方法
InterProcessReadWriteLock
跨JVM
工作的可重入读/写互斥锁,使用Zookeeper
来持有锁,所有JVM
中使用相同锁路径的所有进程都将实现进程间临界区。这个互斥锁是公平的,每个用户都会按照请求的顺序获得互斥锁(从ZK
的角度来看)。
读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。
读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。
测试
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
类(提供CuratorFramework
需要的一些配置信息,以及创建CuratorFramework
实例的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
InterProcessReadWriteLockRunnable
类(实现了Runnable
接口,模拟分布式节点获取分布式锁):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import java.nio.charset.StandardCharsets;
import java.util.Random;
public class InterProcessReadWriteLockRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 分布式锁的路径
String basePath = "/kaven";
// 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath,
"分布式读写锁".getBytes(StandardCharsets.UTF_8));
// 根据随机数来决定获取写锁还是读锁
Random random = new Random();
if(random.nextInt(10000) > 5000) {
// 获取写锁
readWriteLock.writeLock().acquire();
System.out.println(Thread.currentThread().getName() + "获取写锁");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "释放写锁");
// 释放写锁
readWriteLock.writeLock().release();
}
else {
// 获取读锁
readWriteLock.readLock().acquire();
System.out.println(Thread.currentThread().getName() + "获取读锁");
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "释放读锁");
// 释放读锁
readWriteLock.readLock().release();
}
}
}
启动类:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式节点处理业务
for (int i = 0; i < 15; i++) {
EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable());
}
}
}
模拟15
个分布式节点获取分布式锁,输出如下所示:
pool-1-thread-8获取写锁
pool-1-thread-8释放写锁
pool-1-thread-13获取写锁
pool-1-thread-13释放写锁
pool-1-thread-12获取读锁
pool-1-thread-15获取读锁
pool-1-thread-12释放读锁
pool-1-thread-15释放读锁
pool-1-thread-1获取写锁
pool-1-thread-1释放写锁
pool-1-thread-6获取读锁
pool-1-thread-2获取读锁
pool-1-thread-14获取读锁
pool-1-thread-9获取读锁
pool-1-thread-2释放读锁
pool-1-thread-6释放读锁
pool-1-thread-14释放读锁
pool-1-thread-9释放读锁
pool-1-thread-4获取写锁
pool-1-thread-4释放写锁
pool-1-thread-11获取写锁
pool-1-thread-11释放写锁
pool-1-thread-5获取读锁
pool-1-thread-5释放读锁
pool-1-thread-10获取写锁
pool-1-thread-10释放写锁
pool-1-thread-7获取写锁
pool-1-thread-7释放写锁
pool-1-thread-3获取读锁
pool-1-thread-3释放读锁
为了验证输出是否符合预期,可以通过ZooKeeper
提供的客户端获取锁路径下的所有节点,如下图所示:
排序后如下所示,和输出是对应的(读锁可以被多个用户同时持有,而写锁是独占的)。
/MyNamespace/kaven/_c_cd949fbc-c779-46e6-a7d8-4c4c90f13f23-__WRIT__0000000000
/MyNamespace/kaven/_c_a4934270-e9c4-40c0-8b89-5fb369a7cfaa-__WRIT__0000000001
/MyNamespace/kaven/_c_5a03751b-7f45-4813-99f6-49671c473367-__READ__0000000002
/MyNamespace/kaven/_c_babe2eec-7259-484f-9f76-51a5fc45d607-__READ__0000000003
/MyNamespace/kaven/_c_405e6cc8-481b-4d9a-8cf1-a83ea69912dd-__WRIT__0000000004
/MyNamespace/kaven/_c_a870b175-c082-4661-9383-4a82a021e283-__READ__0000000005
/MyNamespace/kaven/_c_3fa721b5-1015-46e8-8da3-20b333b76ca4-__READ__0000000006
/MyNamespace/kaven/_c_03e0208c-fb25-4229-9534-4dc6c295d6a8-__READ__0000000007
/MyNamespace/kaven/_c_5428e1af-c9fc-4afd-92f7-76fbbb545dcc-__READ__0000000008
/MyNamespace/kaven/_c_395407f3-c402-40b4-abe2-5750f43ddd9d-__WRIT__0000000009
/MyNamespace/kaven/_c_c7a1a90c-8b76-47fc-8c9e-1dc15c43a8bd-__WRIT__0000000010
/MyNamespace/kaven/_c_800861ea-8da2-4ff5-b237-19d8b939393e-__READ__0000000011
/MyNamespace/kaven/_c_3d357ec6-5288-4719-ba3b-7f98eceb32d6-__WRIT__0000000012
/MyNamespace/kaven/_c_e8067da5-bf08-4876-8200-1746dbc2ce77-__WRIT__0000000013
/MyNamespace/kaven/_c_8d4e07a2-08b9-4c1d-91d4-3b0d76844b33-__READ__0000000014
锁降级
public class InterProcessReadWriteLockRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 分布式锁的路径
String basePath = "/kaven";
// 创建InterProcessReadWriteLock实例,用于提供分布式锁的功能
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(curator, basePath,
"分布式读写锁".getBytes(StandardCharsets.UTF_8));
// 获取写锁
readWriteLock.writeLock().acquire();
System.out.println(Thread.currentThread().getName() + "获取写锁");
Thread.sleep(2000);
// 锁降级
readWriteLock.readLock().acquire();
System.out.println(Thread.currentThread().getName() + "获取读锁,锁降级成功");
Thread.sleep(2000);
// 释放读锁
System.out.println(Thread.currentThread().getName() + "释放读锁");
readWriteLock.readLock().release();
// 释放写锁
System.out.println(Thread.currentThread().getName() + "释放写锁");
readWriteLock.writeLock().release();
}
}
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式节点处理业务
for (int i = 0; i < 5; i++) {
EXECUTOR_SERVICE.execute(new InterProcessReadWriteLockRunnable());
}
}
}
输出如下所示:
pool-1-thread-3获取写锁
pool-1-thread-3获取读锁,锁降级成功
pool-1-thread-3释放读锁
pool-1-thread-3释放写锁
pool-1-thread-1获取写锁
pool-1-thread-1获取读锁,锁降级成功
pool-1-thread-1释放读锁
pool-1-thread-1释放写锁
pool-1-thread-4获取写锁
pool-1-thread-4获取读锁,锁降级成功
pool-1-thread-4释放读锁
pool-1-thread-4释放写锁
pool-1-thread-5获取写锁
pool-1-thread-5获取读锁,锁降级成功
pool-1-thread-5释放读锁
pool-1-thread-5释放写锁
pool-1-thread-2获取写锁
pool-1-thread-2获取读锁,锁降级成功
pool-1-thread-2释放读锁
pool-1-thread-2释放写锁
InterProcessReadWriteLock
类除了构造方法外,就只有readLock
和writeLock
这两个方法可以调用,而这两个方法的返回值是InterProcessMutex
实例,因此分布式读写锁InterProcessReadWriteLock
的实现是基于分布式锁InterProcessMutex
。
Curator
框架的分布式锁InterProcessReadWriteLock
就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。
更多推荐
所有评论(0)