多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等
容器物理结构:数组、链表逻辑结构:很多Queue主要是为高并发准备的。Vector HashtableVector Hashtable 自带锁,有很多设计上不完善的地方,现在基本上不用。测试Hashtable的性能:用100的线程,先put进去1000000个数,再get 1000000个数package com.mashibing.juc.c_023_02_FromHashtableToCHM;
容器
物理结构:数组、链表
逻辑结构:很多
Queue主要是为高并发准备的。
Vector Hashtable
Vector Hashtable 自带锁,有很多设计上不完善的地方,现在基本上不用。
测试Hashtable的性能:用100的线程,先put进去1000000个数,再get 1000000个数
package com.mashibing.juc.c_023_02_FromHashtableToCHM;
import java.util.Hashtable;
import java.util.UUID;
public class T01_TestHashtable {
static Hashtable<UUID, UUID> m = new Hashtable<>();
static int count = Constants.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constants.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class MyThread extends Thread {
int start;
int gap = count / THREAD_COUNT;
public MyThread(int start) {
this.start = start;
}
@Override
public void run() {
for (int i = start; i < start + gap; i++) {
m.put(keys[i], values[i]);
}
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < threads.length; i++) {
threads[i] = new MyThread(i * (count / THREAD_COUNT));
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println("size=" + m.size());
//---------------上面是测试put,下面是测试get 10000000次的第10的元素--------------------
start = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 10000000; j++) {
m.get(keys[10]);
}
});
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
直接使用Hashmap,多线程时会出现问题
package com.mashibing.juc.c_023_02_FromHashtableToCHM;
import java.util.HashMap;
import java.util.UUID;
public class T02_TestHashMap {
static HashMap<UUID, UUID> m = new HashMap<>();
static int count = Constants.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constants.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class MyThread extends Thread {
int start;
int gap = count/THREAD_COUNT;
public MyThread(int start) {
this.start = start;
}
@Override
public void run() {
for(int i=start; i<start+gap; i++) {
m.put(keys[i], values[i]);
}
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for(int i=0; i<threads.length; i++) {
threads[i] =
new MyThread(i * (count/THREAD_COUNT));
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(m.size());
}
}
使用SynchronizedHashMap,效率与直接使用Hashtable区别不是很大
package com.mashibing.juc.c_023_02_FromHashtableToCHM;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
public class T03_TestSynchronizedHashMap {
static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());
static int count = Constants.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constants.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class MyThread extends Thread {
int start;
int gap = count/THREAD_COUNT;
public MyThread(int start) {
this.start = start;
}
@Override
public void run() {
for(int i=start; i<start+gap; i++) {
m.put(keys[i], values[i]);
}
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for(int i=0; i<threads.length; i++) {
threads[i] =
new MyThread(i * (count/THREAD_COUNT));
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(m.size());
//-----------------------------------
start = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j < 10000000; j++) {
m.get(keys[10]);
}
});
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
使用ConcurrentHashMap,插入的效率与前面的Hashtable差不多,但是读取的效率非常高。
package com.mashibing.juc.c_023_02_FromHashtableToCHM;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class T04_TestConcurrentHashMap {
static Map<UUID, UUID> m = new ConcurrentHashMap<>();
static int count = Constants.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constants.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class MyThread extends Thread {
int start;
int gap = count/THREAD_COUNT;
public MyThread(int start) {
this.start = start;
}
@Override
public void run() {
for(int i=start; i<start+gap; i++) {
m.put(keys[i], values[i]);
}
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for(int i=0; i<threads.length; i++) {
threads[i] =
new MyThread(i * (count/THREAD_COUNT));
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(m.size());
//-----------------------------------
start = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j < 10000000; j++) {
m.get(keys[10]);
}
});
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
最早的容器Vector是自带锁的,但是你整个操作调用了两个原子方法的话,整体并不是原子的。你还需要在外面加sync
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
* <p>
* 分析下面的程序可能会产生哪些问题?
* 重复销售?超量销售?
* <p>
* 使用Vector或者Collections.synchronizedXXX
* 分析一下,这样能解决问题吗?
* <p>
* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
* 就像这个程序,判断size和进行remove必须是一整个的原子操作
*
* @author 马士兵
*/
package com.mashibing.juc.c_024_FromVectorToQueue;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class TicketSeller3 {
static List<String> tickets = new LinkedList<>();
static {
for (int i = 0; i < 1000; i++) tickets.add("票 编号:" + i);
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
synchronized (tickets) {
if (tickets.size() <= 0) break;
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("销售了--" + tickets.remove(0));
}
}
}).start();
}
}
}
使用Queue:ConcurrentLinkedQueue,里面很多方法是CAS实现的
/**
* 有N张火车票,每张票都有一个编号
* 同时有10个窗口对外售票
* 请写一个模拟程序
* 分析下面的程序可能会产生哪些问题?
* 重复销售?超量销售?
* 使用Vector或者Collections.synchronizedXXX
* 分析一下,这样能解决问题吗?
* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
* 就像这个程序,判断size和进行remove必须是一整个的原子操作
* 使用ConcurrentQueue提高并发性
*/
package com.mashibing.juc.c_024_FromVectorToQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class TicketSeller4 {
static Queue<String> tickets = new ConcurrentLinkedQueue<>();
static {
for (int i = 0; i < 1000; i++) tickets.add("票 编号:" + i);
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
while (true) {
String s = tickets.poll();
if (s == null) break;
else System.out.println("销售了--" + s);
}
}).start();
}
}
}
多线程常用的容器
ConcurrentHashMap
里面用的是CAS操作,而CAS在Tree操作的时候太复杂了,所以不存在ConcurrentTreeMap,为了排序,换了跳表的结构代替Tree结构
跳表
- 底层是链表
- 拿出关键元素新开一层
- 在查找的时候,从上往下查
- CAS的实现难度比TreeMap容易很多
- 查找操作的时间复杂度比链表快很多
/**
* http://blog.csdn.net/sunxianghuang/article/details/52221913
* http://www.educity.cn/java/498061.html
* 阅读concurrentskiplistmap
*/
package com.mashibing.juc.c_025;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
public class T01_ConcurrentMap {
public static void main(String[] args) {
Map<String, String> map = new ConcurrentHashMap<>();
//Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序
//Map<String, String> map = new Hashtable<>();
//Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX
//TreeMap
Random r = new Random();
Thread[] ths = new Thread[100];
CountDownLatch latch = new CountDownLatch(ths.length);
long start = System.currentTimeMillis();
for(int i=0; i<ths.length; i++) {
ths[i] = new Thread(()->{
for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));
latch.countDown();
});
}
Arrays.asList(ths).forEach(t->t.start());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(map.size());
}
}
CopyOnWriteArrayList
本质上和ReadWrite是一个思路
写时复制,适用于读线程多,写线程少的情况。(读的时候不加锁),写的时候copy一个新的,写完之后把旧的指针指向新的。
写的效率比较低,因为是数组,每次写的时候都要复制。
add操作的源码如下:
示例
/**
* 写时复制容器 copy on write
* 多线程环境下,写时效率低,读时效率高
* 适合写少读多的环境
*/
package com.mashibing.juc.c_025;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;
public class T02_CopyOnWriteList {
public static void main(String[] args) {
List<String> lists =
//new ArrayList<>(); //这个会出并发问题!
//new Vector();
new CopyOnWriteArrayList<>();
Random r = new Random();
Thread[] ths = new Thread[100];
for (int i = 0; i < ths.length; i++) {
Runnable task = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) lists.add("a" + r.nextInt(10000));
}
};
ths[i] = new Thread(task);
}
runAndComputeTime(ths);
System.out.println(lists.size());
}
static void runAndComputeTime(Thread[] ths) {
long s1 = System.currentTimeMillis();
Arrays.asList(ths).forEach(t -> t.start());
Arrays.asList(ths).forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long s2 = System.currentTimeMillis();
System.out.println(s2 - s1);
}
}
代码T05-T09:BlockingQueue讲解
Queue和List的区别是什么?
- 提供了很多在多线程访问下比较友好的API
- offer,peek,pool
BlockingQueue的特点是什么?
BlockingQueue的优势在于,增加了更多API,比如put,take
或者阻塞,或者指定时间等待
实现生产者-消费者模型,也是多线程里面最重要的一个模型,也是MQ的基础——MQ的本质,就是一个大型的生产者、消费者模型
LinkedBlockingQueue
LinkedBlockingQueue,是用链表实现的BlockingQueue
阻塞使用await实现的,底层应该是park
Queue常用接口
LinkedBlockingQueue示例
package com.mashibing.juc.c_025;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class T05_LinkedBlockingQueue {
static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
static Random r = new Random();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
strs.put("a" + i); //如果满了,就会等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "p1").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (; ; ) {
try {
System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();
}
}
}
DelayQueue
是BlockingQueue的一种,是一种阻塞的队列。
需要实现compareTo方法
需要指定等待时间
用来按时间进行任务调度
package com.mashibing.juc.c_025;
import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class T07_DelayQueue {
static BlockingQueue<MyTask> tasks = new DelayQueue<>();
static Random r = new Random();
static class MyTask implements Delayed {
String name;
long runningTime;
MyTask(String name, long rt) {
this.name = name;
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return name + " " + runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask("t1", now + 1000);
MyTask t2 = new MyTask("t2", now + 2000);
MyTask t3 = new MyTask("t3", now + 1500);
MyTask t4 = new MyTask("t4", now + 2500);
MyTask t5 = new MyTask("t5", now + 500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for (int i = 0; i < 5; i++) {
System.out.println(tasks.take());
}
}
}
PriorityQueue
内部进行了排序,底层是一个二叉树(小顶堆)的结构
package com.mashibing.juc.c_025;
import java.util.PriorityQueue;
public class T07_01_PriorityQueque {
public static void main(String[] args) {
PriorityQueue<String> q = new PriorityQueue<>();
q.add("c");
q.add("e");
q.add("a");
q.add("d");
q.add("z");
for (int i = 0; i < 5; i++) {
System.out.println(q.poll());
}
}
}
输出:
a
c
d
e
z
SynchronousQueue
容量为0,不能往里装东西,只有有一个线程等着的时候,才能把东西递到这个线程手里,是用来一个线程给另外一个线程传数据的。
本质和Exchanger比较相似,也是需要两个线程同步对接,否则都会阻塞着。
在线程池里面,线程之间进行任务调度的时候,经常会用到。
package com.mashibing.juc.c_025;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
public class T08_SynchronusQueue { //容量为0
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaa"); //阻塞等待消费者消费
//strs.put("bbb");
//strs.add("aaa");
System.out.println(strs.size());
}
}
TransferQueue
装完,阻塞等着,有线程把它取走,再离开
要先开启消费者线程,再往里面transfer,要不然就阻塞了~
场景1:要求某件任务有一个结果(比如一个订单等付款完成之后,确认有线程去处理它了,再给客户反馈)
场景2:确认收钱完成之后,才能把商品取走,比如面对面付款
示例:
package com.mashibing.juc.c_025;
import java.util.concurrent.LinkedTransferQueue;
public class T09_TransferQueue {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
//strs.put("aaa");
/*new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();*/
}
}
经典的交替打印面试题可以用 TransferQueue 实现
package com.mashibing.juc.c_026_00_interview.A1B2C3;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
public class T13_TransferQueue {
public static void main(String[] args) {
char[] aI = "1234567".toCharArray();
char[] aC = "ABCDEFG".toCharArray();
TransferQueue<Character> queue = new LinkedTransferQueue<Character>();
new Thread(() -> {
try {
for (char c : aI) {
System.out.print(queue.take());
queue.transfer(c);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
try {
for (char c : aC) {
queue.transfer(c);
System.out.print(queue.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2").start();
}
}
下节课预习
- Callable
- Future, Completable Future
更多推荐
所有评论(0)