

1.ConcurrentHashMap
1.1為什么要使用ConcurrentHashMap
在並發編程中使用HashMap可能導致程序死循環。而使用線程安全的HashTable效率又非常低下,基於以上兩個原因,便有了ConcurrentHashMap的登場機會。
(1)線程不安全的HashMap
在多線程環境下,使用HashMap進行put操作會引起死循環,導致CPU利用率接近100%。原因是表擴容的時候,容易形成環鏈
(2)效率低下的HashTable
HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。因為當一個線程訪問HashTable的同步方法,其他線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,所以競爭越激烈效率越低。
(3)ConcurrentHashMap的鎖分段技術可有效提升並發訪問率
HashTable容器在競爭激烈的並發環境下表現出效率低下的原因是所有訪問HashTable的線程都必須競爭同一把鎖,假如容器里有多把鎖,每一把鎖用於鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,從而可以有效提高並發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。首先將數據分成一段一段地存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。
1.2 ConcurrentHashMap的結構
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment是一種可重入鎖(ReentrantLock),在ConcurrentHashMap里扮演鎖的角色;HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap里包含一個Segment數組。Segment的結構和HashMap類似,是一種數組和鏈表結構。一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護着一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得與它對應的Segment鎖,如圖1-2所示。


1.3.get操作
get操作的高效之處在於整個get過程不需要加鎖,除非讀到的值是空才會加鎖重讀。我們知道HashTable容器的get方法是需要加鎖的,那么ConcurrentHashMap的get操作是如何做到不加鎖的呢?原因是它的get方法里將要使用的共享變量都定義成volatile類型,如用於統計當前Segement大小的count字段和用於存儲的HashEntry的value。定義成volatile的變量,能夠在線程之間保持可見性,能夠被多線程同時讀,並且保證不會讀到過期的值,但是只能被單線程寫(有一種情況可以被多線程寫,就是寫入的值不依賴於原值),在get操作里只需要讀不需要寫共享變量count和value,所以可以不用加鎖。之所以不會讀到過期的值,是因為根據Java內存模型的happen before原則,對volatile字段的寫入操作先於讀操作,即使兩個線程同時修改和獲取volatile變量,get操作也能拿到最新的值,這是volatile替換鎖的經典應用場景。
1.4put操作
由於put方法里需要對共享變量進行寫入操作,所以為了線程安全,在操作共享變量時必須加鎖。put方法首先定位到Segment,然后在Segment里進行插入操作。插入操作需要經歷兩個步驟,第一步判斷是否需要對Segment里的HashEntry數組進行擴容,第二步定位添加元素的位置,然后將其放在HashEntry數組里。
如何擴容?首先會創建一個容量是原來容量兩倍的數組,然后將原數組里的元素進行再散列后插入到新的數組里。為了高效,ConcurrentHashMap不會對整個容器進行擴容,而只對某個segment進行擴容。
1.5size操作
如果要統計整個ConcurrentHashMap里元素的大小,就必須統計所有Segment里元素的大小后求和。Segment里的全局變量count是一個volatile變量,那么在多線程場景下,是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?不是的,雖然相加時可以獲取每個Segment的count的最新值,但是可能累加前使用的count發生了變化,那么統計結果就不准了。所以,最安全的做法是在統計size的時候把所有Segment的put、remove和clean方法全部鎖住,但是這種做法顯然非常低效。因為在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。
那么ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put、remove和clean方法里操作元素前都會將變量modCount進行加1,那么在統計size前后比較modCount是否發生變化,從而得知容器的大小是否發生變化。
2 ConcurrentLinkedQueue
一種使用
非阻塞算法的基於
鏈接節點的
無界線程安全隊列,它采用先進先出的規則對節點進行排序,當我們添加一個元素的時候,它會添加到隊列的尾部;當我們獲取一個元素時,它會返回隊列頭部的元素。它采用了CAS算法來實現


入隊和出隊,都不會立即更新head和tair指針:

- 添加元素1。設置tail節點的next節點為元素1節點。
- 添加元素2。設置元素1的next節點為元素2節點,然后將tail節點指向元素2節點。
- 添加元素3,設置tail節點的next節點為元素3節點。
- 添加元素4,設置元素3的next節點為元素4節點,然后將tail節點指向元素4節點。
發現入隊主要做兩件事情:
第一是將入隊節點設置成tail節點的next節點(如果next為空)或者next(如果next不為空)的next節點;
第二是更新tail節點。如果tail節點的next節點為空,則將入隊節點設置成tail的next節點。如果tail節點的next節點不為空,則將入隊節點設置成tail節點,所以tail節點不總是尾節

出隊列的就是從隊列里返回一個節點元素,並清空該節點對元素的引用。讓我們通過每個節點出隊的快照來觀察一下head節點的變化,如圖6-5所示。
從圖中可知,並不是每次出隊時都更新head節點,當head節點里有元素時,直接彈出head節點里的元素,而不會更新head節點。只有當head節點里沒有元素時,出隊操作才會更新head節點。這種做法也是通過hops變量來減少使用CAS更新head節點的消耗,從而提高出隊效率。
首先獲取頭節點的元素,然后判斷頭節點元素是否為空,如果為空,表示另外一個線程已經進行了一次出隊操作將該節點的元素取走,如果不為空,則使用CAS的方式將頭節點的引用設置成null,如果CAS成功,則直接返回頭節點的元素,如果不成功,表示另外一個線程已經進行了一次出隊操作更新了head節點,導致元素發生了變化,需要重新獲取頭節點。
3什么是阻塞隊列
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。


·拋出異常:當隊列滿時,如果再往隊列里插入元素,會拋出IllegalStateException("Queuefull")異常。當隊列空時,從隊列里獲取元素會拋NoSuchElementException異常。
·返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊列里取出一個元素,如果沒有則返回null。
·一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列里take元素,隊列會阻塞住消費者線程,直到隊列不為空。
·超時退出:當阻塞隊列滿時,如果生產者線程往隊列里插入元素,隊列會阻塞生產者線程一段時間,如果超過了指定的時間,生產者線程就會退出。
JDK 7提供了7個阻塞隊列,如下。
·ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
·LinkedBlockingQueue:一個由鏈表結構組成的有界阻塞隊列。
·PriorityBlockingQueue:一個支持優先級排序的無界阻塞隊列。
·DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
·SynchronousQueue:一個不存儲元素的阻塞隊列。
·LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
·LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
4.Fork/Join框架
Fork/Join框架是Java 7提供的一個用於並行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
1.工作竊取(work-stealing)
是指某個線程從其他隊列里竊取任務來執行。干完活的線程與其等着,不如去幫其他線程干活,於是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。
優點:充分利用線程進行並行計算,減少了線程間的競爭。
缺點:在某些情況下還是存在競爭,比如雙端隊列里只有一個任務時。並
且該算法會消耗了更多的系統資源,比如創建多個線程和多個雙端隊列。
2.java實現
Fork/Join使用兩個類來完成以上兩件事情。
①ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類,Fork/Join框架提供了以下兩個子類。
·RecursiveAction:用於沒有返回結果的任務。
·RecursiveTask:用於有返回結果的任務。
②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。
public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 2;// 閾值 private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } protected Integer compute() { int sum = 0; // 如果任務足夠小就計算任務 boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任務大於閾值,就分裂成兩個子任務計算 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 執行子任務 leftTask.fork(); rightTask.fork(); // 等待子任務執行完,並得到其結果 int leftResult=leftTask.join(); int rightResult=rightTask.join(); // 合並子任務 sum = leftResult + rightResult; } return sum; } static public void test(){ ForkJoinPool forkJoinPool = new ForkJoinPool(); // 生成一個計算任務,負責計算1+2+3+4 CountTask task = new CountTask(1, 40000); // 執行一個任務 Future<Integer> result = forkJoinPool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { } catch (ExecutionException e) { } } }