一、简介

    LinkedBlockingQueue是BlockingQueue的一种使用Link List的实现,它对头和尾(取和添加操作)采用两把不同的锁,相对于ArrayBlockingQueue提高了吞吐量。它也是一种阻塞型的容器,适合于实现“消费者生产者”模式。

二、具体实现

    LinkedBlockingQueue底层的定义如下:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>   
        implements BlockingQueue<E>, java.io.Serializable {   
  
    static class Node<E> {   
        /** The item, volatile to ensure barrier separating write and read */  
        volatile E item;   
        Node<E> next;   
        Node(E x) { item = x; }   
    }   
  
    // 支持原子操作   
     private final AtomicInteger count = new AtomicInteger(0);   
  
    // 链表的头和尾   
     private transient Node<E> head;   
    private transient Node<E> last;   
  
    // 针对取和添加操作的两把锁及其上的条件   
    private final ReentrantLock takeLock = new ReentrantLock();   
    private final Condition notEmpty = takeLock.newCondition();   
    private final ReentrantLock putLock = new ReentrantLock();   
    private final Condition notFull = putLock.newCondition();   
  
   ...   
}  

LinkedBlockingQueue的添加操作:


 

public class LinkedBlockingQueue<E> extends AbstractQueue<E>   
        implements BlockingQueue<E>, java.io.Serializable {   
  
    private void insert(E x) {   
        last = last.next = new Node<E>(x);   
    }   
  
    /**  
     * signal方法在被调用时,当前线程必须拥有该condition相关的锁!  
     * Signal a waiting take. Called only from put/offer (which do not  
     * otherwise ordinarily lock takeLock.)  
     */  
    private void signalNotEmpty() {   
        final ReentrantLock takeLock = this.takeLock;   
        takeLock.lock();   
        try {   
            notEmpty.signal();   
        } finally {   
            takeLock.unlock();   
        }   
    }   
  
    public void put(E o) throws InterruptedException {   
        if (o == null) throw new NullPointerException();   
        int c = -1;   
        final ReentrantLock putLock = this.putLock;   
        final AtomicInteger count = this.count;   
        // 使用putLock   
        putLock.lockInterruptibly();   
        try {   
            try {   
                  // 当容量已满时,等待notFull条件   
            while (count.get() == capacity)   
                    notFull.await();   
            } catch (InterruptedException ie) {   
                notFull.signal(); // propagate to a non-interrupted thread   
                throw ie;   
            }   
            insert(o);   
            // 取出当前值,并将原数据增加1   
            c = count.getAndIncrement();   
            // 容量不满,再次激活notFull上等待的put线程   
        if (c + 1 < capacity)   
                notFull.signal();   
        } finally {   
            putLock.unlock();   
        }   
        // 必须先释放putLock再在notEmpty上signal,否则会造成死锁   
     if (c == 0)   
            signalNotEmpty();   
    }   
  
  ...   
}  


LinkedBlockingQueue的取操作:

public class LinkedBlockingQueue<E> extends AbstractQueue<E>   
        implements BlockingQueue<E>, java.io.Serializable {   
  
    private E extract() {   
        Node<E> first = head.next;   
        head = first;   
        E x = first.item;   
        first.item = null;   
        return x;   
    }   
  
    private void signalNotFull() {   
        final ReentrantLock putLock = this.putLock;   
        putLock.lock();   
        try {   
            notFull.signal();   
        } finally {   
            putLock.unlock();   
        }   
    }   
  
    public E take() throws InterruptedException {   
        E x;   
        int c = -1;   
        final AtomicInteger count = this.count;   
        final ReentrantLock takeLock = this.takeLock;   
        // 使用takeLock   
        takeLock.lockInterruptibly();   
        try {   
            try {   
                  // 若容量为空,等待notEmpty   
                while (count.get() == 0)   
                    notEmpty.await();   
            } catch (InterruptedException ie) {   
                notEmpty.signal(); // propagate to a non-interrupted thread   
                throw ie;   
            }   
  
            x = extract();   
            c = count.getAndDecrement();   
            // 再次激活notEmpty   
            if (c > 1)   
                notEmpty.signal();   
        } finally {   
            takeLock.unlock();   
        }   
        // take执行之前容量已满,则激活notFull   
        if (c == capacity)   
            signalNotFull();   
        return x;   
    }   
  
  ...   
}  


 

    ConcurrentLinkedQueue是一个无锁的queue实现,它采用了一种无锁算法(在API中有说明),相比于传统的同步的queue来说吞吐量可以大大提高,同时它也不同于BlockingQueue,并不单单提供阻塞操作。它主要的目的是通过采用无锁的算法,使得read/write操作均不需要对容器加锁,提高容器吞吐量

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐