JDK5中添加了新的java.util.concurrent包,相對同步容器而言,並發容器通過一些機制改進了並發性能。因為同步容器將所有對容器狀態的訪問都串行化了,這樣保證了線程的安全性,所以這種方法的代價就是嚴重降低了並發性,當多個線程競爭容器時,吞吐量嚴重降低。因此JDK5開始針對多線程並發訪問設計,提供了並發性能較好的並發容器,引入了java.util.concurrent包。與Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的並發容器主要解決了兩個問題:
1)根據具體場景進行設計,盡量避免synchronized,提供並發性。
2)定義了一些並發安全的復合操作,並且保證並發環境下的迭代操作不會出錯。
這里先粗略的介紹下 concurrrent 包下面常見並發容器的簡單應用,后續再針對性的去深入研究。
並發容器:
這些容器的關鍵方法大部分都實現了線程安全的功能,卻不使用同步關鍵字(synchronized)。
常見阻塞隊列:
BlockingQueue.
class,阻塞隊列接口
DelayQueue.
class,阻塞隊列,無界隊列,並且元素是Delay的子類,保證元素在達到一定時間后才可以取得到
public class T07_DelayQueue { // 執行定時任務 static BlockingQueue<MyTask> tasks = new DelayQueue<>(); static Random r = new Random(); static class MyTask implements Delayed { long runningTime; MyTask(Long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) return -1; if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime ; } } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 =new MyTask(now +1000); MyTask t2 =new MyTask(now +2000); MyTask t3 =new MyTask(now +1500); MyTask t4 =new MyTask(now +2500); MyTask t5 =new MyTask(now +500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0;i<5;i++) { System.out.println(tasks.take()); } } }
BlockingDeque.
class,雙端阻塞隊列接口
ArrayBlockingQueue. class,阻塞隊列,數組實現。有界阻塞隊列
ArrayBlockingQueue. class,阻塞隊列,數組實現。有界阻塞隊列
public class T06_ArrayBlockingQueue { // 阻塞隊列 static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); static Random r =new Random(); public static void main(String[] args) throws InterruptedException { for(int i=0;i<10;i++) { strs.put("a"+i); } // strs.add("aaa");//Exception in thread "main" java.lang.IllegalStateException: Queue full // strs.put("aaa"); // 滿了會等待 阻塞 // strs.offer("aaa");//不會報異常 有返回值 true or false // strs.offer("aaa",1,TimeUnit.SECONDS); // 按照時間段阻塞 } }
LinkedBlockingDeque. class,阻塞雙端隊列,鏈表實現
LinkedBlockingQueue. class,阻塞隊列,鏈表實現,無界,以下是該類對與生產者消費者模型的簡單實現:
public class T05_LinkedBlockingQueue { // 阻塞隊列 static BlockingQueue<String> strs = new LinkedBlockingQueue<>(); static Random r =new Random(); //生產者消費者 public static void main(String[] args) { new Thread(()->{ for(int i=0;i<100;i++) { try { strs.put("a"+i);//如果滿了就會等待阻塞 System.err.println("a"+i); TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } },"p1").start(); for(int i=0;i<5;i++) { new Thread(()->{ while(true) { try { //take 如果空了 就會阻塞 System.out.println(Thread.currentThread().getName()+" take - "+ strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } } },"c" +i).start(); } } }
SynchronousQueue.class,同步隊列,但是隊列長度為0,生產者放入隊列的操作會被阻塞,直到消費者過來取,所以這個隊列根本不需要空間存放元素;有點像一個獨木橋,一次只能一人通過,還不能在橋上停留
public class T09_SynchronousQueue { // 特殊的 transferQueue ,隊列為空的 //任何添加東西都要直接丟給消費者的,不會往隊列里加的 public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strs = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.put("aaa");//阻塞,等待消費者消費 // strs.add("aaa"); // 報錯 System.out.println(strs.size()); } }
常見非阻塞隊列:
ConcurrentLinkedDeque.class,非阻塞雙端隊列,鏈表實現,無界隊列
ConcurrentLinkedQueue.class,非阻塞隊列,鏈表實現
ConcurrentLinkedQueue.class,非阻塞隊列,鏈表實現
public class T04_ConcurrentQueue { public static void main(String[] args) { Queue<String> strs = new ConcurrentLinkedQueue<>(); Queue<String> strs2 = new ConcurrentLinkedDeque<>();//雙端隊列 for(int i=0;i<10;i++) { // 根據返回值來判斷是否添加成功 strs.offer("a"+i); } System.out.println(strs);//[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9] System.out.println(strs.size());// 10 System.out.println(strs.poll());// a0 System.out.println(strs.size());// 9 System.out.println(strs.peek());// a1 System.out.println(strs.size());// 9 } }
轉移隊列:
TransferQueue.class,轉移隊列接口,生產者要等消費者消費的隊列,生產者嘗試把元素直接轉移給消費者
LinkedTransferQueue.class,轉移隊列的鏈表實現,它比SynchronousQueue更快,transfer 方法有客戶端准備消費,直接把消息直接傳遞給消費者,不放到隊列里,沒有消費者線程的話該線程會阻塞。但是可以調用 add put 王隊列里丟,隊列還是有容量的。
LinkedTransferQueue.class,轉移隊列的鏈表實現,它比SynchronousQueue更快,transfer 方法有客戶端准備消費,直接把消息直接傳遞給消費者,不放到隊列里,沒有消費者線程的話該線程會阻塞。但是可以調用 add put 王隊列里丟,隊列還是有容量的。
public class T08_TransferQueue { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); new Thread(()->{ try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.transfer("aaa"); // new Thread(()->{ // try { // System.out.println(strs.take()); // } catch (InterruptedException e) { // e.printStackTrace(); // } // }).start(); } }
常見集合容器:
ConcurrentMap.class,並發Map的接口,定義了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)這四個並發場景下特定的方法
ConcurrentHashMap.class,並發HashMap,ConcurrentHashMap在並發中效率比 HashTable高,因為 HashTable 在往里添加東西的時候葯鎖定整個對象,而 ConcurrentHashMap 分成了16段,插入的時候只鎖定了其中的一段,其實就是把鎖細粒度化了,因此在多線程情況下回比 hashTable高 ,同樣也比 Collections.synchronizedMap(map1) 高。
ConcurrentSkipListMap.class,跳表數據結構,它也是NavigableMap的實現類(要求元素之間可以比較),只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,在高並發中要求排序才去使用它。
ConcurrentHashMap.class,並發HashMap,ConcurrentHashMap在並發中效率比 HashTable高,因為 HashTable 在往里添加東西的時候葯鎖定整個對象,而 ConcurrentHashMap 分成了16段,插入的時候只鎖定了其中的一段,其實就是把鎖細粒度化了,因此在多線程情況下回比 hashTable高 ,同樣也比 Collections.synchronizedMap(map1) 高。
ConcurrentSkipListMap.class,跳表數據結構,它也是NavigableMap的實現類(要求元素之間可以比較),只有你確實需要快速的遍歷操作,並且可以承受額外的插入開銷的時候,在高並發中要求排序才去使用它。
ConcurrentSkipListSet.class,和上面類似,只不過map變成了set
以下代碼可以簡單的比較一下幾個容器再並發的情況下的效率問題
public class T01_ConcurrentMap { public static void main(String[] args) {
Map<String,String> map =new ConcurrentHashMap<>();//並發性比較高 // Map<String,String> map =new ConcurrentSkipListMap<>();//並發性高而且要求排序 // Map<String,String> map =new Hashtable<>();//並發性不是跟高 // Map<String,String> map1 =new HashMap<>(); // Map<String, String> map = Collections.synchronizedMap(map1);//並發性不是很高 Random r =new Random(); Thread[] ths =new Thread[100]; CountDownLatch latch =new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0;i<ths.length;i++) { ths[i]=new Thread(()->{ for(int j=0;j<10000;j++) { map.put("a"+r.nextInt(10000), "a"+r.nextInt(10000)); } latch.countDown(); }); } Arrays.asList(ths).forEach(o->o.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end-start); } }
CopyOnWriteArrayList.class,copy-on-write(寫時復制)模式的array list,每當需要插入元素,不在原list上操作,而是會新建立一個list,然后將原先的引用指向副本。適合讀遠遠大於寫的場景
CopyOnWriteArraySet.class,和上面類似,list變成set而已
CopyOnWriteArraySet.class,和上面類似,list變成set而已
以下代碼可以簡單的比較一下幾個容器再並發的情況下的效率問題
public class T01_CopyOnWriteList { public static void main(String[] args) { List<String> lists = // new Vector<String>();//並發效率相對高 // new ArrayList<String>();//這個並發會有問題 new CopyOnWriteArrayList<String>();//效率比較低,讀取非常快,讀的時候不加鎖 Random r =new Random(); Thread[] ths =new Thread[100]; for(int i=0;i<ths.length;i++) { ths[i]=new Thread(()->{ for(int j=0;j<1000;j++) { lists.add("a"+r.nextInt(10000) ); } }); } long start = System.currentTimeMillis(); Arrays.asList(ths).forEach(o->o.start()); Arrays.asList(ths).forEach(o->{ try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long end = System.currentTimeMillis(); System.out.println(end-start); System.out.println(lists.size()); } }