前言

我们在Synchronized中了解到,如果我们调用await()方法的话,会将当前线程阻塞并且放置到等待队列中,唤醒的方法有notify(),notifyAll(),这些是java内置的特性支持。而在JUC包中也有对应的ReentrantLock,那它是如何实现Synchronized等待队列这一特性的呢,接下来,我们就来了解一下。

注: 本文与AQS挂钩,需先了解AbstractQueuedSynchronizer。


Condition以及其具体实现类源码分析

众所周知,ReentrantLock本身是继承AQS来实现锁的特性,关于AQS独占锁实现的源码分析,可以看JDK源码系列 AbstractQueuedSynchronizer源码剖析,那么想必实现等待队列的功能也应该是由AQS提供的。

我们可以看到ReentrantLock底层的等待队列其实是一个ConditionObject实现。

final ConditionObject newCondition() {
    return new ConditionObject();
}

这个ConditionObject哪里来的呢,看下面的分析。


一、Condition的继承关系类图

在这里插入图片描述
JUC的Condition继承关系类图很简单,Condition类是一个接口类,而具体的子类实现是AQS的内部类ConditionObject

二、Condition
public interface Condition {
    //会导致当前线程接在收到唤醒信号或者被中断之前都会一直处于等待的状态
    void await() throws InterruptedException;
	//会导致当前线程接在收到唤醒信号之前都会一直处于等待的状态
    void awaitUninterruptibly();
    //会导致当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    long awaitNanos(long var1) throws InterruptedException;
    boolean await(long var1, TimeUnit var3) throws InterruptedException;
    boolean awaitUntil(Date var1) throws InterruptedException;
	//唤醒一个等待线程
    void signal();
	//唤醒所有等待线程
    void signalAll();
}

以上便是Condition接口的介绍,我们可以看到,Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。


三、ConditionObject实现

我们来看一下AQS内部类ConditionObject是如何实现Condition的。

3.1 参数
//condition队列的第一个结点
private transient Node firstWaiter;
//condition队列的最后一个结点
private transient Node lastWaiter;

我们来简单了解Node结点实现:Condtion队列的Node实现来源于AQS的内部类Node。

static final class Node {
	//共享锁模式
    static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
    //独占锁模式
    static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    //当前节点的等待状态
    volatile int waitStatus;
    //前置节点
    volatile AbstractQueuedSynchronizer.Node prev;
    //后继节点
    volatile AbstractQueuedSynchronizer.Node next;
    //当前节点所关联的线程
    volatile Thread thread;
    //Condition队列中指向下一个node的指针,在CLH队列中不使用。
    AbstractQueuedSynchronizer.Node nextWaiter;
}

我们来简单介绍一下Node的waitStatus:

waitStatus意义
CANCELLED值为1,在CLH队列中等待的线程等待超时或则被中断,需要从同步队列中取消该Node的节点,其节点为CANCELLED,即结束状态,进入该状态后的节点将不会再变化
SIGNAL值为-1,被标识为等待唤醒状态的后继节点,当其前继节点的线程释放了同步锁或被取消,将会通知该后继节点的线程执行。其实就是处于唤醒状态,只要前继节点释放锁,就会通知标识为SINGAL状态的后继节点的线程执行。
CONDITION值为-2,与Condition相关。该标识的节点处于等待队列,节点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的节点将从等待队列转移到同步队列中,等待获取锁。
PROPAGATE值为-3,与共享模式相关。在共享模式中,该标识节点的线程处于可运行状态。
0每个新加入到CLH队列的结点的初始状态

我们来区别一下nextnextWaiter,为什么Node上会有两个标记后继节点的成员变量呢。
他们主要区别是: next是用于CLH队列的,而nextWaiter是用于Condition队列
其实Node节点可以表示两个链表,CLH同步队列其实是一个双向链表的结构,Condition队列是一个单向链表的结构。CLH队列是AQS中等待获取锁的同步队列,这里就不赘述了。

3.2 ConditionObject的核心API

signal:唤醒Condition队里的首节点,将其转移到CLH队列中。
之所以把signal放在await之前讲是因为await的代码可能有些地方比较难懂,有些过程可能会涉及到signal,所以我们把signal先进行讲解。

public final void signal() {
	//如果当前线程不是独占线程(说明当前线程还没有持有锁) 尝试调用signal直接抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    AbstractQueuedSynchronizer.Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

doSignal(AbstractQueuedSynchronizer.Node first):对Condition队里的首节点进行transferForSignal操作

private void doSignal(AbstractQueuedSynchronizer.Node first) {
    do {
    	//将Condition队列的第一个结点移除 并将firstWaiter指向移除节点的下一个节点
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    //若转移成功或者Condition队列为空,退出循环。
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

transferForSignal(AbstractQueuedSynchronizer.Node node):将Condition队里的首节点从Condition队列转移到CLH队列中。

final boolean transferForSignal(AbstractQueuedSynchronizer.Node node) {
    //如果将移除的结点的状态从CONDITION转化为0失败,说明当前结点已经被弃用了,状态为CANCELLED
    //状态为CANCELLER的节点不需要从Condition队列转移到CLH队列中,直接丢弃。
    //这里可以直接判断为cancelled是因为调用signal()方法的线程必须是AQS中的独占线程,独占线程只能有一个,所以不可能有多个线程唤醒当前节点,只能说明节点被弃用。
    if (!compareAndSetWaitStatus(node, AbstractQueuedSynchronizer.Node.CONDITION, 0))
        return false;
   	//若状态转换成功 调用enq函数将当前节点加入CLH队列的尾部 并返回当前节点在CLH队列中的前驱节点
    AbstractQueuedSynchronizer.Node p = enq(node);
    //获取前驱节点的状态
    int ws = p.waitStatus;
    //如果前驱节点的状态的大于0,即为cancelled状态
    //获取前驱节点的状态不为CANCELLED,尝试通过CAS将前驱节点的状态装成signal状态,但是失败了,说明有其他线程改变了前驱节点的状态。(例如前驱节点被中断设置为CANCALLED状态)
    //以上条件若满足一个,那么就唤醒当前节点的线程。
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, AbstractQueuedSynchronizer.Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

transferForSignal的执行流程为:

  1. 通过CAS改变当前结点的waitStatus,如果失败,说明当前结点被弃用,不需要再进行转移了。
  2. 若成功改变状态,调用enq通过cas自旋将结点加入CLH队列的尾部,并且获取当前结点的前驱结点。
  3. 判断前驱结点的状态,若为calcelled或者尝试将前驱结点的waitStatus转换成SIGNAL状态失败的话,调用LockSupport.unpark唤醒当前结点的线程。

以上便是signal整个流程的源码分析。signalAll为唤醒全部的线程,这里就不赘述。

await

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    //调用addConditionWaiter()往Condition队列中添加一个新结点
    AbstractQueuedSynchronizer.Node node = addConditionWaiter();
    //完全释放当前线程所占有的锁(无论可否重入)并唤醒node的后继结点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    /**
     * 只要没有转移到CLH队列中就进行阻塞
     * 转移情况有:
     * 1、其他线程调用signalAll唤醒当前线程
     * 2、其他线程调用signal可能唤醒当前线程
     * 2、其他线程中断了当前线程
     */
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        /**
         * 获取中断的模式:
         * 在线程从park中被唤醒的时候,需要判断是否被中断,若中断则尝试转移到CLH队列中并退出循环
         */
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
     /**
      * 跳出while循环,说明node节点被中断了或者是被唤醒,并且转移到了CLH队列中
      * 若当前结点在CLH队列中,调用acquireQueued尝试获取资源
      * 如果获取资源成功但是线程被中断过且interruptMode!=THROW_IE,将interruptMode设置为REINTERRUPT。
      */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    //清理状态为CANCELLED的结点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    //如果线程被中断过 调用reportInterruptAfterWait来处理中断
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

// THROW_IN表示直接抛出异常
// REINTERRUPT表示重新设置中断标志,向后传递中断
private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

await的执行流程:

  1. 调用addConditionWaiter往Condition的尾部添加一个状态为CONDITION的新结点。
  2. 尝试当前线程所占有的锁,若释放成功,则调用unparkSuccessor唤醒该节点在CLH队列中第一个状态正常的后继结点。
  3. 在while循环中调用isOnSyncQueue方法检测node是否再次被转移到CLH队列中。如果没有,则park当前线程,等待被唤醒或者中断。若被唤醒或者中断,从等待状态中醒来,调用checkInterruptWhileWaiting检测node在等待过程中是否被中断过,并设置interruptMode的值来标志中断状态。如果当前线程已经处于CLH队列中了,则跳出while循环。
  4. 调用acquireQueued来尝试获取锁,这个过程,线程会被阻塞直至获取资源。
  5. 检查interruptMode的状态,在最后调用reportInterruptAfterWait统一抛出异常或发生中断。

addWaiter: 添加一个等待结点

private AbstractQueuedSynchronizer.Node addConditionWaiter() {
    AbstractQueuedSynchronizer.Node t = lastWaiter;
    //若尾节点不为空且状态不是CONDITION状态 进行unlinkCancelledWaiters()操作 遍历Condition队列清除状态为CANCELLED的结点
    if (t != null && t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //创建一个新的节点: 存储当前的线程,结点的状态为CONDITION状态
    AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), AbstractQueuedSynchronizer.Node.CONDITION);
    //若Condition队列尾结点为空,说明当前Condition队列为空,将当前新建结点设置为Condition队列的首节点
    if (t == null)
        firstWaiter = node;
    else
    	//将新建节点追加到Condition队列的尾部
        t.nextWaiter = node;
    //重置尾节点为当前节点
    lastWaiter = node;
    return node;
}

addConditionWaiter的执行流程:

  1. 先判断Condition队列的尾节点是否存在,若存在且状态不为CONDITION,那么执行unlinkCancelledWaiters遍历链表并移除状态为CANCELLED的结点。
  2. 创建一个新的结点,设置状态为CONDITION,并绑定当前的线程,加入到Condition队列的尾部。

unlinkCancelledWaiters()

private void unlinkCancelledWaiters() {
	//获取Condition队列的第一个结点
    AbstractQueuedSynchronizer.Node t = firstWaiter;
    AbstractQueuedSynchronizer.Node trail = null;
    //遍历Condition队列 删除状态为CANCELLED的结点
    while (t != null) {
    	//获取Condition队列的后继结点
        AbstractQueuedSynchronizer.Node next = t.nextWaiter;
        //如果后继结点的状态不为CONDITION,说明当前节点被中断或者等待超时了,状态为CANCELLED
        if (t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

unlinkCancellerWaiters就是遍历一遍Condition队列,删除节点状态为cancelled的节点。

fullyRelease(node):释放结点所占有的锁,无论是可重入还是不可重入

final int fullyRelease(AbstractQueuedSynchronizer.Node node) {
	//判断释放锁释放成功
    boolean failed = true;
    try {
    	//获取当前结点的状态
        int savedState = getState();
        //调用release释放锁 释放成功直接返回savedState 否则抛出异常
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
    	//如果释放锁失败 将当前结点的状态设置为CANCELLED
        if (failed)
            node.waitStatus = AbstractQueuedSynchronizer.Node.CANCELLED;
    }
}
public final boolean release(int arg) {
	//调用AQS的tryRelease方法(由子类实现)进行资源的释放
    if (tryRelease(arg)) {
    	//释放成功 获取CLH队列的头结点
        AbstractQueuedSynchronizer.Node h = head;
        //如果CLH队列的头结点不为空 且h.waitStatus!=0
        if (h != null && h.waitStatus != 0)
        	//调用AQS的unparkSuccessor唤醒头结点的后继结点,准备获取资源
            unparkSuccessor(h);
        return true;
    }
    return false;
}

fullyRelease: 调用tryRelease释放锁,释放成功返回savedStatus,否则抛出异常并设置当前结点的状态为CANCELLED。

isOnSyncQueue(node):判断当前结点是否在CLH同步队列中

final boolean isOnSyncQueue(AbstractQueuedSynchronizer.Node node) {
   	/**
     * 结点状态为CONDITION的一定在Condition队列中,CLH队列中不存在CONDITION状态的结点
     * 或者node.prev==null一定在Condition队列中
     * CLH队列中prev为null的结点只有head结点(获取资源并且正在运行)
     * 新入队的结点的prev不可能为null
     */
    if (node.waitStatus == AbstractQueuedSynchronizer.Node.CONDITION || node.prev == null)
        return false;
    //如果当前结点还有后继结点的话,那说明该节点肯定还在CLH队列中,Condition队列中的节点的后继结点一定为null
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //从尾节点开始,使用prev指针,遍历整个CLH队列,判断node结点是否存在CLH队列中
    return findNodeFromTail(node);
}

findNodeFromTail:从尾节点开始,使用prev指针,遍历整个CLH队列。

private boolean findNodeFromTail(AbstractQueuedSynchronizer.Node node) {
    AbstractQueuedSynchronizer.Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

因为新加入的结点总是在tail结点附近的,所以从后往前遍历更快。

checkInterruptWhileWaiting:检测在等待的过程中是否被中断过,若中断,是在被其他线程唤醒后中断的还是在被唤醒前就已经中断的。

/*
 * 检测中断,若被中断过,如果是在被唤醒前中断的,那么返回THROW_IE,如果是在唤醒后被中断的,那么返回REINTERRUPT。 
 * 否则,返回0.
 */
private int checkInterruptWhileWaiting(AbstractQueuedSynchronizer.Node node) {
	//如果为true,说明被中断过,则调用transferAfterCancelledWait
    return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
}
final boolean transferAfterCancelledWait(AbstractQueuedSynchronizer.Node node) {
    //调用cas设置node状态为0 成功,说明当前node是在唤醒前被中断
    if (compareAndSetWaitStatus(node, AbstractQueuedSynchronizer.Node.CONDITION, 0)) {
        //调用enq将node加入CLH队列中
        enq(node);
        //返回true
        return true;
    }
    //上面失败的原因是其他线程先一步调用signal或者signalAll将同步状态更新。
    //当前线程被唤醒从condition队列转移到CLH队列中,说明当前线程是被唤醒后被中断的
    //循环 直到node真正加入在CLH队列中,退出循环
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

以上,便是对于Condition具体实现类的分析,由于其他方法都比较相似,这里就不赘述。

3.1 源码漏洞

signal和signalAll在执行前都会判断当前线程是否是持有锁的独占线程,如果不是,会直接抛出异常。
而await在执行时,并没有进行!isHeldExclusively()的判断,导致不持有的线程和持有锁的线程会都可以调用await操作。虽然在fullyRelease里面有对不持有锁的线程进行判断并且抛出异常。但是,addConditionWaiter已经执行,这里就会出现一些问题。

private AbstractQueuedSynchronizer.Node addConditionWaiter() {
    AbstractQueuedSynchronizer.Node t = lastWaiter;
    if (t != null && t.waitStatus != AbstractQueuedSynchronizer.Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), AbstractQueuedSynchronizer.Node.CONDITION);
    //由于同步竞争这里会导致问题
    if (t == null)
        firstWaiter = node;
    else
    
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

由于addConditionWaiter没有进行同步,假设持有锁的线程t1和没有持有锁的线程t2同时进入该if(t==null)判断,由于系统调度t1先执行,这时候firstWaiter的结点为node1。接着t2进行执行,把firstWaiter由node1设置为了node2。
这样导致node1丢失,后面调用signal方法或者singalAll方法时会找不到node1,也就是t1线程永远无法被唤醒。该bug被录入JDK-8187408 : AbstractQueuedSynchronizer wait queue corrupted when thread awaits without holding the lock.

参考文章:

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐