【前言:無論是大數據從業人員還是Java從業人員,掌握Java高並發和多線程是必備技能之一。本文主要闡述Java並發包下的阻塞隊列和並發容器,其實研讀過大數據相關技術如Spark、Storm等源碼的,會發現它們底層大多用到了Java並發隊列、同步類容器、ReentrantLock等。建議大家結合本篇文章,仔細分析一下相關源碼】
BlockingQueue
阻塞隊列,位於java.util.concurrent並發包下,它很好的解決了多線程中如何安全、高效的數據傳輸問題。所謂“阻塞”是指在某些情況下線程被掛起,當滿足一定條件時會被自動喚醒,可以通過API進行控制。
常見的阻塞隊列主要分為兩種FIFO(先進先出)和LIFO(后進先出),當然通過不同的實現方式,還可以引申出多種不同類型的隊列。首先了解一下BlockingQueue的幾個核心API:put、take一對阻塞存取;add、poll一對非阻塞存取。
插入數據
put(anObj):把anObj加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻塞,直到BlockingQueue里面有空間再繼續插入
add(anObj):把anObj加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則拋出異常
offer(anObj):表示如果可能的話,將anObj加到BlockingQueue里,如果BlockingQueue可以容納,則返回true,否則返回false。
讀取數據
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態,直到Blocking有新的對象被加入為止
poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回null
BlockingQueue核心成員介紹
ArrayBlockingQueue
基於數組實現的有界阻塞隊列。因為基於數組實現,所以具有查找快,增刪慢的特點。
生產者和消費者用的是同一把鎖,不能並行執行效率低。它底層使用了一種標准互斥鎖ReentrantLock,即讀讀、讀寫,寫寫都互斥,當然可以控制對象內部是否采用公平鎖,默認是非公平鎖。消費方式是FIFO。
生產和消費數據時,直接將枚舉對象插入或刪除,不會產生或銷毀額外的對象實例。
應用:因為底層生產和消費用了同一把鎖,定長數組不用頻繁創建和銷毀對象,適合於想按照隊列順序去執行任務,還不想出現頻繁的GC的場景。
LinkedBlockingQueue
基於鏈表實現的阻塞隊列,同樣具有增刪快,定位慢的特點。
需要注意一點:默認情況下創建的LinkedBlockingQueue容量是Integer.MAX_VALUE, 在這種情況下,如果生產者的速度一旦大於消費者的速度,可能還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗盡。可以通過指定容量創建LinkedBlockingQueue避免這種極端情況的發生。
雖然底層使用的也是ReentrantLock但take和put是分離的(生產和消費的鎖不是同一把鎖),高並發場景下效率仍然高於ArrayBlockingQueue。put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。
DelayQueue
DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。DelayQueue中的元素,只有指定的延遲時間到了,才能夠從隊列中獲取到該元素。
應用場景:
1.客戶端長時間占用連接的問題,超過這個空閑時間了,可以移除的
2.處理長時間不用的緩存:如果隊列里面的對象長時間不用,超過空閑時間,就移除
3.任務超時處理
PriorityBlockingQueue
PriorityBlockingQueue不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此必須控制生產者生產數據的速度,避免消費者消費數據速度跟不上,否則時間一長,會最終耗盡所有的可用堆內存空間。
在向PriorityBlockingQueue中添加元素時,元素通過在實現實現Comparable接口,重寫compareTo()來定義優先級的邏輯。它內部控制線程同步的鎖采用的是公平鎖。
SynchronousQueue
一種無緩沖的等待隊列,來一個任務就執行這個任務,這期間不能添加任何的任務。也就是不用阻塞了,其實對於少量任務而言,這種做法更高效。
聲明一個SynchronousQueue有兩種不同的方式,公平模式和非公平模式:
公平模式:SynchronousQueue會采用公平鎖,並配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體現整體的公平策略;
非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現飢渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。
ConcurrentLinkedQueue
不上鎖,高並發場景效率遠高於ArrayBlockingQueue和LinkedBlockingQueue等
容器
同步類容器
第一類:Vector、Stack、HashTable都是同步類,線程安全的,但高並發場景下仍然可能出現問題如ConcurrentModificationException。
第二類:Collections提供的一些工廠類(靜態),效率低
並發類容器
CopyOnWrite容器
寫時復制的容器:當我們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行copy,復制出一個新的容器,然后往新的容器里添加元素,添加完元素之后,再將原容器的引用指向新的容器,非常適合讀多寫少的場景。
但同時存在如下問題:
數據一致性問題:CopyOnWrite容器是弱一致性的,即只能保證數據的最終一致性,不能保證數據的實時一致性。所以如果你希望寫入的的數據能夠即時讀到,不要使用CopyOnWrite容器。
內存占用問題:因為CopyOnWrite 的寫時復制機制,所以在進行寫操作的時候,內存里會同時駐扎兩個對象的內存,舊的對象和新寫入的對象。如果這些對象占用的內存比較大,如果控制不好,比如寫特別多的情景,很有可能造成頻繁的Yong GC 和Full GC。針對內存占用問題,可以通過壓縮容器中的元素的方法來減少大對象的內存消耗,或者不使用CopyOnWrite容器,而使用其他的並發容器,如ConcurrentHashMap。
有兩種常見的CopyOnWrite容器:CopyOnWriteArrayList和CopyOnWriteArraySet,其中CopyOnWriteArrayList是ArrayList 的一個線程安全的變體。
ConcurrentHashMap
筆者分JDK1.7和JDK1.8兩部分說明ConcurrentHashMap。
JDK1.7 ConcurrentHashMap
JDK1.7采用"鎖分段"技術來降低鎖的粒度,它把整個map划分為一系列由segment組成的單元,一個segment相當於一個hashtable。通過這種方式,加鎖的對象就從整個map變成了一個segment。ConcurrentHashMap線程安全並且提高性能原因就在於:對map中的讀是並發的,無需加鎖;只有在put、remove操作時才加鎖,而加鎖僅是對需要操作的segment加鎖,不會影響其他segment的讀寫。因此不同的segment之間可以並發使用,極大地提高了性能。
根據源碼又可得出查找、插入、刪除的過程:通過key的hash確定segement(插入時如果segment大小達到擴容閾值則進行擴容) --> 確定鏈表數組HashEntry下標(插入/刪除時,獲取鏈表頭) --> 遍歷鏈表【查詢:調用equals()進行比對,找到與所查找key相等的結點並讀取;插入:如果找到相同的key的結點則更新value值,如果沒有則插入新結點;刪除:找到被刪除結點后,以被刪除結點的next結點開始建立新的鏈表,然后再把原鏈表頭直到被刪結點的前繼結點依次復制、插入新鏈表,最后把新鏈表頭設置為當前數組下標元素取代舊鏈表。
JDK1.8ConcurrentHashMap
JDK1.8中的ConcurrentHashMap在JDK1.7上做了很多優化:
1. 取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存數據,采用table數組元素作為鎖,從而實現了對每一行數據進行加鎖,通過進一步降低鎖粒度來減少並發沖突的概率
2. 將原先table數組+鏈表的數據結構,變更為table數組+鏈表+紅黑樹的結構。對於hash表來說,最核心的能力在於將key hash之后能均勻的分布在數組中。如果hash之后散列的很均勻,那么table數組中的每個隊列長度主要為0或者1。但實際情況並非總是如此理想,雖然ConcurrentHashMap類默認的加載因子為0.75,但是在數據量過大或者運氣不佳的情況下,還是會存在一些隊列長度過長的情況,如果還是采用單向列表方式,那么查詢某個節點的時間復雜度為O(n);因此,對於個數超過8(默認值)的列表,jdk1.8中采用了紅黑樹的結構,那么查詢的時間復雜度可以降低到O(logN),可以改進性能
3. 新增字段transient volatile CounterCell[] counterCells,可方便的計算集合中所有元素的個數,性能大大優於jdk1.7中的size()方法
相信通過這些介紹,大家對於諸如"為什么選擇ConcurrentHashMap?"會有很好的思路了。
關注微信公眾號:大數據學習與分享,獲取更對技術干貨