同步容器類
早期版本的JDK提供的同步容器類為Vector和Hashtable,JDK1.2 提供了Collections.synchronizedXxx等工程方法,將普通的容器繼續包裝。對每個共有方法都進行同步。
Collection類中提供了多個synchronizedXxx方法,該方法返回指定集合對象對應的同步對象。synchronizedXxx方法本質是對相應容器的包裝。
例:使用Collections類獲得同步容器。
public static void main(String[] args) { Collection c = Collections.synchronizedCollection(new ArrayList()); List list = Collections.synchronizedList(new ArrayList()); // 包裝List Set set = Collections.synchronizedSet(new HashSet()); // 包裝Set Map map = Collections.synchronizedMap(new HashMap()); // 包裝Map }
synchronizedMap源碼探究
Collections.synchronizedMap(new HashMap())方法內部會new一個SynchronizedMap對象。
該方法源碼如下:
SynchronizedMap類是一個靜態內部類。
該類源碼如下:
SynchronizedMap的每個方法,都是對Map本身做了一次帶鎖的操作。該類本身並沒有太多的應用,性能較差。
同步容器的問題
同步容器類在單個方法被使用時可以保證線程安全。復合操作則需要額外的客戶端加鎖來保護。
迭代器與ConcurrentModificationException
使用Iterator迭代容器或使用使用for-each遍歷容器,在迭代過程中修改容器會拋出ConcurrentModificationException異常。想要避免出現ConcurrentModificationException,就必須在迭代過程持有容器的鎖。但是若容器較大,則迭代的時間也會較長。那么需要訪問該容器的其他線程將會長時間等待。從而會極大降低性能。
若不希望在迭代期間對容器加鎖,可以使用"克隆"容器的方式。使用線程封閉,由於其他線程不會對容器進行修改,可以避免ConcurrentModificationException。但是在創建副本的時候,存在較大性能開銷。
隱式迭代
toString,hashCode,equalse,containsAll,removeAll,retainAll等方法都會隱式的Iterate,也即可能拋出ConcurrentModificationException。
並發容器
針對於同步容器的巨大缺陷。java.util.concurrent中提供了並發容器。並發容器包注重以下特性:
- 根據具體場景進行設計,盡量避免使用鎖,提高容器的並發訪問性。
- 並發容器定義了一些線程安全的復合操作。
- 並發容器在迭代時,可以不封閉在synchronized中。但是未必每次看到的都是"最新的、當前的"數據。如果說將迭代操作包裝在synchronized中,可以達到"串行"的並發安全性,那么並發容器的迭代達到了"臟讀"。
CopyOnWriteArrayList和CopyOnWriteArraySet分別代替List和Set,主要是在遍歷操作為主的情況下來代替同步的List和同步的Set,這也就是上面所述的思路:迭代過程要保證不出錯,除了加鎖,另外一種方法就是"克隆"容器對象。
ConcurrentLinkedQuerue是Query實現,是一個先進先出的隊列。一般的Queue實現中操作不會阻塞,如果隊列為空,那么取元素的操作將返回空。Queue一般用LinkedList實現的,因為去掉了List的隨機訪問需求,因此並發性更好。
BlockingQueue擴展了Queue,增加了可阻塞的插入和獲取操作,如果隊列為空,那么獲取操作將阻塞,直到隊列中有一個可用的元素。如果隊列已滿,那么插入操作就阻塞,直到隊列中出現可用的空間。
Map並發容器
ConcurrentMap<K,V>接口
該接口定義Map的原子操作:putIfAbsent、remove、replace。
若沒有則增加
V putIfAbsent(K key, V value) :若不包含key,則放入value。若包含key,則返回key對應的value。等價於:
if (!map.containsKey(key)) return map.put(key, value); else return map.get(key);
若相等則移除
boolean remove(Object key, Object value) :當key對應到指定的value時,才移除該key-value對。等效於:
if (map.containsKey(key) && map.get(key).equals(value)) { map.remove(key); return true; } else { return false; }
若相等則替換
boolean replace(K key, V oldValue, V newValue) :當key對應到指定的value時,才替換key對應的value值。等效於:
if (map.containsKey(key) && map.get(key).equals(oldValue)) { map.put(key, newValue); return true; } else { return false; }
若擁有則替換
V replace(K key,V value):只有目前將鍵的條目映射到某一值時,才替換該鍵的條目。
等效於:
if (map.containsKey(key)) { return map.put(key, value); } else { return null; }
ConcurrentHashMap<K,V>類
ConcurrentHashMap是HashMap的線程安全版本。ConcurrentHashMap類實現了ConcurrentMap接口,具備putIfAbsent、remove、replace原子操作。此類與 Hashtable 相似(與 HashMap 不同)ConcurrentHashMap不允許將 null 用作鍵或值,這與ConcurrentHashMap的具體實現有關。
ConcurrentHashMap實現也采用了散列機制,但是采用了分段鎖(Lock Striping)機制提供了並發性能。並發環境下實現更的吞吐量,而在單線程環境下只損失非常小的性能。
三種Map的比較:
- HashMap是根據散列值分段存儲。
- SynchronizedMap在同步的時候鎖住了所有的段。
- ConcurrentHashMap加鎖的時候根據散列值鎖住了散列值鎖對應的那段,因此提高了並發性能。
ConcurrentHashMap和其他並發容易一樣改進了同步容器的問題,其迭代器不會拋出CurrentModificationException異常。ConcurrentHashMap返回迭代器具有弱的一致性。弱一致性的迭代器可以容忍並發修改,當創建迭代器時會創建已有的元素,可以(但是不保證)在迭代器被構造后將修改操作反映給容器。ConcurrentHashMap弱一致性使得像size()和isEmpty()方法變弱了,即size()和isEmpty()方法返回的結果在計算時可能是過期了。但這種方法在並發環境下用處很小,因為它們返回值總在不斷變化。這些操作被弱化,換來的是對重要操作的性能優化(包括get、put、containsKey、remove等)。
ConcurrentSkipListMap類
ConcurrentSkipListMap是TreeMap的線程安全版本。
Queue 並發容器
Queue 隊列
隊列按照 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是隊列中時間最長的元素。隊列的尾部 是隊列中時間最短的元素。新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素。
對於Queue而言是在Collection的基礎上增加了offer/remove/poll/element/peek方法,另外重新定義了add方法。
拋出異常 |
返回特殊值 |
操作描述 |
|
插入 |
add(e) |
offer(e) |
將元素加入到隊列尾部。 |
移除 |
remove() |
poll() |
移除隊列頭部的元素。 |
檢查 |
element() |
peek() |
返回隊列頭部的元素而不移除此元素。 |
PriorityQueue類在非並發情況下實現了Queue接口。
ConcurrentLinkedQueue類
ConcurrentLinkedQueue是使用非阻塞的方式實現的基於鏈接節點的無界的線程安全隊列,性能非常好。
多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇。此隊列不允許使用 null 元素。
BlockingQueue 接口——阻塞隊列
對於Queue來說,BlockingQueue是主要的線程安全版本。這是一個可阻塞的版本,也就是允許添加/刪除元素被阻塞,直到成功為止。
BlockingQueue相對於Queue而言增加了兩個操作:put/take。下面是一張整理的表格。
拋出異常 |
返回特殊值 |
阻塞 |
超時 |
操作描述 |
|
插入 |
add(e) |
offer(e) |
put(e) |
offer(e,time,unit) |
將元素加入到隊列尾部。 |
移除 |
remove() |
poll() |
take() |
poll(time,unit) |
移除隊列頭部的元素。 |
檢查 |
element() |
peek() |
返回隊列頭部的元素而不移除此元素。 |
BlockingQueue非常適合做生產者消費者模型。一般而言BlockingQueue內部會使用鎖,所以BlockingQueue的性能並不太高。
ArrayBlockingQueue類
一個由數組支持的有界阻塞隊列。ArrayBlockingQueue是一個典型的"有界緩存區",由於數組大小固定,所以一旦創建了這樣的“緩存區",就不能再增加其容量。
阻塞條件:試圖向已滿隊列中執行put元素會導致操作受阻塞;試圖從空隊列中take元素將導致類似阻塞。
源碼解析
ArrayBlockingQueue使用ReentrantLock保證線程安全,並用Condition對線程進行通訊。
put方法 若滿,則等待。
take方法 若空,則等待。
LinkedBlockingQueue類
一個基於鏈表的有界阻塞隊列。鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。
LinkedBlockingQueue的一個構造方法可以設定容量范圍。如果未指定容量,則它等於Integer.MAX_VALUE。除非插入節點會使隊列超出容量,否則每次插入后會動態地創建鏈接節點。
PriorityBlockingQueue類
一個支持優先級的無界阻塞隊列,即該阻塞隊列中的元素可自動排序。默認情況下,元素采取自然升序排列。可以自定義類實現compareTo()方法來指定元素排序規則。此隊列在邏輯上是無界的。此類不允許使用 null 元素。需要注意的是,不能保證相同優先級元素的順序。
DelayQueue類
DelayQueue是一種延時獲取元素的無界阻塞隊列。隊列使用PriorityQueue類實現。隊列中的元素必須實現Delay接口,在創建元素時,可以指定從隊列獲取元素的限制時長。只有在延遲期滿,元素才能被提出隊列。
這個隊列的特性是,隊列中的元素都要延遲時間(超時時間),只有一個元素達到了延時時間才能出隊列,也就是說每次從隊列中獲取的元素總是最先到達延時的元素。
DelayQueue非常有用,可以用DelayQueue運用在以下應用場景:
- 緩存系統的設計:可以用DelayQueue保存緩存元素。使用一個線程循環檢查隊列中的元素,一旦可以從DelayQueue中獲取元素時,表示緩存有效期到了。
- 定時任務調度:使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,比如TimeQueue就是使用DelayQueue實現的。
SynchronousQueue類
SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操作必須等待一個take操作,否則不能繼續添加元素。
SynchronousQueue內部其實沒有任何一個元素,容量是0,嚴格說並不是一種容器。由於隊列沒有容量,因此不能調用peek操作。因為僅在試圖要移除元素時,該元素才存在。除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)插入元素;也不能迭代隊列,因為其中沒有元素可用於迭代。
SynchronousQueue更像是一種信道(管道),資源從一個方向快速傳遞到另一方向。SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue。
例:使用BlockingQueue實現生產者-消費者模型。服務端(ICE服務)接受客戶端的請求(accept),請求計算此人的好友生日,然后將計算的結果存取緩存中(Memcache)中。該例采用了ExecutorService實現多線程的功能,盡可能的提高吞吐量,阻塞隊列使用的是LinkedBlockingQueue。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class BirthdayService { final int workerNumber; final Worker[] workers; final ExecutorService threadPool; static volatile boolean running = true; public BirthdayService(int workerNumber, int capacity) { if (workerNumber <= 0) throw new IllegalArgumentException(); this.workerNumber = workerNumber; workers = new Worker[workerNumber]; for (int i = 0; i < workerNumber; i++) { workers[i] = new Worker(capacity); } boolean b = running; // kill the resorting threadPool = Executors.newFixedThreadPool(workerNumber); for (Worker w : workers) { threadPool.submit(w); } } Worker getWorker(int id) { return workers[id % workerNumber]; } class Worker implements Runnable { final BlockingQueue<Integer> queue; public Worker(int capacity) { queue = new LinkedBlockingQueue<Integer>(capacity); } public void run() { while (true) { try { consume(queue.take()); } catch (InterruptedException e) { return; } } } void put(int id) { try { queue.put(id); } catch (InterruptedException e) { return; } } } public void accept(int id) { // accept client request getWorker(id).put(id); } protected void consume(int id) { // do the work // get the list of friends and save the birthday to cache } }
Deque雙向隊列
Deque接口定義了雙向隊列。雙向隊列允許在隊列頭和尾部進行入隊出隊操作。Deque不僅具有FIFO的Queue實現,也有FILO的實現,也就是不僅可以實現隊列,也可以實現一個堆棧。需要說明的是LinkedList實現了Deque接口,即LinkedList是一個雙向隊列。
Deque在Queue的基礎上增加了更多的操作方法。
第一個元素 | 最后一個元素 | 描述 | |||
拋出異常 | 返回特殊值 | 拋出異常 | 返回特殊值 | ||
插入 | addFirst(e) | offerFirst | addLast(e) | offerLast(e) | 將元素加入列表 |
push(e) | add(e) | offer(e) | |||
移除 | removeFirst(e) | pollFirst(e) | removeLast() | pollLast() | 將元素從隊列移除 |
remove()/pop() | poll() | ||||
檢查 | getFirst() | peekFirst() | getLast() | peekLast() | 查看隊列頭或尾元素 |
element() | peek() |
對於非阻塞Deque的實現類有ArrayDeque和LinkedList。
BlockingDeque接口
BlockingDeque 方法有四種形式,使用不同的方式處理無法立即滿足但在將來某一時刻可能滿足的操作:第一種方式拋出異常;第二種返回一個特殊值(null 或 false,具體取決於操作);第三種無限期阻塞當前線程,直至操作成功;第四種只阻塞給定的最大時間,然后放棄。下表中總結了這些方法:
第一個元素 | 最后一個元素 | 描述 | |||||||
拋出異常 | 返回特殊值 | 阻塞 | 超時 | 拋出異常 | 返回特殊值 | 阻塞 | 超時 | ||
插入 | addFirst(e) | offerFirst | putFirst(e) | offerFirst(e,time,unit) | addLast(e) | offerLast(e) | putLast(e) | offerLast(e,time,unit) | 將元素加入列表 |
push(e) | add(e) | offer(e) | |||||||
移除 | removeFirst(e) | pollFirst(e) | takeFirst() | pollFirst(time,unit) | removeLast() | pollLast() | takeLast() | pollLast(time,unit) | 將元素從隊列移除 |
remove()/pop() | poll() | ||||||||
檢查 | getFirst() | peekFirst() | getLast() | peekLast() | 查看隊列頭或尾元素 | ||||
element() | peek() | |
BlockingDeque 是線程安全的,但不允許 null 元素,並且可能有(也可能沒有)容量限制。
BlockingDeque 實現可以直接用作 FIFO BlockingQueue。繼承自 BlockingQueue 接口的方法精確地等效於下表中描述的 BlockingDeque 方法:
BlockingQueue 方法 | 等效的 BlockingDeque 方法 |
插入 | |
add(e) | addLast(e) |
offer(e) | offerLast(e) |
put(e) | putLast(e) |
offer(e, time, unit) | offerLast(e, time, unit) |
移除 | |
remove() | removeFirst() |
poll() | pollFirst() |
take() | takeFirst() |
poll(time, unit) | pollFirst(time, unit) |
檢查 | |
element() | getFirst() |
peek() | peekFirst() |
LinkedBlockingDeque類
LinkedBlockingDeque是一個基於鏈表的雙向阻塞隊列。雙向隊列因為多一個操作隊列的入口,在多線程同時入隊時,也就降低了一半競爭。在初始化LinkedBlockingDeque時,可以設置容量,防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。
寫時復制 CopyOnWrite
CopyOnWrite表示讀時共享,寫時復制。對於讀多寫少的場景特別適用。
CopyOnWriteArrayList 與 CopyOnWriteArraySet
對於List或者Set而言,增、刪操作其實都是針對整個容器,因此每次操作都不可避免的需要鎖定整個容器空間,性能肯定會大打折扣。要實現一個線程安全的List/Set,只需要在修改操作的時候進行同步即可,比如使用java.util.Collections.synchronizedList(List<T>)或者java.util.Collections.synchronizedSet(Set<T>)。
當然也可以使用Lock來實現線程安全的List/Set。ReadWriteLock當然是一種實現。CopyOnWriteArrayList/CopyOnWriteArraySet確實另外一種思路。
CopyOnWriteArrayList/CopyOnWriteArraySet的基本思想是一旦對容器有修改,那么就"復制"一份新的集合,在新的集合上修改,然后將新集合復制給舊的引用。當然了這部分少不了要加鎖。顯然對於CopyOnWriteArrayList/CopyOnWriteArraySet來說最大的好處就是"讀"操作不需要鎖了。