java 并发容器ConcurrentHashMap与阻塞队列
ConcurrentHashMap简介ConcurrentHashMap是Java1.5中引用的一个线程安全的支持高并发的HashMap集合类。JDK1.8的ConcurrentHashMap 和JDK1.8的HashMap是很相似的。其中抛弃了原有的 Segment 分段锁,而采用了CAS + synchronized来保证并发安全性。ConcurrentHashMap存在的意义1、线程不安全的
ConcurrentHashMap简介
ConcurrentHashMap是Java1.5中引用的一个线程安全的支持高并发的HashMap集合类。JDK1.8的ConcurrentHashMap 和JDK1.8的HashMap是很相似的。其中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized
来保证并发安全性。
ConcurrentHashMap存在的意义
1、线程不安全的HashMap
因为多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。
2、效率低下的HashTable
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。
因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。
如线程1使用put进行添加元素,线程2不但不能使用put方法添加元素,并且也不能使用get方法来获取元素,所以竞争越激烈效率越低。
相关比较
(1)你知道 HashMap 的工作原理吗?你知道 HashMap 的 get() 方法的工作原理吗?
HashMap 是基于 hashing 的原理,我们使用 put(key, value) 存储对象到 HashMap 中,使用 get(key) 从 HashMap 中获取对象。当我们给 put() 方法传递键和值时,我们先对键调用 hashCode() 方法,返回的 hashCode 用于找到 bucket 位置来储存 Entry 对象。
2)你知道 ConcurrentHashMap 的工作原理吗?你知道 ConcurrentHashMap 在 JAVA8 和 JAVA7 对比有哪些不同呢?
JAVA7之前ConcurrentHashMap主要采用锁机制,在对某个Segment进行操作时,将该Segment锁定,不允许对其进行非查询操作,而在JAVA8之后采用CAS无锁算法,这种乐观操作在完成前进行判断,如果符合预期结果才给予执行,对并发操作提供良好的优化
ConcurrentHashMap是一个线程安全的类,但是使用组合操作容易造成线程不安全的假象
package com.zhang.myjuc.a6.collections.concurrenthashmap;
import java.util.concurrent.ConcurrentHashMap;
/**
* OptionsNotSafe:组合操作并不保证线程安全
*
* @author zhangxiaoxiang
* @date 2020/08/19
*/
public class OptionsNotSafe implements Runnable{
private static ConcurrentHashMap<String, Integer> scores=new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
scores.put("长草颜团子",0);
Thread t1 =new Thread(new OptionsNotSafe());
Thread t2 = new Thread(new OptionsNotSafe());
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("分数是: "+scores);
}
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
//下面两行使用组合操作就不能保证线程安全了
// Integer score = scores.get("长草颜团子");
// Integer newScore=score+1;
// scores.put("长草颜团子",newScore);
//-----------------------------------------
//使用提供的这个方法就可以保证了
while (true){
Integer score = scores.get("长草颜团子");
Integer newScore=score+1;
boolean b = scores.replace("长草颜团子", score, newScore);
if (b){
break;
}
}
}
}
}
CopyOnWriteArrayList支持读取(遍历)时修改(删除和添加),而ArrayList不行
package com.zhang.myjuc.a6.collections.copyonwrite;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* CopyOnWriteArrayListDemo:
* 演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容,但是ArrayList不行,对比
* 所有的可变操作(添加、设置等)都是通过复制底层数组的新副本来实现的。
*
* @author zhangxiaoxiang
* @date 2020/08/23
*/
public class CopyOnWriteArrayListDemo {
public static void main(String[] args) {
//切换注释,变换ArrayList<>() CopyOnWriteArrayList<>() 这里为了简介,所以写在一个测试类对比
// ArrayList<String> list = new ArrayList<>();
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("长草颜团子1号");
list.add("长草颜团子2号");
list.add("长草颜团子3号");
list.add("长草颜团子4号");
list.add("长草颜团子5号");
Iterator<String> iterator = list.iterator();
while (iterator.hasNext()) {
System.out.println("数据是(注意变化) ===>" + list);
//遍历时ArrayList<>()不可以修改,且(会抛异常 Exception in thread "main" java.util.ConcurrentModificationException)
// 但是CopyOnWriteArrayList<>()可以
String next = iterator.next();
System.out.println(next);
if ("长草颜团子3号".equals(next)) {
list.remove("长草颜团子2号");
}
if ("长草颜团子4号".equals(next)) {
list.add("长草颜团子6号");
}
}
}
}
不可以并发修改的ArrayList
可以并发修改的CopyOnWriteArrayList
队列 BlockingQueue简介
在java多线程操作中, BlockingQueue<E> 常用的一种方法之一。在看jdk内部尤其是一些多线程,大量使用了blockinkQueue 来做的
队列成员关系图谱
1.ArrayBlockingQueue是一个阻塞式的队列
继承自AbstractBlockingQueue,间接的实现了Queue接口和Collection接口。底层以数组的形式保存数据(实际上可看作一个循环数组)。常用的操作包括 add,offer,put,remove,poll,take,peek。
ArrayBlockingQueue 中的几个重要的方法。
- add(E e):把 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则报异常
- offer(E e):表示如果可能的话,将 e 加到 BlockingQueue 里,即如果 BlockingQueue 可以容纳,则返回 true,否则返回 false
- put(E e):把 e 加到 BlockingQueue 里,如果 BlockQueue 没有空间,则调用此方法的线程被阻断直到 BlockingQueue 里面有空间再继续
- poll(time):取走 BlockingQueue 里排在首位的对象,若不能立即取出,则可以等 time 参数规定的时间,取不到时返回 null
- take():取走 BlockingQueue 里排在首位的对象,若 BlockingQueue 为空,阻断进入等待状态直到 Blocking 有新的对象被加入为止
- remainingCapacity():剩余可用的大小。等于初始容量减去当前的 size。
2.LinkedBlockingQueue
基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。
3. DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。
4. PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。
5. SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
以ArrayBlockingQueue为示例的代码
package com.zhang.myjuc.a6.collections.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* ArrayBlockingQueueDemo:队列的基本使用
* 就拿生产者和消费者的模式来举例说明(比如面试场景)
*
* @author zhangxiaoxiang
* @date 2020/08/24
*/
public class ArrayBlockingQueueDemo {
public static void main(String[] args) {
//设置队内容积,相当于这里一次性最多能够同时安排3名候选人等待面试,注意多余3个就阻塞,相当于第4个准备排队,但是队内长度不计入
ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<>(3);
Interviewer r1=new Interviewer(queue);
Consumer r2=new Consumer(queue);
new Thread(r1).start();
new Thread(r2).start();
}
}
/**
* 候选(待面试)人员 模拟生产者
*/
class Interviewer implements Runnable {
//把接口作为属性是内聚的一种常见实现形式,要实现数据互通,需要添加队内来实现
BlockingQueue<String> queue;
public Interviewer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("---------10个候选人员兴高采烈的来啦-------");
for (int i = 0; i < 10; i++) {
String candidate = "Candidate" + i;
try {
//入队 在队列的尾部插入指定的元素,如果队列已满,则等待可用的空间(所以看到有4个入队是可以的,前3个进入成功,第四个阻塞)
queue.put(candidate);
System.out.println( "候选人员 "+candidate+"~~~入队,等待面试"+",队列长度="+queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 结束标记
try {
queue.put("stop");
System.out.println("入队已经结束了哦");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 面试官 模拟消费者
*/
class Consumer implements Runnable {
//把接口作为属性是内聚的一种常见实现形式,要实现数据互通,需要添加队内来实现
BlockingQueue<String> queue;
public Consumer(BlockingQueue queue) {
this.queue = queue;
}
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String msg;
try {
//出队(注意出队完成的标志是stop)
while (!("stop".equals(msg = queue.take()))) {
// while(!(msg = queue.take()).equals("stop")){
System.out.println(msg + " ------出队成功,面试结束");
}
System.out.println("-----------面试结束,再会--------------");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果
更多推荐
所有评论(0)