RedissonLock#subscribe

订阅锁释放事件,并阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题:
基于信号量,当锁被其它资源占用时,当前线程通过 Redis 的 channel 订阅锁的释放事件,一旦锁释放会发消息通知待等待的线程进行竞争.

1、当 this.await 返回 false,说明等待时间已经超出获取锁最大等待时间,取消订阅并返回获取锁失败.
2、当 this.await 返回 true,进入循环尝试获取锁.

protected final LockPubSub pubSub;

...

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return pubSub.subscribe(getEntryName(), getChannelName());
}

entryName 格式:“id:name”;
channelName 格式:“redisson_lock__channel:{name}”;

RedissonLock#pubSub 是在RedissonLock构造函数中通过如下方式初始化的:

this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();

subscribeServiceMasterSlaveConnectionManager的实现中又是通过如下方式构造的,其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:

subscribeService = new PublishSubscribeService(this, config);

//PublishSubscribeService.java
private final LockPubSub lockPubSub = new LockPubSub(this);
private final AsyncSemaphore[] locks = new AsyncSemaphore[50];
public PublishSubscribeService(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
   super();
    this.connectionManager = connectionManager;
    this.config = config;
    for (int i = 0; i < locks.length; i++) {
        locks[i] = new AsyncSemaphore(1);
    }
}

我们会发现其在初始化时,会初始化一组信号量,至于用途是什么,我们会在后面揭晓。现在我们知道RedissonLock#pubSub是怎么初始化的了,让我们回到订阅流程。

/*
* @param entryName     格式:“id:name”
* @param channelName   格式:“redisson_lock__channel:{name}”
* @return
*/
//PublishSubscribe.java
public RFuture<E> subscribe(String entryName, String channelName) {
    //代码@1 对于同一个锁,semaphore为单例
    AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));

    //每一个尝试获取锁失败的线程都会创建一个listenerHolder和一个newPromise,所以这里的listenerHolder、newPromise是与当前获取锁的线程绑定的
    AtomicReference<Runnable> listenerHolder = new AtomicReference<>();
    RPromise<E> newPromise = new RedissonPromise<E>() {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    //每一个尝试获取锁失败的线程都会创建一个listener
    Runnable listener = () -> {
        // entry's type is RedissonLockEntry
        E entry = entries.get(entryName);
        if (entry != null) {
            entry.acquire();
            semaphore.release();
            entry.getPromise().onComplete(new TransferListener<E>(newPromise));
            return;
        }

        // new RedissonLockEntry
        E value = createEntry(newPromise);
        value.acquire();

        E oldValue = entries.putIfAbsent(entryName, value);
        if (oldValue != null) {
            oldValue.acquire();
            semaphore.release();
            oldValue.getPromise().onComplete(new TransferListener<E>(newPromise));
            return;
        }

        RedisPubSubListener<Object> redisPubSubListener = createListener(channelName, value);
        service.subscribe(LongCodec.INSTANCE, channelName, semaphore, redisPubSubListener);
    };

    //首个尝试获取锁失败的线程acquire操作不会阻塞,会直接触发执行listener.run方法
    semaphore.acquire(listener);
    listenerHolder.set(listener);
    
    return newPromise;
}

代码@1

public AsyncSemaphore getSemaphore(ChannelName channelName) {
    return locks[Math.abs(channelName.hashCode() % locks.length)];
}

通过入参channelName的格式redisson_lock__channel:{name}我们知道,对于同一个锁,这里获取的信号量是同一个。

代码@2

public void acquire(Runnable listener) {
    acquire(listener, 1);
}

public void acquire(Runnable listener, int permits) {

    synchronized (this) {
        if (counter < permits) {
            listeners.add(new Entry(listener, permits));
            return;
        } else {
            counter -= permits;
        }
    }

    listener.run();
}

AsyncSemaphore#counter代表当前信号量允许的请求数,初始值为1,所以对于首个尝试获取该锁失败的线程会直接触发执行listener.run方法。而对于后续尝试获取该锁失败的线程则会创建Entry对象(保存listenerpermits的映射关系)并保存到AsyncSemaphore#listeners

尝试获取锁失败的线程会走到此流程订阅redis通知。

  1. 假设有A、B、C、etc多个线程顺序调用{@link #subscribe}方法,因为semaphore初始信号量为1(参见{@link PublishSubscribeService#PublishSubscribeService}方法),所以线程A可以获得信号量(参见{@link AsyncSemaphore#acquire}方法),并执行{@code listener.run}方法,而对于之后的线程B、C、etc,其listener会被封装成{@link AsyncSemaphore.Entry}保存在semaphore的{@link AsyncSemaphore#listeners}中。

  2. 首个尝试获取锁失败的线程{@code semaphore.acquire(listener);}操作不会阻塞,会直接触发执行{@code listener.run}方法。这里线程A作为首个尝试获取锁失败的线程,会执行{@code listener.run}方法,其发现{@link PublishSubscribe#entries}中并没有当前锁对应的记录,会创建一个{@link RedissonLockEntry}(参见{@link #createEntry}方法)并添加到{@link PublishSubscribe#entries}(姑且记为 A_e),key为"id:name"。同时会注册监听器redisPubSubListener。

(1) semaphore的{@link AsyncSemaphore#release()}方法会被调用,从{@link AsyncSemaphore#listeners}中取出一个{@link AsyncSemaphore.Entry}对象(线程B),并进而调用{@link AsyncSemaphore#acquire}方法此时能够成功获取信号量,并执行{@code listener.run}方法。在上述执行{@code listener.run}方法时,{@code E entry = entries.get(entryName);}获取到的{@link RedissonLockEntry}对象是前面线程A写入的A_e。接着调用{@code RedissonPromise#onComplete}方法为线程A的newPromise添加监听,监听器保存在{@link RedissonPromise#promise}对象的{@link DefaultPromise#listeners}中。监听器的作用用于在线程A的newPromise({@link RedissonLockEntry#promise})完成时将其的结果同步到当前线程(线程B、C、etc)的newPromise
继续调用semaphore的{@link AsyncSemaphore#release()}方法,以此类推。。。重复上述步骤(1)

代码如下:

@Override
public void onComplete(BiConsumer<? super T, ? super Throwable> action) {
    promise.addListener(f -> {
        if (!f.isSuccess()) {
            action.accept(null, f.cause());
            return;
        }
        
        action.accept((T) f.getNow(), null);
    });
}

(2) 监听器的{@link BaseRedisPubSubListener#onStatus}方法被调用,标记A_e的{@link RedissonLockEntry#promise}也即线程A的newPromise完成(执行代码:value.getPromise().trySuccess(value)),其会唤醒注册在newPromise的{@link RedissonPromise#promise}对象的{@link DefaultPromise#listeners}中的所有监听器,从而在线程A的newPromise({@link RedissonLockEntry#promise})完成时将其的结果同步到当前线程(线程B、C、etc)的newPromise

上述(1)(2)所达到的效果就是同步等待(参见 RedissonLock {@code subscribeFuture.await(time, TimeUnit.MILLISECONDS)})注册redis通知完成。

  1. redisPubSubListener监听器在收到redis的通知时,对于{@link @channelName}的消息会调用{@link PublishSubscribe.onMessage}方法释放一个信号量,唤醒等待的entry.getLatch().tryAcquire去再次尝试申请锁。

之后会通过如下方式阻塞等待订阅redis通知完成。

if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    if (!subscribeFuture.cancel(false)) {
        subscribeFuture.onComplete((res, e) -> {
            if (e == null) {
                unsubscribe(subscribeFuture, threadId);
            }
        });
    }
    acquireFailed(threadId);
    return false;
}

如果等待超时,会通过RedissonPromise#cancel方法取消当前线程的订阅。而cancel方法的实现如下:

RPromise<E> newPromise = new RedissonPromise<E>() {
   @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return semaphore.remove(listenerHolder.get());
    }
};

//AsyncSemaphore.java
public boolean remove(Runnable listener) {
   synchronized (this) {
        return listeners.remove(new Entry(listener, 0));
    }
}

前面我们提到过,线程B、C、etc因为获取不到信号量,其listener会被封装成{@link AsyncSemaphore.Entry}保存在semaphore的{@link AsyncSemaphore#listeners}中。这里就是各线程将各自的listenersemaphore的{@link AsyncSemaphore#listeners}中移除。对于第一个尝试获取锁失败的线程A,其并不会被保存在listeners中,所以这里移除会失败,即RedissonPromise#cancel方法会返回false。进而走到下面的subscribeFuture.onComplete逻辑。

Logo

鸿蒙生态一站式服务平台。

更多推荐