看了前兩篇你肯定已經理解了 java 並發編程的低層構建。然而,在實際編程中,應該經可能的遠離低層結構,畢竟太底層的東西用起來是比較容易出錯的,特別是並發編程,既難以調試,也難以發現問題,我們還是使用由並發處理的專業人員實現的較高層次的結構要方便、安全得多。
阻塞隊列
對於許多線程問題,都可以使用一個或多個隊列來安全、優雅的進行數據的傳遞。比如經典的生產者--消費者問題,生產者不停的生成某些數據,消費者需要處理數據,在多線程環境中,如何安全的將數據從生產者線程傳遞到消費者線程?
無需使用鎖和條件對象,java 自帶的阻塞隊列就能夠完美的解決這個問題。阻塞隊列中所有方法都是線程安全的,所以我們進行讀取、寫入操作時無需考慮並發問題。阻塞隊列主要有以下幾種方法:
方法 | 正常結果 | 異常結果 |
---|---|---|
add | 添加一個元素 | 隊列滿,拋出 IllegalStateException 異常 |
element | 返回隊列頭元素 | 隊列空,拋出 NoSuckElementException 異常 |
offer | 添加一個元素,返回 true | 隊列滿,返回 false |
peek | 返回隊列的頭元素 | 隊列空,返回 null |
poll | 移出並返回隊列頭元素 | 隊列空,返回 null |
put | 添加一個元素 | 隊列滿,阻塞 |
remove | 移出並返回頭元素 | 隊列空,拋出 NoSuckElementException 異常 |
take | 移出並返回頭元素 | 隊列空,則阻塞 |
上面的方法主要分成了三類,第一類:異常情況下拋出異常;第二類:異常情況返回 false/null;第三類:異常情況下阻塞。可以根據自身情況選擇合適的方法來操作隊列。
阻塞隊列的實現
在 java.util.concurrent 包中,提供了阻塞隊列的幾種實現,當前也可以自己實現 BlockingQueue 接口,實現自己的阻塞隊列。
- LinkdedBlockingQueue:鏈式阻塞隊列。一般情況下鏈式的結構容量都是沒有上限的,但是也可以選擇手動指定最大容量。
- LinkdedBlockingDeque:鏈式阻塞雙端隊列。
- PriorityBlockingQueue:優先級隊列。按照優先級移出,無容量上限。
- ArrayBlockingQueue:數組隊列,需指定容量。可選指定是否需要公平性,如果設置了公平性,等待了最長時間的線程會優先得到處理,但是會降低性能。
延遲隊列
DelayQueue 也是阻塞隊列的一種,不過它要求隊列中的元素實現Delayed
接口。需要重新兩個方法:
- long getDelay(TimeUnit unit)返回延遲的時間,負值表示延遲結束,只有延遲結束的情況下,元素才能從隊列中移出。
- int compareTo(Delayed o)比較方法,DelayQueue 使用該方法對元素進行排序。
傳遞隊列
在 Java SE 7 中新增了一個 TransferQueue 接口,允許生產者等待,直到消費者消費了某個元素。原本生產者消費者是沒有關系的,生產者並不知道某個元素是否被消費者消費了。通過此接口可以讓生產者知道某個元素確實被消費了。如果生產者調用:
q.transer(item)
方法,這個調用會阻塞,知道 item 被消費線程取出消費。LinkedTransferQueue 實現了此接口。
線程安全的集合
如果多個線程並發的操作集合,會很容易出現問題,我們可以選擇鎖來保護共享數據,但是更好的選擇是使用線程安全的集合來作為替代。本節介紹 Java 類庫中提供的線程安全的集合(上一節介紹的阻塞隊列也在其中)。
這類集合,size 是通過便利得出的,較慢。而且如果 size 數量大於 20 億,有可能超過 int 的范圍,使用 size 方法無法獲取到大小,在 java8 中引入了 mappingCount 方法,返回值類型為 long。
映射 map
映射是日常使用中非常常見的一種數據結構。共有以下幾種線程安全的映射:
- ConcurrentSkipListMap:有序映射,根據鍵排序
- ConcurrentHashMap:無序映射
映射條目的原子更新
一旦涉及到多線程環境,做啥都比較麻煩,比如更新一個 map 中某個鍵值對的值,下面的操作顯然是不正確的:
int old = map.get(key);
map.put(key,old+1);
假如有兩個線程同時操作一個 key,雖然 put 方法是線程安全的,但是由於兩個線程之前讀取的 old 是一樣的,這樣就會導致某個線程的修改被覆蓋掉。
有以下幾種安全的更新方法:
- 使用 repalce(key,oldValue,newValue)方法,此方法會在 key,oldValue 完全匹配時將 oldValue 換為 newValue 返回 true,否則返回 false。
- 使用 AtomicLong 或者 LongAdder 作為映射的值,這兩個的操作方法是原子性的,因此可以安全的修改值。 3.使用 compute 類似方法完成更新。比如下面的:
# 如果key不再map中,v的值為null
map.compute(key,(k,v)->v==null?1:v+1);
# 如果不存在key
map.computeIfAbsent(key,key->new LongAdder())
# 如果存在key
map.computeIfPresent(key,key->key+1)
# 和compute方法類似,不過不處理鍵
map.merge(key,value,(existingValue,newValue)->existingValue+newValue+1)
批操作
java8 引入的,即使有其他線程在處理映射,批操作也能安全的執行。批操作會遍歷映射,處理便利過程中找到的元素,且無需凍結當前映射的快照。顯然通過批操作獲取的結果不是完全精確的,因為遍歷過程中,元素可能會被改變。
有以下三種不同的操作:
- 搜索(search),遍歷結果直到返回一個非 null 的結果
- 歸約(reduce),組合所有鍵或值,需提供累加函數
- forEach,遍歷所有的鍵值對
每個操作都有 4 個版本: - operationKeys:處理鍵
- operationValues:處理值
- operation:處理鍵值
- operationEntries:處理需要 map.Entry 對象
並發集合
線程安全的 set 集合只有以下一種:
- ConcurrentSkipListSet:有序 set
如果我們想要一個 hash 結構的,線程安全的 set,有以下幾種辦法.
- 通過 ConcurrentHashMap.<Key>newKeySet()生成一個 Set
,比如:
Set<String> sets = ConcurrentHashMap.<String>newKeySet();
這其實只是 ConcurrentHashMap<Key,Boolean>的一個包裝器,所有的值都為 true
- 通過現有映射對象的 keySet 方法,生成這個映射的鍵集。如果刪除這個集的某個元素,映射上對於元素也會被刪除。但是不能添加元素,因為沒有相應的值。java8 新增了一個 keySet 方法,可以設置一個默認值,這樣就能為向集合中增加元素。
數組
在 Concurrent 包中只有一個CopyOnWriteArrayList
數組。該數組所有的修改都會對底層數組進行復制,也就是每插入一個元素都會將原來的數組復制一份並加入新的元素。
當構建一個迭代器時,迭代器指向的是當前數組的引用,如果后來數組被修改了,迭代器指向的任然是舊的數組。
任何集合類都可以通過使用同步包裝器變成線程安全的,如下:
//線程安全的列表
List<String> list1 = Collections.synchronizedList(new ArrayList<>());
//線程安全的map
Map<String,String> map1 = Collections.synchronizedMap(new HashMap<>());
//線程安全的set
Set<String> set1 = Collections.synchronizedSet(new HashSet<>());
並行數組算法
在 java 8 中,Arrays 類提供了大量的並行化操作。
- Arrays.parallelSort
對一個基本數據類型或對象的數組進行排序
- Arrays.paralletSetAll
用一個函數計算得到的值填充一個數組。這個函數接收元素索引,然后計算值。例如:
# 將所有值加上對於的序號
Arrays.parallelSetAll(arr,i->i+ arr[i]);
- parallelPrefix
用對應一個給定結合操作的前綴的累加結果替換各個數組元素。看文字描述不太容易看懂,這里用一個例子說明:
int[] arr = {1,2,3,4}
Arrays.parallelPrefix(arr,(x,y)->x*y);
// arr變成:[1,1*2,1*2*3,1*2*3*4]