JDK5中添加了新的java.util.concurrent包,相對同步容器而言,並發容器通過一些機制改進了並發性能。因為同步容器將所有對容器狀態的訪問都串行化了,這樣保證了線程的安全性,所以這種方法的代價就是嚴重降低了並發性,當多個線程競爭容器時,吞吐量嚴重降低。因此JDK5開始針對多線程並發訪問設計,提供了並發性能較好的並發容器,引入了java.util.concurrent包。與Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的並發容器主要解決了兩個問題:
1)根據具體場景進行設計,盡量避免synchronized,提供並發性。
2)定義了一些並發安全的復合操作,並且保證並發環境下的迭代操作不會出錯。
這里先粗略的介紹下 concurrrent 包下面常見並發容器的簡單應用,后續再針對性的去深入研究。
並發容器:
這些容器的關鍵方法大部分都實現了線程安全的功能,卻不使用同步關鍵字(synchronized)。
常見阻塞隊列:
BlockingQueue.
class,阻塞隊列接口
DelayQueue.
class,阻塞隊列,無界隊列,並且元素是Delay的子類,保證元素在達到一定時間后才可以取得到
public class T07_DelayQueue {
// 執行定時任務
static BlockingQueue<MyTask> tasks = new DelayQueue<>();
static Random r = new Random();
static class MyTask implements Delayed {
long runningTime;
MyTask(Long rt) {
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return "" + runningTime ;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 =new MyTask(now +1000);
MyTask t2 =new MyTask(now +2000);
MyTask t3 =new MyTask(now +1500);
MyTask t4 =new MyTask(now +2500);
MyTask t5 =new MyTask(now +500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for(int i=0;i<5;i++) {
System.out.println(tasks.take());
}
}
}
BlockingDeque.
class,雙端阻塞隊列接口
ArrayBlockingQueue. class,阻塞隊列,數組實現。有界阻塞隊列
ArrayBlockingQueue. class,阻塞隊列,數組實現。有界阻塞隊列
public class T06_ArrayBlockingQueue {
// 阻塞隊列
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
static Random r =new Random();
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<10;i++) {
strs.put("a"+i);
}
// strs.add("aaa");//Exception in thread "main" java.lang.IllegalStateException: Queue full
// strs.put("aaa"); // 滿了會等待 阻塞
// strs.offer("aaa");//不會報異常 有返回值 true or false
// strs.offer("aaa",1,TimeUnit.SECONDS); // 按照時間段阻塞
}
}
LinkedBlockingDeque. class,阻塞雙端隊列,鏈表實現
LinkedBlockingQueue. class,阻塞隊列,鏈表實現,無界,以下是該類對與生產者消費者模型的簡單實現:
public class T05_LinkedBlockingQueue {
// 阻塞隊列
static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
static Random r =new Random();
//生產者消費者
public static void main(String[] args) {
new Thread(()->{
for(int i=0;i<100;i++) {
try {
strs.put("a"+i);//如果滿了就會等待阻塞
System.err.println("a"+i);
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"p1").start();
for(int i=0;i<5;i++) {
new Thread(()->{
while(true) {
try {
//take 如果空了 就會阻塞
System.out.println(Thread.currentThread().getName()+" take - "+ strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"c" +i).start();
}
}
}
SynchronousQueue.class,同步隊列,但是隊列長度為0,生產者放入隊列的操作會被阻塞,直到消費者過來取,所以這個隊列根本不需要空間存放元素;有點像一個獨木橋,一次只能一人通過,還不能在橋上停留
public class T09_SynchronousQueue {
// 特殊的 transferQueue ,隊列為空的
//任何添加東西都要直接丟給消費者的,不會往隊列里加的
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaa");//阻塞,等待消費者消費
// strs.add("aaa"); // 報錯
System.out.println(strs.size());
}
}
常見非阻塞隊列:
ConcurrentLinkedDeque.class,非阻塞雙端隊列,鏈表實現,無界隊列
ConcurrentLinkedQueue.class,非阻塞隊列,鏈表實現
ConcurrentLinkedQueue.class,非阻塞隊列,鏈表實現
public class T04_ConcurrentQueue {
public static void main(String[] args) {
Queue<String> strs = new ConcurrentLinkedQueue<>();
Queue<String> strs2 = new ConcurrentLinkedDeque<>();//雙端隊列
for(int i=0;i<10;i++) {
// 根據返回值來判斷是否添加成功
strs.offer("a"+i);
}
System.out.println(strs);//[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
System.out.println(strs.size());// 10
System.out.println(strs.poll());// a0
System.out.println(strs.size());// 9
System.out.println(strs.peek());// a1
System.out.println(strs.size());// 9
}
}
轉移隊列:
TransferQueue.class,轉移隊列接口,生產者要等消費者消費的隊列,生產者嘗試把元素直接轉移給消費者
LinkedTransferQueue.class,轉移隊列的鏈表實現,它比SynchronousQueue更快,transfer 方法有客戶端准備消費,直接把消息直接傳遞給消費者,不放到隊列里,沒有消費者線程的話該線程會阻塞。但是可以調用 add put 王隊列里丟,隊列還是有容量的。
LinkedTransferQueue.class,轉移隊列的鏈表實現,它比SynchronousQueue更快,transfer 方法有客戶端准備消費,直接把消息直接傳遞給消費者,不放到隊列里,沒有消費者線程的話該線程會阻塞。但是可以調用 add put 王隊列里丟,隊列還是有容量的。
public class T08_TransferQueue {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
new Thread(()->{
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
// new Thread(()->{
// try {
// System.out.println(strs.take());
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }).start();
}
}
常見集合容器:
ConcurrentMap.class,並發Map的接口,定義了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)這四個並發場景下特定的方法
ConcurrentHashMap.class,並發HashMap,ConcurrentHashMap在並發中效率比 HashTable高,因為 HashTable 在往里添加東西的時候葯鎖定整個對象,而 ConcurrentHashMap 分成了16段,插入的時候只鎖定了其中的一段,其實就是把鎖細粒度化了,因此在多線程情況下回比 hashTable高 ,同樣也比 Collections.synchronizedMap(map1) 高。
ConcurrentSkipListMap.class,跳表數據結構,它也是NavigableMap的實現類(要求元素之間可以比較),只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,在高並發中要求排序才去使用它。
ConcurrentHashMap.class,並發HashMap,ConcurrentHashMap在並發中效率比 HashTable高,因為 HashTable 在往里添加東西的時候葯鎖定整個對象,而 ConcurrentHashMap 分成了16段,插入的時候只鎖定了其中的一段,其實就是把鎖細粒度化了,因此在多線程情況下回比 hashTable高 ,同樣也比 Collections.synchronizedMap(map1) 高。
ConcurrentSkipListMap.class,跳表數據結構,它也是NavigableMap的實現類(要求元素之間可以比較),只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,在高並發中要求排序才去使用它。
ConcurrentSkipListSet.class,和上面類似,只不過map變成了set
以下代碼可以簡單的比較一下幾個容器再並發的情況下的效率問題
public class T01_ConcurrentMap {
public static void main(String[] args) {
Map<String,String> map =new ConcurrentHashMap<>();//並發性比較高
// Map<String,String> map =new ConcurrentSkipListMap<>();//並發性高而且要求排序
// Map<String,String> map =new Hashtable<>();//並發性不是跟高
// Map<String,String> map1 =new HashMap<>();
// Map<String, String> map = Collections.synchronizedMap(map1);//並發性不是很高
Random r =new Random();
Thread[] ths =new Thread[100];
CountDownLatch latch =new CountDownLatch(ths.length);
long start = System.currentTimeMillis();
for(int i=0;i<ths.length;i++) {
ths[i]=new Thread(()->{
for(int j=0;j<10000;j++) {
map.put("a"+r.nextInt(10000), "a"+r.nextInt(10000));
}
latch.countDown();
});
}
Arrays.asList(ths).forEach(o->o.start());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
CopyOnWriteArrayList.class,copy-on-write(寫時復制)模式的array list,每當需要插入元素,不在原list上操作,而是會新建立一個list,然后將原先的引用指向副本。適合讀遠遠大於寫的場景
CopyOnWriteArraySet.class,和上面類似,list變成set而已
CopyOnWriteArraySet.class,和上面類似,list變成set而已
以下代碼可以簡單的比較一下幾個容器再並發的情況下的效率問題
public class T01_CopyOnWriteList {
public static void main(String[] args) {
List<String> lists =
// new Vector<String>();//並發效率相對高
// new ArrayList<String>();//這個並發會有問題
new CopyOnWriteArrayList<String>();//效率比較低,讀取非常快,讀的時候不加鎖
Random r =new Random();
Thread[] ths =new Thread[100];
for(int i=0;i<ths.length;i++) {
ths[i]=new Thread(()->{
for(int j=0;j<1000;j++) {
lists.add("a"+r.nextInt(10000) );
}
});
}
long start = System.currentTimeMillis();
Arrays.asList(ths).forEach(o->o.start());
Arrays.asList(ths).forEach(o->{
try {
o.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
System.out.println(end-start);
System.out.println(lists.size());
}
}
