1 并发容器概览

java.util.concurrent包提供的容器分为3类

  1. Concurrent*
  2. CopyOnWrite*
  3. Blocking*

  • Concurrent的特点是大部分通过CAS实现并发,而CopyOnWrite*则是通过复制一份原数据来实现的,Blocking*通过AQS实现的
  • ConcurrentHashMap:线程安全的HashMap
  • CopyOnWriteArrayList:线程安全的List
  • BlockingQueue:这是一个接口,表示阻塞队列,非常适合用于作为数据共享的通道
  • ConcurrentLinkedQueue:高效的非阻塞并发队列,使用链表实现。可以看做一个线程安全的 LinkedList
  • ConcurrentskipListMap:是一个Map,使用跳表的数据结构进行快速查找
  • ArrayList -> CopyOnWriteArrayList
  • HashSet -> CopyOnWriteArraySet
  • TreeSet -> ConcurrentSkipListSet
  • HashMap -> ConcurrentHashMap
  • TreeMap -> ConcurrentSkipListMap


2 趣说集合类的历史—古老和过时的同步容器

Vector和Hashtable

  • 早期JDK
  • 并发性能差
public class VectorDemo {
    public static void main(String[] args) {
        Vector<String> vector = new Vector<>();
        vector.add("test");
        System.out.println(vector.get(0));
    }
}
//java.util.Vector#get
public synchronized E get(int index) {
	//...
}

 Vector使用不当也是线程不安全的

//Vector使用不当也是线程不安全的
public class VectorExample {
    private static Vector<Integer> vector = new Vector<>();
    public static void main(String[] args) {
        while (true) {
            for (int i = 0; i < 10; i++) {
                vector.add(i);
            }
            Thread thread1 = new Thread(() -> {
                for (int i = 0; i < vector.size(); i++) {
                    vector.remove(i);
                }
            });
            Thread thread2 = new Thread(() -> {
                for (int i = 0; i < vector.size(); i++) {
                    vector.get(i);
                }
            });
            thread1.start();
            thread2.start();
        }
    }
}

 迭代过程中不允许修改

public class VectorExample {
    // java.util.ConcurrentModificationException
    private static void test1(Vector<Integer> v1) { // foreach
        for(Integer i : v1) {
            if (i.equals(3)) {
                v1.remove(i);
            }
        }
    }
    // java.util.ConcurrentModificationException
    private static void test2(Vector<Integer> v1) { // iterator
        Iterator<Integer> iterator = v1.iterator();
        while (iterator.hasNext()) {
            Integer i = iterator.next();
            if (i.equals(3)) {
                v1.remove(i);
            }
        }
    }
    // success
    private static void test3(Vector<Integer> v1) { // for
        for (int i = 0; i < v1.size(); i++) {
            if (v1.get(i).equals(3)) {
                v1.remove(i);
            }
        }
    }
    public static void main(String[] args) {
        Vector<Integer> vector = new Vector<>();
        vector.add(1);
        vector.add(2);
        vector.add(3);
        test1(vector);
        //test2(vector);
        //test3(vector);
    }
}

 

public class HashtableDemo {
    public static void main(String[] args) {
        Hashtable<String, String> hashtable = new Hashtable<>();
        hashtable.put("学完以后跳槽涨薪幅度", "80%");
        System.out.println(hashtable.get("学完以后跳槽涨薪幅度"));
    }
}
//java.util.Hashtable#put
public synchronized V put(K key, V value) {
	//...
}

ArrayList和HashMap

  • 虽然这两个类不是线程安全的,但是可以用Collections.synchronizedList(new ArrayList<E>())和Collections.synchronizedMap(new HashMap<K, V>())使之变成线程安全的

/**
 * 演示Collections.synchronizedList(new ArrayList<E>())
 */
public class SynList {
    public static void main(String[] args) {
        List<Integer> list = Collections.synchronizedList(new ArrayList<Integer>());
        list.add(5);
        System.out.println(list.get(0));

        Map<Object, Object> objectObjectMap = Collections.synchronizedMap(new HashMap<>());
    }
}
public class ArrayList<E> extends AbstractList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable

//java.util.Collections#synchronizedList(java.util.List<T>)
public static <T> List<T> synchronizedList(List<T> list) {
	return (list instanceof RandomAccess ?
			new SynchronizedRandomAccessList<>(list) :
			new SynchronizedList<>(list));
}

static class SynchronizedRandomAccessList<E>
	extends SynchronizedList<E>
	implements RandomAccess {
	//java.util.Collections.SynchronizedRandomAccessList#SynchronizedRandomAccessList(java.util.List<E>)
	SynchronizedRandomAccessList(List<E> list) {
		super(list);
	}

//synchronized没有加在方法上,但是效率也没有太大的提高
//java.util.Collections.SynchronizedList#get
public E get(int index) {
	synchronized (mutex) {return list.get(index);}
}
public E set(int index, E element) {
	synchronized (mutex) {return list.set(index, element);}
}

ConcurrentHashMap和CopyOnWriteArrayList

  • 取代同步的HashMap和同步的ArrayList(时代巨轮滚滚向前)
  • 绝大多数并发情况下,ConcurrentHashMap和CopyOnWriteArrayList的性能都更好

3 ConcurrentHashMap(重点、面试常考)

3.1 Map简介

磨刀不误砍柴工:Map简介

  • HashMap
  • Hashtable
  • LinkedHashMap
  • TreeMap

3.2 为什么HashMap是线程不安全

为什么HashMap是线程不安全的?

  • 同时put碰撞导致数据丢失
  • 同时put扩容导致数据丢失
  • 死循环造成的CPU100%(HashMap在高并发下的死循环(仅在JDK7及以前存在))

HashMap关于并发的特点

  • 非线程安全
  • 迭代时不允许修改内容
  • 只读的并发是安全的
  • 如果一定要把HashMap用在并发环境,用Collections.synchronizedMap(new HashMap())

3.3 HashMap的分析

红黑树:https://blog.csdn.net/qq_40794973/article/details/103098463

  • 二叉查找树BST的一种平衡策略,O(logN) vs O(N)

  • 会自动平衡,防止极端不平衡从而影响查找效率的情况发生


  • 每个节点要么是红色,要么是黑色,但根节点永远是黑色的
  • 红色节点不能连续(也即是,红色节点的孩子和父亲都不能是红色
  • 从任一节点到其子树中每个叶子节点的路径都包含相同数量的黑色节点
  • 所有的叶子节点都是黑色

3.4 JDK1.7的ConcurrentHashMap实现和分析

  • Java 7中的ConcurrentHashMap最外层是多个segment,每个segment的底层数据结构与HashMap类似,仍然是数组和链表组成的拉链法
  • 每个segment独立上ReentrantLock锁,每个segment之间互不影响,提高了并发效率
  • ConcurrentHashMap默认有16个Segments,所以最多可以同时支持16个线程并发写(操作分别分布在不同的Segment上)。这个默认值可以在初始化的时候设置为其他值,但是一旦初始化以后,是不可以扩容的

3.5 JDK1.8的ConcurrentHashMap实现和源码分析

//java.util.concurrent.ConcurrentHashMap#putVal
final V putVal(K key, V value, boolean onlyIfAbsent) {
//get
public V get(Object key) {
	Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
	int h = spread(key.hashCode());
	if ((tab = table) != null && (n = tab.length) > 0 &&
		(e = tabAt(tab, (n - 1) & h)) != null) {
		if ((eh = e.hash) == h) {
			if ((ek = e.key) == key || (ek != null && key.equals(ek)))
				return e.val;
		}
		else if (eh < 0)
			return (p = e.find(h, key)) != null ? p.val : null;
		while ((e = e.next) != null) {
			if (e.hash == h &&
				((ek = e.key) == key || (ek != null && key.equals(ek))))
				return e.val;
		}
	}
	return null;
}

putVal流程

  • 判断key value不为空
  • 计算hash值
  • 根据对应位置节点的类型,来赋值,或者 helpTransfer,或者增长链表,或者给红黑树增加节点
  • 检查满足阈值就“红黑树化”
  • 返回 oldVal

get流程

  • 计算hash值
  • 找到对应的位置,根据情况进行:
  •               直接取值
  •               红黑树里找值
  •               遍历链表取值
  •               返回找到的结果

3.6 为什么要把1.7的结构改成1.8的结构?

对比JDK1.7和1.8的优缺点,为什么要把1.7的结构改成1.8的结构?

  • 数据结构
  • Hash碰撞
  • 保证并发安全
  • 查询复杂度

为什么超过8要转为红黑树?

  • 链表占用空间更小
 * first values are:
 *
 * 0:    0.60653066
 * 1:    0.30326533
 * 2:    0.07581633
 * 3:    0.01263606
 * 4:    0.00157952
 * 5:    0.00015795
 * 6:    0.00001316
 * 7:    0.00000094
 * 8:    0.00000006
 * more: less than 1 in ten million

3.7 组合操作:ConcurrentHashMap也不是线程安全的?

/**
 * 组合操作并不保证线程安全
 */
public class OptionsNotSafe implements Runnable {
    private static ConcurrentHashMap<String, Integer> scores = new ConcurrentHashMap<>();
    public static void main(String[] args) throws InterruptedException {
        scores.put("小明", 0);
        Thread t1 = new Thread(new OptionsNotSafe());
        Thread t2 = new Thread(new OptionsNotSafe());
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(scores);
    }
    @Override
    public void run() {
        只能保证同时get或者同时put是线程安全的
        //for (int i = 0; i < 1000; i++) {
        //    //synchronized (OptionsNotSafe.class){
        //        //get + put 无法保证线程安全
        //        Integer score = scores.get("小明");
        //        Integer newScore = score + 1;
        //        scores.put("小明", newScore);
        //    //}
        //}

        for (int i = 0; i < 1000; i++) {
            while (true) {
                Integer score = scores.get("小明");
                Integer newScore = score + 1;
                //key 旧的值:希望当前的值是多少 新的值
                boolean b = scores.replace("小明", score, newScore);
                if (b) {
                    break;
                }
            }
        }
    }
}


4 ConcurrentSkipListSet

TreeSet -> ConcurrentSkipListSet

ConcurrentSkipListSet

  • contains、remove、add是线程安全的
  • addAll、containsAll、removeAll、retainAll 等批量操作并不能保证以原子的方式执行
Set<Integer> set = new ConcurrentSkipListSet<>();
CopyOnWriteArraySet<Integer> copyOnWriteArraySet = new CopyOnWriteArraySet<>();
// java.util.concurrent.CopyOnWriteArraySet#CopyOnWriteArraySet()
public CopyOnWriteArraySet() {
	al = new CopyOnWriteArrayList<E>();
}

5 CopyOnWriteArrayList

ArrayList -> CopyOnWriteArrayList

HashSet -> CopyOnWriteArraySet

Set<Integer> set = new CopyOnWriteArraySet<>();
List<Integer> list = new CopyOnWriteArrayList<>();

无法保证时实一致性,但能保证最终一致性

5.1 诞生的历史和原因

  • 代替Vector和SynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样
  • Vector和SynchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑
  • Copy-On-Write并发容器还包括CopyOnWriteArraySet,用来替代同步Set

5.2 适用场景

  • 读操作可以尽可能地快,而写即使慢一些也没有太大关系
  • 读多写少:黑名单,每日更新;监听器:迭代操作远多于修改操作

5.3 读写规则

  • 回顾读写锁:读读共享、其他都互斥(写写互斥、读写互斥、写读互斥)
  • 读写锁规则的升级:读取是完全不用加锁的,并且更厉害的是,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待
/**
 * 演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
 */
public class CopyOnWriteArrayListDemo1 {
    public static void main(String[] args) {
        //ConcurrentModificationException
        //ArrayList<String> list = new ArrayList<>();
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");//注意最后一个输出
        Iterator<String> itr1 = list.iterator();
        Iterator<String> itr2 = list.iterator();
        while (itr1.hasNext()) {
            System.out.println("list is" + list);
            String next = itr1.next();
            System.out.println(next);
            if ("2".equals(next)) {
                list.remove("5");
            }
            if ("3".equals(next)) {
                list.add("鸡你太美");
            }
        }
        System.out.println();
        itr2.forEachRemaining((item) -> {
            System.out.print(item + " ");
        });
    }
}

5.4 实现原理

  • CopyOnWrite的含义:拷贝一份,对副本进行修改,然后把引用指向副本
  • 创建新副本、读写分离
  • “不可变”原理

迭代的时候

//java.util.ArrayList.Itr#next
public E next() {
	checkForComodification();
	int i = cursor;
	if (i >= size)
		throw new NoSuchElementException();
	Object[] elementData = ArrayList.this.elementData;
	if (i >= elementData.length)
		throw new ConcurrentModificationException();
	cursor = i + 1;
	return (E) elementData[lastRet = i];
}

//java.util.ArrayList.Itr
private class Itr implements Iterator<E> {
	//创建迭代器的时候保存了
	int expectedModCount = modCount;

//修改一次modCount加一
//java.util.ArrayList.Itr#checkForComodification
final void checkForComodification() {
	if (modCount != expectedModCount)
		throw new ConcurrentModificationException();
}
/**
 * 对比两个迭代器
 */
public class CopyOnWriteArrayListDemo2 {
    public static void main(String[] args) throws InterruptedException {
        CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>(new Integer[]{1, 2, 3});
        System.out.println(list);
        //itr1能拿到的数据却决于它的诞生时间,而不取决于它的迭代时间
        Iterator<Integer> itr1 = list.iterator();
        list.remove(2);
        Thread.sleep(1000);
        System.out.println(list);
        Iterator<Integer> itr2 = list.iterator();

        itr1.forEachRemaining((item) -> {
            System.out.print(item + " ");
        });
        System.out.println();
        itr2.forEachRemaining((item) -> {
            System.out.print(item + " ");
        });
    }
}

5.5 缺点

  • 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,马上能读到,请不要使用CopyOnWrite容器。
  • 内存占用问题:因为CopyOnWrite的写是复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存。

5.6 源码分析

//java.util.concurrent.CopyOnWriteArrayList#add(E)
public boolean add(E e) {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		Object[] elements = getArray();
		int len = elements.length;
		//!!!复制
		Object[] newElements = Arrays.copyOf(elements, len + 1);
		//数组末尾插入元素
		newElements[len] = e;
		//指向这个新的引用
		setArray(newElements);
		return true;
	} finally {
		lock.unlock();
	}
}

//java.util.concurrent.CopyOnWriteArrayList#get(int)
public E get(int index) {
	return get(getArray(), index);
}
//java.util.concurrent.CopyOnWriteArrayList#get(java.lang.Object[], int)
private E get(Object[] a, int index) {
	return (E) a[index];
}
// 1.线程不安全
//List<String> list = new ArrayList<>();
//for (int i = 0; i < 100; i++) {
//    new Thread(() -> {
//        list.add(UUID.randomUUID().toString().substring(0, 3));
//        list.add(UUID.randomUUID().toString().substring(0, 3));
//        list.add(UUID.randomUUID().toString().substring(0, 3));
//        System.out.println(list);
//    }, String.valueOf(i)).start();
//}

// 2.线程安全
//List<String> list = Collections.synchronizedList(new ArrayList<>());
//for (int i = 0; i < 100; i++) {
//    new Thread(() -> {
//        list.add(UUID.randomUUID().toString().substring(0, 3));
//        list.add(UUID.randomUUID().toString().substring(0, 3));
//        list.add(UUID.randomUUID().toString().substring(0, 3));
//        System.out.println(list);
//    }, String.valueOf(i)).start();
//}

// 3.线程安全
//List<String> list = new ArrayList<>();
//List<String> proxy = (List<String>) Proxy.newProxyInstance(ArrayList.class.getClassLoader(), ArrayList.class.getInterfaces(), new InvocationHandler() {
//    @Override
//    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//        synchronized (ArrayListDemo.class){
//            return method.invoke(list, args);
//        }
//    }
//});
//for (int i = 0; i < 100; i++) {
//    new Thread(() -> {
//        proxy.add(UUID.randomUUID().toString().substring(0, 3));
//        proxy.add(UUID.randomUUID().toString().substring(0, 3));
//        proxy.add(UUID.randomUUID().toString().substring(0, 3));
//        System.out.println(proxy);
//    }, String.valueOf(i)).start();
//}

// 4.线程安全
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 100; i++) {
    new Thread(() -> {
        list.add(UUID.randomUUID().toString().substring(0, 3));
        list.add(UUID.randomUUID().toString().substring(0, 3));
        list.add(UUID.randomUUID().toString().substring(0, 3));
        System.out.println(list);
    }, String.valueOf(i)).start();
}

6 并发队列Queue(阻塞队列、非阻塞队列)

6.1 为什么要使用队列

  • 用队列可以在线程间传递数据:生产者消费者模式、银行转账
  • 考虑锁等线程安全问题的重任从 “你” 转移到了 “队列” 上

6.2 并发队列简介

  • Queue
  • BlockingQueue

6.3 各并发队列关系图

6.4 阻塞队列BlockingQueue

Java类库中的队列:https://blog.csdn.net/qq_40794973/article/details/102996643

6.4.1 什么是阻塞队列

简介、地位

  • 阻塞队列是具有阻塞功能的队列,所以它首先是一个队列,其次是具有阻塞功能。
  • 通常,阻塞队列的一端是给生产者放数据用,另一端给消费者拿数据用。阻塞队列是线程安全的,所以生产者和消费者都可以是多线程的

阻塞功能:最有特色的两个带有阻塞功能的方法是

  • take()方法:获取并移除队列的头结点,一旦如果执行take的时候,队列里无数据,则阻塞,直到队列里有数据
  • put()方法:插入元素。但是如果队列已满,那么就无法继续插入,则阻塞,直到队列里有了空闲空间
  • 是否有界(容量有多大):这是一个非常重要的属性,无界队列意味着里面可以容纳非常多(Integer.MAX_VALUE,约为2的31次,是非常大的一个数,可以近似认为是无限容量)
  • 阻塞队列和线程池的关系:阻塞队列是线程池的重要组成部分

 

6.4.2 主要方法介绍

方法类型抛出异常特殊值阻寒超时
插入add(e)offer(e)put(e)offer(e, timeout, unit)
移除remove()poll()take()poll(timeout, unit) 
检查element()peek()  

6.4.3 ArrayBlockingQueue

  • 有界
  • 指定容量
  • 公平:还可以指定是否需要保证公平,如果想保证公平的话,那么等待了最长时间的线程会被优先处理,不过这会同时带来一定的性能损耗

使用案例

  • 有10个面试者,一共只有1个面试官,大厅里有3个位子供面试者休息,每个人的面试时间是10秒,模拟所有人面试的场景
public class ArrayBlockingQueueDemo {
    public static void main(String[] args) {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        Interviewer produce = new Interviewer(queue);
        Consumer consumer = new Consumer(queue);
        new Thread(produce).start();
        new Thread(consumer).start();
    }
}
/**
 * 生产者(面试官)
 */
class Interviewer implements Runnable {
    BlockingQueue<String> queue;
    public Interviewer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        System.out.println("10个候选人都来啦");
        for (int i = 0; i < 10; i++) {
            String candidate = "Candidate" + i;
            try {
                queue.put(candidate);
                System.out.println("安排好了" + candidate);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        try {
            queue.put("stop");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
/**
 * 消费者
 */
class Consumer implements Runnable {
    BlockingQueue<String> queue;
    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String msg;
        try {
            while(!"stop".equals(msg = queue.take())){
                System.out.println(msg + "到了");
            }
            System.out.println("所有候选人都结束了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 源码分析:put方法

//java.util.concurrent.ArrayBlockingQueue#put
public void put(E e) throws InterruptedException {
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		while (count == items.length)
			notFull.await();
		enqueue(e);
	} finally {
		lock.unlock();
	}
}

//java.util.concurrent.ArrayBlockingQueue#take
public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		while (count == 0)
			notEmpty.await();
		return dequeue();
	} finally {
		lock.unlock();
	}
}

6.4.4 LinkedBlockingQueue

  • 无界
  • 容量Integer.MAX_VALUE
  • 内部结构:Node、两把锁

分析put方法

//java.util.concurrent.LinkedBlockingQueue#takeLock
private final ReentrantLock takeLock = new ReentrantLock();
//java.util.concurrent.LinkedBlockingQueue#putLock
private final ReentrantLock putLock = new ReentrantLock();
//java.util.concurrent.LinkedBlockingQueue#put
public void put(E e) throws InterruptedException {
	if (e == null) throw new NullPointerException();
	int c = -1;
	Node<E> node = new Node<E>(e);
	final ReentrantLock putLock = this.putLock;
	final AtomicInteger count = this.count;
	putLock.lockInterruptibly();
	try {
		while (count.get() == capacity) {
			notFull.await();
		}
		enqueue(node);
		c = count.getAndIncrement();
		if (c + 1 < capacity)
			//唤醒一个
			notFull.signal();
	} finally {
		putLock.unlock();
	}
	if (c == 0)
		signalNotEmpty();
}

6.4.5 PriorityBlockingQueue

https://blog.csdn.net/qq_40794973/article/details/102880394

  • 支持优先级
  • 自然顺序(而不是先进先出)
  • 无界队列
  • PriorityQueue的线程安全版本

6.4.6 SynchronousQueue

  • 它的容量为0
  • 需要注意的是,SynchronousQueue的容量不是1而是0,因为SynchronousQueue不需要去持有元素,它所做的就是直接传递(direct handoff)
  • 效率很高

SynchronousQueue注意点

  1. SynchronousQueue没有peek等函数,因为peek的含义是取出头结点,但是SynchronousQueue的容量是0,所以连头结点都没有,也就没有peek方法。同理,没有iterate相关方法
  2. 是一个极好的用来直接传递的并发数据结构
  3. SynchronousQueue是线程池Executors.newCachedThreadPool()使用的阻塞队列

public class Main {
    public static void main(String[] args) {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
        new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                try {
                    synchronousQueue.put(String.valueOf(i));
                    System.out.println(Thread.currentThread().getName() + " put " + i + ".");
                } catch (InterruptedException ignored) {

                }
            }
        }, "AAA").start();
        new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                try {
                    String v = synchronousQueue.take();
                    System.out.println(Thread.currentThread().getName() + " take " + v + ".");
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        }, "BBB").start();
    }
}
//BBB take 0.
//AAA put 0.
//AAA put 1.
//BBB take 1.
//BBB take 2.
//AAA put 2.

6.4.7 DelayQueue

  • 无界队列
  • 延迟队列,根据延迟时间排序
  • 元素需要实现Delayed接口,规定排序规则

6.5 非阻塞并发队列

  • 并发包中的非阻塞队列只有ConcurrentLinkedQueue这一种,顾名思义ConcurrentLinkedQueue是使用链表作为其数据结构的,使用CAS非阻塞算法来实现线程安全(不具备阻塞功能),适合用在对性能要求较高的并发场景。用的相对比较少
  • 看源码的offer方法的CAS思想,内有p.casNext方法,用了UNSAFE.compareAndSwapObject
//java.util.concurrent.ConcurrentLinkedQueue#offer
public boolean offer(E e) {
	checkNotNull(e);
	final Node<E> newNode = new Node<E>(e);
	for (Node<E> t = tail, p = t;;) {
		Node<E> q = p.next;
		if (q == null) {
			// p is last node
			if (p.casNext(null, newNode)) {
				if (p != t)
					casTail(t, newNode); 
				return true;
			}
		}
		else if (p == q)
			p = (t != (t = tail)) ? t : head;
		else
			p = (p != t && t != (t = tail)) ? t : q;
	}
}
//java.util.concurrent.ConcurrentLinkedQueue.Node#casNext
boolean casNext(Node<E> cmp, Node<E> val) {
	return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}

6.6 如何选择适合自己的队列

  • 边界
  • 空间
  • 吞吐量

7 面试

7.1 死循环造成的CPU100%

  • HashMap在高并发下的死循环(仅在JDK7及以前存在)
  • 现象:在多线程同时扩容的时候,可能会造成循环链表,导致CPU100%
  • 代码演示
  • 原理:CoolShell有分析,面试无意义,Java之父都不会
  • JAVA HASHMAP的死循环:https://coolshell.cn/articles/9606.html

/**
 * 演示HashMap在多线程情况下造成死循环的情况
 */
public class HashMapEndlessLoop {
    private static HashMap<Integer, String> map = new HashMap<Integer, String>(2, 1.5f);
    public static void main(String[] args) {
        map.put(5,"C");
        map.put(7,"B");
        map.put(3,"A");
        new Thread(new Runnable() {
            @Override
            public void run() {
                map.put(15,"D");
                System.out.println(map);
            }
        },"Thread 1").start();
        new Thread(new Runnable(){
            @Override
            public void run() {
                map.put(1,"E");
                System.out.println(map);
            }
        },"Thread 2").start();
    }
}

transfer

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐