Java系列筆記(6) - 並發(上)


目錄

1,基本概念

2,volatile

3,atom

4,ThreadLocal

5,CountDownLatch和CyclicBarrier

6,信號量

7,Condition

8,Exchanger

 

在Java中,JVM、並發、容器、IO/NIO是我認為最重要的知識點,本章將介紹其中的並發,這也是從“會Java”到精通Java所必須經歷的一步。本章承接上一張《Java系列筆記(5) - 線程》,其中介紹了Java線程的相關知識,是本章介紹內容的基礎,如果對於線程不熟悉的,可以先閱讀以下這篇博客了解一下。

在上一篇博客《線程》中,我們講到了Java的內存模型,事實上,Java內存模型的建立,是圍繞着一個原則來進行的:在保證線程間合作的基礎上,避免線程的不良影響。而這一原則,也是本章所介紹的並發機制的最終目的。

本文大量參考了系列文章《深入淺出Java Concurrency,http://www.blogjava.net/xylz/archive/2010/07/08/325587.html》,這是一系列十分優秀也十分明晰的文章,我在學習java並發過程中,對這個系列的文章讀了很多遍,這個系列的文章作者寫的很容易理解而且很詳盡,想要進一步理解java並發的同學,可以仔細去拜讀一下《深入淺出Java Concurrency》這一系列文章。

在原來的計划中,鎖和Condition等概念是放在一起的,不過限於篇幅問題,拆成兩個部分,上部分是本文,講解基本的並發概念、CountDownLatch,原子類、信號量等類,下部分集中講鎖和並發異常。

本文在編寫過程中,參考、引用和總結了《深入淺出Java Concurrency》的內容以及其他Java並發書籍博客的內容,篇幅有限,所以可能總結的不到位,敬請指正。

 

1,基本概念


Java並發的重要性毋庸置疑,Java並發的設計目的在於3個方面:

簡單,意味着程序員盡可能少的操作底層或者實現起來要比較容易;

高效,意味着耗用資源要少,程序處理速度要快;

線程安全,意味着在多線程下能保證數據的正確性。

在Java並發中,有幾個常見概念,需要在講述並發之前進行解釋:

 

臨界資源和臨界區

臨界資源是一般是一種內存資源,一個時刻只允許一個進程(在java中,是線程)訪問,一個線程正在使用臨界資源的時候,另一個線程不能使用。臨界資源是非可剝奪性資源,即使是操作系統(或JVM)也無法阻止這種資源的獨享行為。

臨界區是一種進程中范文臨界資源的那段程序代碼,注意,是程序代碼,不是內存資源了,這就是臨界資源與臨界區的區別。我們規定臨界區的使用原則(也即同步機制應遵循的准則)十六字訣:“空閑讓進,忙則等待,有限等待,讓權等待”–strling。讓我們分別來解釋一下:

(1)空閑讓進:臨界資源空閑時一定要讓進程進入,不發生“互斥禮讓”行為。

(2)忙則等待:臨界資源正在使用時外面的進程等待。

(3)有限等待:進程等待進入臨界區的時間是有限的,不會發生“餓死”的情況。

(4)讓權等待:進程等待進入臨界區是應該放棄CPU的使用。

並發

狹義的只就Java而言,Java多線程在訪問同一資源時,出現競爭的問題,叫做並發問題,Java並發模型是圍繞着在並發過程中如何處理原子性、可見性、有序性這3個特征來設計的。

 

線程安全

如果一個操作序列,不考慮耗時和資源消耗,在單線程執行和多線程執行的情況下,最終得到的結果永遠是相同的,則這個操作序列叫做線程安全的。

如果存在不相同的概率,則就是非線程安全的。

 

原子性(Atomicity)

如果一個操作時不可分割的,那就是一個原子操作,也叫這個操作具有原子性。相反的,一個操作時可以分割的(如a++,它實際上是a=a+1),則就是非原子操作;原子操作是線程安全的,非原子操作都是非線程安全的,但是我們可以通過同步技術(lock)或同步數據模型(Concurrent容器等)把非原子操作序列變成線程安全的原子操作。

事實上,java並發主要研究的就是3個方面的問題:

1,怎么更好的使用原子操作;

2,怎么把非原子操作變得線程安全;

3,怎么提高原子操作和非原子操作的效率並減少資源消耗。

 

可見性(Visibility)

一個變量被多個線程共享,如果一個線程修改了這個變量的值,其它線程能夠立即得知這個修改,則我們稱這個修改具有可見性。

(可參考上一章《Java系列筆記(5)-線程》中的Java線程內存模型部分),Java線程內存模型的設計,是每個線程擁有一份自己的工作內存,當變量修改之后,將新值同步到主內存。但是對於普通變量而言,這種同步,並不能保證及時性,所以可能出現工作內存以及更改,主內存尚未同步的情況。

Java中,最簡單的方法是使用volatile實現強制同步,它的實現方式是保證了變量在修改后立即同步到主內存,且每次使用該變量前,都先從主內存刷新該值。

另外,可以采用synchronized或final關鍵字實現可見性;

synchronized的實現原理在於,一個變量如果要執行unlock操作,必須先把改變量同步到主內存中(執行store和write)。因此一個變量如果被synchronized實現強制同步,則即使不用volatile,也可以實現強制可見性。

final的實現原理在於,一個變量被final修飾,則其值(或引用地址)不可以再被修改,所以其它線程如果能看到,也只是能看到這個變量的這個唯一值(對於對象而言,是唯一引用)。

需要注意,一個操作被保證了可見性,並不一定能保證原子性,比如:

volatile int a;
a++;

在上面這段代碼中,a是滿足可見性的,但是a++仍然不是原子性操作。當有多個線程執行a++時,仍然存在並發問題。

 

有序性(Ordering)

Java線程的有序性,表現為兩個方面:

在一個線程內部觀察,所有操作都是有序的,所有指令按照“串行(as-if-serial,字面意思是“像排了序一樣”,as-if-serial的真正含義是不管怎么重排序,一個單線程程序的執行結果都必須相同)” 的方式執行。

 

在線程間觀察,也就是從某個線程觀察另一個線程,則所有其他線程都可以交叉並行執行,是正序的,唯一例外的是被同步方法、同步塊、volatile等字段修飾的強制同步的代碼,需要在線程間保持有序。

 

注:關於指令重排序、as-if-serial、happens-before等,可以參考上一章《Java系列筆記(5)-線程》,也可以參考網上的眾多資料,這里不再敘述。

 

JUC

java.util.concurrent包,這個包是從JDK1.5開始引入的,在此之前,這個包獨立存在着,它是由Doug Lea開發的,名字叫backport-util-concurrent,在1.5開始引入java,命名路徑為java.util.concurrent,其中的基本實現方式,也有所改變。主要包括以下類:(來源於:深入淺出Java Concurreny(http://www.blogjava.net/xylz/archive/2010/06/30/324915.html))

 

JNI

Java native interface,java本地方法接口,由於Java本身是與平台無關的,所以在性能等方面有可能存在影響(雖然隨着java的發展,這種情況很少),為了解決這種問題,使用C/C++編寫了JNI接口,在java中可以直接調用這些代碼的產生的機器碼,從而避免嚴重影響性能的代碼段。關於JNI,可以參考這篇文章:http://www.cnblogs.com/mandroid/archive/2011/06/15/2081093.html

 

CAS

CAS,compare and swap,比較和替換(也有人直接理解為compare and set,其實是一樣的)。CAS是一種樂觀鎖做法,而且整個JUC的實現都是基於CAS機制的。

如果直接用synchronized加鎖,這是一種悲觀鎖做法,所謂悲觀鎖,就是悲觀的認為線程是絕對不安全的,必須保證在swap值之前,沒有任何其它線程操作當前值。synchronized是一種獨占鎖,性能受限於這種悲觀策略。這一點將在后面詳述。

而CAS是一種樂觀鎖機制,所謂樂觀鎖,就是相信在compare 和swap之間,被其它線程影響的可能性不大,只要compare校驗通過,就可以進行swap。

在Java中,compareAndSet的基本代碼如下:

1 public final boolean compareAndSet(int expect, int update) {
2   return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
3 }

 

從代碼中看,java的compareAndSet使用使用JNI中的unsafe接口來實現的,這是因為,現代CPU基本都提供了特殊的指令,能夠做到自動更新共享數據的同時,檢測其它線程的干擾,也就是說,CPU本身提供了compareAndSet功能。所以才能提供JNI的CAS接口。

有了JNI的CAS接口,基於該接口的JUC就能獲得更高性能。

在 Intel 處理器中,比較並交換通過指令cmpxchg實現。比較是否和給定的數值一致,如果一致則修改,不一致則不修改。

 

AQS

AbstractQueuedSynchronizer,簡稱AQS,是J.U.C最復雜的一個類。這個類是CountDownLatch/FutureTask /ReentrantLock/RenntrantReadWriteLock/Semaphore的基礎,是Lock和Executor實現的前提。參考:(http://www.blogjava.net/xylz/archive/2010/07/06/325390.html)

非阻塞算法

任何一個線程的失敗或掛起不應該影響其他線程的失敗或掛起的算法叫做非阻塞算法。現代CPU能夠提供非阻塞功能,它可以在自動更新共享數據的同時,檢查其它線程的干擾。

 

2,volatile


正如前面所述,java中volatile字段的作用是保證並發過程中某個變量的可見性。而volatile保證可見性的方法如下:

1,Java內存模型不會對volatile指令進行重排序,從而保證對volatile變量的執行順序,永遠是按照其出現順序執行的。重排序的依據是happens-before法則,happens-before法則共8條,其中有一條與volatile相關,就是:“對volatile字段的寫入操作happens-before於每一個后續的同一個字段的讀操作”。

注:happens-before法則:http://www.blogjava.net/xylz/archive/2010/07/03/325168.html

2,volatile變量不會被緩存在寄存器中(只有擁有線程可見)或者其他對CPU不可見的地方,每次總是從主存中讀取volatile變量的結果。

不過需要注意的是:volatile字段只能保證可見性,不能保證原子性,也不能保證線程安全。

 

volatile的工作原理

下面這段話摘自《深入理解Java虛擬機》:

  “觀察加入volatile關鍵字和沒有加入volatile關鍵字時所生成的匯編代碼發現,加入volatile關鍵字時,會多出一個lock前綴指令”

  lock前綴指令實際上相當於一個內存屏障(也成內存柵欄),內存屏障會提供3個功能:

  1)它確保指令重排序時不會把其后面的指令排到內存屏障之前的位置,也不會把前面的指令排到內存屏障的后面;即在執行到內存屏障這句指令時,在它前面的操作已經全部完成;

  2)它會強制將對緩存的修改操作立即寫入主存;

  3)如果是寫操作,它會導致其他CPU中對應的緩存行無效。

上面的說法解釋了volatile的工作原理的起源。不過,建議大家復習一下本系列文章第3章JVM內存分配和第5章線程的內容,來理解下面的解釋。與前面這兩章中宏觀的講解內存分配和線程內存模型相區別,下面的部分專注於解析java內存模型和volatile的工作原理,但也能更好的理解以前的知識:

注:下面的圖參考了:http://www.cnblogs.com/aigongsi/archive/2012/04/01/2429166.html,其中描述了內存模型的6種操作,比上一章中介紹的8種操作少了lock、unlock 2 種,這6種操作都是原子性的。

 

在上圖中,如果是普通變量:

1,變量值從主內存(在堆中)load到本地內存(在當前線程的棧楨中);

2,之后,線程就不再和該變量在主內存中的值由任何關系,而是直接操作在副本變量上(這樣速度很快),這時,如果主存中的count或本地內存中的副本發生任何變化,都不會影響到對方,也正是這個原因導致並發情況下出現數據不一致;

3,在修改完的某個時刻(線程退出之前),自動把本地內存中的變量副本值回寫到對象在堆中的對應變量。

 

這6步操作中:

read和load是將主存變量加載到當前本地內存;

use和assign是執行線程代碼,改變副本值,可以多次執行;

store和write是用本地內存回寫到主存;

 

如果是volatile修飾的變量:

volatile仍然在執行一個從主存加載到工作內存,並且將變更的值寫回主存的操作,但是:

1,volatile保證每次讀取該變量前,都判斷當前值是否已經失效(即是否已經與主存不一致),如果已經失效,則從主存load最新的變量;

2,volatile保證每次對該變量做出修改時,都立即寫入主存;

 

需要注意的是,雖然volatile保證了上面的特性,但是它只是保證了可見性,卻沒有保證原子性,也就是說,read-load-use-assign-store-write,這些操作序列組合起來仍然是非原子操作。舉個例子:

共享變量當前在主存中的值為count=10;線程1和線程2都對該值進行自增操作,按如下步驟進行:

1,線程1和2都讀取最新值,得到值為count=10;

2,線程1被阻塞;

3,線程2執行自增,寫回count=11;

4,線程1喚醒,由於之前已經完成了讀取變量的操作,所以這里直接進行自增。於是也自增到11,回寫主存,最終count=11;

與我們期望的兩次自增count=12沖突;

目前來說,要保證原子性,只能通過synchronized、Lock接口、Atomic*來實現。

 

說了這么多,有同學可能會問,為什么volatile這也不行那也不行,陷阱這么多,我們還要用它呢?

volatile相對於synchronized,最大的好處是某些情況下它的性能高,而且使用起來直觀簡便。而且,如果你的“代碼本身能保證原子性”,那么用volatile是個不錯的選擇:

這里所說的代碼本身能保證原子性,是指:

1,對變量的寫操作,不依賴於當前的值(就是說,不會先讀取當前值,然后在當前值的基礎上進行改變,比如,不是自增,而是賦值);

2,變量沒有包含在其它變量的不變式中(這一點不是很好理解,可以參考這里:http://www.ibm.com/developerworks/cn/java/j-jtp06197.html)

 

一個最常見的volatile的應用場景是boolean的共享狀態標志位,或者單例模式的雙重檢查鎖(參考Java並發編程:volatile關鍵字解析,http://www.cnblogs.com/dolphin0520/p/3920373.html)

 

另外,有一個關於volatile的常見的坑就是:從上面的描述可以看出,volatile對於基本數據類型(值直接從主內存向工作內存copy)才有用。但是對於對象來說,似乎沒有用,因為volatile只是保證對象引用的可見性,而對對象內部的字段,它保證不了任何事。即便是在使用ThreadLocal時,每個線程都有一份變量副本,這些副本本身也是存儲在堆中的,線程棧楨中保存的仍然是基本數據類型和變量副本的引用。

所以,千萬不要指望有了volatile修飾對象,對象就會像基本數據類型一樣整體呈現原子性的工作了。

事實上,如果一個對象被volatile修飾,那么就表示它的引用具有了可見性。從而使得對於變量引用的任何變更,都在線程間可見。

這一點在后面將要介紹的AtomicReference中就有應用。

 

3,Atom


java中,可能有一些場景,操作非常簡單,但是容易存在並發問題,比如i++,此時,如果依賴鎖機制,可能帶來性能損耗等問題,於是,如何更加簡單的實現原子性操作,就成為java中需要面對的一個問題。

java中的atom操作,比如AtomicInteger,AtomicLong,AtomicBoolean,AtomicReference,AtomicIntegerArray/AtomicLongArray/AtomicReferenceArray;這些操作中旺旺提供一些原子化操作,比如incrementAndGet(相當於i++),compareAndSet(安全賦值)等,相關方法和用法就不再贅述,網上有很多類似資料,或者直接讀源代碼也很容易懂。

在backport-util-concurrent沒有被引入java1.5並成為JUC之前,這些原子類和原子操作方法,都是使用synchronized實現的。不過JUC出現之后,這些原子操作基於JNI提供了新的實現,以AtomicInteger為例,看看它是怎么做到的:

如果是讀取值,很簡單,將value聲明為volatile的,就可以保證在沒有鎖的情況下,數據是線程可見的:

1     private volatile int value;public final int get() {
2       return value;
3     }

 

那么,涉及到值變更的操作呢?以AtomicInteger實現:++i為例:

1     public final int incrementAndGet() {
2      for (;;) {
3        int current = get();
4        int next = current + 1;  
5        if (compareAndSet(current, next))  
6          return next;  
7        }
8     } 

 

在這里采用了CAS操作,每次從內存中讀取數據然后將此數據和+1后的結果進行CAS操作,如果成功就返回結果,否則重試直到成功為止。

而這里的comparAndSet(current,next)就是前面介紹CAS的時候所說的依賴JNI實現的樂觀鎖做法:

    public final boolean compareAndSet(int expect, int update) {  
       return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

 

除了基本數據類型的原子化操作以外,JUC還提供了數組的原子化、引用的原子化,以及Updater的原子化,分別為:

下面主要介紹這3類原子化操作為什么要原子化以及分別是怎么實現的。

 

數組原子化

注意,Java中Atomic*Array,並不是對整個數組對象實現原子化(也沒有必要這樣做),而是對數組中的某個元素實現原子化。例如,對於一個整型原子數組,其中的原子方法,都是對每個元素的:

1 public final int getAndDecrement(int i) {
2       while (true) {
3        int current = get(i);
4        int next = current - 1;
5        if (compareAndSet(i, current, next))
6             return current;
7       }
8 } 

 

引用原子化

有些同學可能會疑惑,引用的操作本身不就是原子的嗎?一個對象的引用,從A切換到B,本身也不會出現非原子操作啊?這種想法本身沒有什么問題,但是考慮下嘛的場景:對象a,當前執行引用a1,線程X期望將a的引用設置為a2,也就是a=a2,線程Y期望將a的引用設置為a3,也就是a=a3。

如果線程X和線程Y都不在意a到底是從哪個引用通過賦值改變過來的,也就是說,他們不在意a1->a2->a3,或者a1->a3->a2,那么就完全沒有關系。

但是,如果他們在乎呢?

X要求,a必須從a1變為a2,也就是說compareAndSet(expect=a1,setValue=a2);Y要求a必須從a1變為a3,也就是說compareAndSet(expect=a1,setValue=a3)。如果嚴格遵循要求,應該出現X把a的引用設置為a2后,Y線程操作失敗的情況,也就是說:

X:a==a1--> a=a2;

Y:a!=a1 --> Exception;

但是如果沒有原子化,那么Y會直接將a賦值為a3,從而導致出現臟數據。

 

這就是原子引用AtomicReference存在的原因。

1      public final V getAndSet(V newValue) {
2            while (true) {
3                V x = get();
4                if (compareAndSet(x, newValue))
5                    return x;
6            }
7        }

注意,AtomicReference要求引用也是volatile的。

Updater原子化

其它幾個Atomic類,都是對被volatile修飾的基本數據類型的自身數據進行原子化操作,但是如果一個被volatile修飾的變量本身已經存在在類中,那要如何提供原子化操作呢?比如,一個Person,其中有個屬性為age,private volatile int age;,如何對age提供原子化操作呢?

1 private AtomicIntegerFieldUpdater<Person> updater = AtomicIntegerFieldUpdater.newUpdater(Person.class, "age");  
2 updater.getAndIncrement(5);//加5歲
3 updater.compareAndSet(person, 30, 35)//如果一個人的年齡是30,設置為35。

 

4,ThreadLocal


對於多線程的Java程序而言,難免存在多線程競爭資源的情況。對於競爭的資源,解決的方式往往分為以時間換空間或以空間換時間兩種方式。

1,后者的做法是將一份資源復制成多份,占用多份空間,但是每個線程自己訪問自己的資源,從而消除競爭,這種做法是ThreadLocal的做法,它雖然消除了競爭,但它是通過數據隔離的方法實現的,所以被隔離的各份數據是無法同步的,本節就要介紹這種做法。

2,也有很多資源是無法復制成多份或者不適合復制成多份的,如打印機資源。因此以時間換空間的做法就是只有一份資源,大家按照一定的順序串行的去訪問這個資源。這種方式的主要做法,就是在資源上加鎖,加鎖的方法,將在后面第9節介紹。

 

示例

下面通過一個典型的ThreadLocal的應用案例作為入口,來分析ThreadLocal的原理和用法(更詳細代碼請參考《Java並發編程:深入剖析ThreadLocal》http://www.cnblogs.com/dolphin0520/p/3920407.html):

設想下面的場景:

編寫一個數據庫連接器(或 http session管理器),要求多個線程能夠連接和關閉數據庫,優先考慮下面的方案:

 1     class ConnectionManager {
 2        private static Connection connect = null;
 3        public static Connection openConnection() {
 4            if(connect == null){
 5                connect = DriverManager.getConnection();
 6            }
 7            return connect;
 8        }
 9        public static void closeConnection() {
10            if(connect!=null)
11                connect.close();
12        }
13     }

這個方案中,多個線程公用ConnectionManager.openConnection()和ConnectionManager.closeConnnection(),由於沒有同步控制,所以很容易出現並發問題,比如,同時創建了多個連接,或者線程1openConnection時,線程2恰好在執行closeConnection。

解決這個問題有兩種方案:

1,對connectionManager中openConnection和closeConection加synchronized強制同步。這種方案解決了並發,卻帶來了新問題,由於synchronized導致了同一只可只有一個線程能訪問被鎖對象,所以其它線程只能等待。

2,去掉ConnectionManager中的static,使得每次訪問Connectionmanager,都必須new一個對象,這樣每個線程都用自己的獨立對象,相互不影響。eg:

1     public void insert() {
2            ConnectionManager connectionManager = new ConnectionManager();
3            Connection connection = connectionManager.openConnection();
4            
5            //使用connection進行操作
6            
7            connectionManager.closeConnection();
8     }

 

這個確實解決了並發,並且也可以多線程同步執行,但是它存在嚴重的性能問題,每執行一次操作,就需要new一個對象然后再銷毀。

ThreadLocal的引入,恰當的解決了上面的問題,ThreadLocal不是線程,它是一種變量,不過,它是線程變量的副本,它是一個泛型對象,例如,線程A創建時,初始化了一個對象user,那么ThreadLocal<User> userLocal就是user在線程A中的一個副本,userLocal中的值在初始時與user相同,但是在線程A運行過程中,userlocal的任何變化不會同步到user上,不會影響user的值。

如果采用ThreadLocal,上面的數據庫連接管理器問題的解決方案是:

 1     class ConnectionManager {
 2         private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>() {
 3              public Connection initialValue() {
 4         return DriverManager.getConnection(DB_URL);
 5              }
 6         };
 7         public static Connection getConnection() {
 8              return connectionHolder.get();
 9         }
10        
11        public static void closeConnection() {
12            if(connectionHolder.get()!=null)
13               connectionHolder.get().close();
14        }
15     }

 

ThreadLocal的方法

ThreadLocal提供的方法很簡單,主要有:

  1. public T get() { }
  2. public void set(T value) { }
  3. public void remove() { }
  4. protected T initialValue() { }

 

ThreadLocal的原理

分析ThreadLocal的源代碼(分析過程參考這里:http://www.cnblogs.com/dolphin0520/p/3920407.html),可得,ThreadLocal的原理是:

1,在每個線程Thread內部有一個ThreadLocal.ThreadLocalMap類型的成員變量threadLocals,這個threadLocals就是用來存儲實際的變量副本的,鍵值為當前ThreadLocal變量,value為變量副本(即T類型的變量)。

2,初始時,在Thread里面,threadLocals為空,當通過ThreadLocal變量調用get()方法或者set()方法,就會對Thread類中的threadLocals進行初始化,並且以當前ThreadLocal變量為鍵值,以ThreadLocal要保存的副本變量為value,存到threadLocals。

3,注意,一般在get之前,需要先執行set(),以保證threadlocals中有值,如果在get()之前,沒有執行過set(),則ThreadLocal會自動調用setInitialValue()方法,setInitialValue()的源代碼是這樣的:

 1         private T setInitialValue() {
 2            T value = initialValue();
 3            Thread t = Thread.currentThread();
 4            ThreadLocalMap map = getMap(t);
 5            if (map != null)
 6                map.set(this, value);
 7            else
 8                createMap(t, value);
 9            return value;
10        }

 

它先取出當前線程的ThreadLocalMap,即threadLocals變量,然后將value set進去,所以,如果沒有提前執行過set方法,initialValue()默認返回的又是null,所以可能導致運行過程中出現NPE。建議最好在聲明ThreadLocal變量時,重寫initialValue()方法,這樣即使沒有提前執行set,也能有個初始值(如前面ConnectionHolder中的代碼)。

4,然后在當前線程里面,如果要使用副本變量,就可以通過get方法在threadLocals里面查找。

ThreadLocal泛型的變量類型,不能是基本數據類型,只能是類,如果一定要將基本上數據類型做泛型參數,則可以采用Integer、Long、Double等類。

 

使用ThreadLocal的步驟

1,、在多線程的類(如ThreadDemo類)中,創建一個ThreadLocal<Object>對對象xxxLocal,用來保存線程間需要隔離處理的對象xxx。

2、在ThreadDemo類中,創建一個獲取要隔離訪問的數據的方法getXxx(),在方法中判斷,若ThreadLocal對象為null時候,應該new()一個隔離訪問類型的對象,並強制轉換為要應用的類型。

3、在ThreadDemo類的run()方法中,通過getXxx()方法獲取要操作的數據,這樣可以保證每個線程對應一個數據對象,在任何時刻都操作的是這個對象。

 

ThreadLocal實現變量副本的方法

ThreadLocal實現變量副本,並沒有真的將原來的變量clone一份出來,而是采用了一種很靈活的方法,假設對每個單獨的線程ThreadA而言,當前ThreadLocal為localXx(這是key),初始外部變量為va(這是value):

1,第一次執行set時,new了一個Entry(localXx, va),並添加到localXx的ThreadLocalMap中,此時,Entry.value的引用就是指向va的強引用;

2,此時如果執行localXx.get(),會得到va

3,此時,如果在當前線程ThreadA直接對va執行set操作,仍然會更新外部變量va的值,但如果在另外一個線程ThreadB中希望對va進行操作,則由於此時ThreadB直接執行get得到的是null,所以無法訪問va,除非我們將va聲明為final的,並set到ThreadB中;

3,后續再進行set時,比如set進來的新值為va1,則直接替換Entry中的value,得到Entry(localXx, va1),此時原來的va在ThreadLocal這里,已經得到釋放了,當前ThreadLocal跟原來的va已經沒有任何關系了。

4,如果此時再執行get操作,得到的就是新的va1;

 

從上面的步驟可以看出,ThreadLocal只是用原變量va做為初始值,但是它並未真的復制va,后續執行ThreadLocal.set之后,ThreadLocal中存放的已經是新set的對象了;

這也是為什么ThreadLocal只能對類對象有效的原因了,因為它的set,改變的是value的引用。

具體例子可以參考下面的代碼:

下面的例子中User包含兩個屬性:name、age,重寫了toString方法;

 1     public class ThreadLocalTest {
 2      ThreadLocal<User > userLocal = new ThreadLocal <User>();
 3      public void set(User user) {
 4        userLocal.set(user);
 5      }
 6      public User get() {
 7        return userLocal.get();
 8      }
 9       public static void main( String[] args) throws InterruptedException {
10         final ThreadLocalTest test = new ThreadLocalTest();
11         final User user1 = new User( "AAA", 5 );//注意這個user1被聲明成final的了
12         test.set(user1);
13         System.out.println(test.get()); //這里得到的是user1的初始值:AAA,5
14         Thread thread1 = new Thread() {
15           public void run() {
16            test.set(user1);
17            test.get().setName( "BBB");//這里get()得到的是user1,所以會影響外部主線程
18            System.out.println(test.get()); //BBB,5
19            User user2 = new User("CCC" , 5);
20            test.set(user2); //這里thread1的ThreadLocal.userLocal中存儲的值變為user2了,外部主線程中仍然是user1
21            System.out.println(test.get()); // CCC, 5
22            test.get().setName( "DDD");//這里get()得到的是user2,不會影響外部主線程
23            System.out.println(test.get()); //DDD,5
24          };
25        };
26        thread1.start();
27        thread1.join();
28        // 這里得到的值user1,已經在上面設置BBB的時候已經被更新過了
29        // 但是不會受thread中更新CCC和DDD的影響,所以這里得到的是BBB,5
30        System.out.println(test.get());
31      }
32     }

 

得到的結果為:

[AAA,5]

[BBB,5]

[CCC,5]

[DDD,5]

[BBB,5]

 

ThreadLocal的內存泄露問題

在第一次將T類型的變量value set到ThreadLocal時,它是將value set到ThreadLocalMap 中去的,但是需要注意ThreadLocalMap並不是Map接口的子類,它是一個ThreadLocal的內部類,其中的Entry是一種特殊實現:static class Entry extends WeakReference< ThreadLocal> 

對ThreadLocal.ThreadLocalMap.Entry執行set操作時,如果以前這個Entry(key,value)不存在,則會new一個Entry。如果這個Entry已經存在,則直接替換Entry.value的引用為新的value;

下面的分析和圖來自於:http://www.cnblogs.com/onlywujun/p/3524675.html

如下圖,每個thread中都存在一個map, map的類型是ThreadLocal.ThreadLocalMap. Map中的key為一個threadlocal實例. 這個Map的確使用了WeakReference(虛線),不過弱引用只是針對key. 每個key都弱引用指向threadlocal. 當把threadlocal實例置為null以后(或threadLocal實例被GC回收了,弱引用會被回收),沒有任何強引用指向threadlocal實例,所以threadlocal將會被gc回收. 但是,我們的value卻不能回收,因為存在一條從current thread連接過來的強引用. 只有當前thread結束以后, current thread就不會存在棧中,強引用斷開, Current Thread, Map, value將全部被GC回收.

 所以得出一個結論就是只要這個線程對象被gc回收,就不會出現內存泄露,但在threadLocal設為null和線程結束這段時間不會被回收的,就發生了我們認為的內存泄露。其實這是一個對概念理解的不一致,也沒什么好爭論的。最要命的是線程對象不被回收的情況,這就發生了真正意義上的內存泄露。比如使用線程池的時候,線程結束是不會銷毀的,會再次使用的。就可能出現內存泄露。

注意:Java為了最小化減少內存泄露的可能性和影響,在ThreadLocal的get,set的時候都會執行一個for循環,遍歷其中所有的entiry,清除線程Map里所有key為null的value。這也大大減小了出現內存泄露的風險。但最怕的情況就是,threadLocal對象設null了,開始發生“內存泄露”,然后使用線程池,這個線程結束,線程放回線程池中不銷毀,這個線程一直不被使用,或者分配使用了又不再調用get,set方法,那么這個期間就會發生真正的內存泄露。

關於ThreadLocal內存泄露問題的數據,有興趣的可以參考這里:http://liuinsect.iteye.com/blog/1827012

 

5,CountDownLatch和CyclicBarrier


CountDownLatch

CountDownLatch是一種Latch(門閂),它的操作類似於泄洪,或聚會。主要有兩種場景:

1,泄洪:即一個門閂(計數器為1)擋住所有線程,放開后所有線程開始執行。在門閂打開之前,所有線程在池子里等着,等着這個門閂的計數器減少到0,門閂打開之后,所有線程開始同時執行;

這種場景一個典型例子是並發測試器(啟動多個線程去執行測試用例,一聲令下,同步開始執行,即下面的beginLatch):

 1         int threadNum =10; //並發線程數
 2        CountDownLatch beginLatch = new CountDownLatch(1 );// 用於觸發各線程同時開始
 3        CountDownLatch waitLatch = new CountDownLatch(threadNum);// 用於等待各線程執行結束
 4        ExecutorService executor = Executors. newFixedThreadPool(threadNum);
 5        for (int i = 0; i < threadNum; i++) {
 6      Callable<String> thread = new SubTestThread(beginLatch, waitLatch, method, notifier);
 7        executor. submit(thread);
 8       }
 9        beginLatch.countDown(); // 開始執行!
10        waitLatch.await(); // 等待結束
11     private class SubTestThread implements Callable< String> {
12        private CountDownLatch begin;
13        private CountDownLatch wait;
14        private FrameworkMethod method;
15        public SubTestThread(CountDownLatch begin, CountDownLatch wait, FrameworkMethod method) {
16         this.begin = begin;
17         this.wait = wait;
18         this.method = method;
19       }
20        @Override
21        public String call() throws Exception {
22         try {
23          begin.await();
24          runTest(method);
25        } catch (Exception e) {
26       throw e;
27        } finally {
28          wait.countDown();
29        }
30         return null ;
31       }
32      }

 

2,聚會:即N個線程正在執行,一個門閂(計數器為N)擋住了后續操作,每個線程執行完畢后,計數器減1,當門閂計數器減到0時,表示所有線程都執行完畢(所有人到齊,party開始),可以開始執行后續動作了。

這種場景一個典型的例子是記賬匯總,即多個子公司的賬目,都要一一算完之后,才匯總到一起算總賬。上面例子中的waitLatch就是這樣的latch;

 

countDownLatch的真正原理在於latch是一種計數器,它的兩個方法分別是countDown()和await(),其中countDown()是減數1,await()是等待減到0,當每次調用countDown()時,當前latch計數器減1,減到0之前,當前線程的await()會一直卡着(阻塞,WAITING狀態),當計數器減少到0,喚醒當前線程,繼續執行await()后面的代碼;

await(long timeout, TimeUtil unit)是另一個await方法,特點是可以指定wait的時間,

-如果超出指定的等待時間,await()不再等待,返回值為false;

-如果在指定時間內,計數器減到0,則返回值為true;

-如果線程在等待中被中斷或進入方法時已經設置了中斷狀態,則拋出InterruptedException異常。

 

CyclicBarrier是一種回環柵欄,它的作用類似於上面例子中的的waitLatch,即等到多個線程達到同一個點才繼續執行后續操作,如:

 1     public class CyclicBarrierTest {
 2     public static class ComponentThread implements Runnable {
 3      CyclicBarrier barrier;// 計數器
 4      int ID; // 組件標識
 5      int[] array; // 數據數組
 6      // 構造方法
 7      public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
 8       this.barrier = barrier;
 9       this.ID = ID;
10       this.array = array;
11      }
12      public void run() {
13       try {
14        array[ID] = new Random().nextInt(100);
15        System.out.println("Component " + ID + " generates: " + array[ID]);
16        // 在這里等待Barrier處
17        System.out.println("Component " + ID + " sleep");
18        barrier.await();
19        System.out.println("Component " + ID + " awaked");
20        // 計算數據數組中的當前值和后續值
21        int result = array[ID] + array[ID + 1];
22        System.out.println("Component " + ID + " result: " + result);
23       } catch (Exception ex) {
24       }
25      }
26     }
27     /**
28      * 測試CyclicBarrier的用法
29      */
30     public static void testCyclicBarrier() {
31      final int[] array = new int[3];
32      CyclicBarrier barrier = new CyclicBarrier(2, new Runnable() {
33       // 在所有線程都到達Barrier時執行
34       public void run() {
35        System.out.println("testCyclicBarrier run");
36        array[2] = array[0] + array[1];
37       }
38      });
39      // 啟動線程
40      new Thread(new ComponentThread(barrier, array, 0)).start();
41      new Thread(new ComponentThread(barrier, array, 1)).start();
42     }
43     public static void main(String[] args) {
44      CyclicBarrierTest.testCyclicBarrier();
45     }
46     }

 

可見,cyclicBarrier與countDownLatch的后一種使用方法(聚會)很像,其實兩者能夠達到相同的目的。區別在於,cyclicBarrier可以重復使用,也就是說,當一次cyclicBarrier到達匯總點之后,可以再次開始,每次cyclicbarrier減數到0之后,會觸發匯總任務執行,然后,會把計數器再恢復成原來的值,這也是“回環”的由來。

CountDownLatch的作用是允許1或N個線程等待其他線程完成執行;而CyclicBarrier則是允許N個線程相互等待。

在實現方式上也有所不同,CountDownLatch是直接基於AQS編寫的,他的await和countDown過程,分別是一次acquireShared和releaseShared的過程;而cyclicBarrier是基於鎖、condition來實現的,讓當前線程阻塞,直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生,當前線程才繼續執行。

(CyclicBarrier的原理,參考:Java多線程系列--“JUC鎖”10之 CyclicBarrier原理和示例:http://www.cnblogs.com/skywang12345/p/3533995.html?utm_source=tuicool)


6,信號量


信號量(Semaphore)與鎖類似,鎖是一次允許一次一個線程訪問(readWrite鎖除外),而信號量用來控制一組資源有多個線程訪問,比如一個店鋪最多能接受5個客戶 ,有10個客戶要求訪問的話,那么可以用信號量來控制。

Semaphore可以控同時訪問的線程個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。

Semaphore類位於java.util.concurrent包下,它提供了2個構造器:

1     public Semaphore(int permits) {          //參數permits表示許可數目,即同時可以允許多少線程進行訪問
2        sync = new NonfairSync(permits);
3     }
4     public Semaphore(int permits, boolean fair) {    //這個多了一個參數fair表示是否是公平的,即等待時間越久的越先獲取許可
5        sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
6     }

 

下面說一下Semaphore類中比較重要的幾個方法,首先是acquire()、release()方法:

  1. public void acquire() throws InterruptedException {  }     //獲取一個許可
  2. public void acquire(int permits) throws InterruptedException { }    //獲取permits個許可
  3. public void release() { }          //釋放一個許可
  4. public void release(int permits) { }    //釋放permits個許可

acquire()用來獲取一個許可,若無許可能夠獲得,則會一直等待,直到獲得許可。

release()用來釋放許可。注意,在釋放許可之前,必須先獲獲得許可。

這4個方法都會被阻塞,如果想立即得到執行結果,可以使用下面幾個方法:

  1. public boolean tryAcquire() { };    //嘗試獲取一個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false
  2. public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //嘗試獲取一個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false
  3. public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可,若獲取成功,則立即返回true,若獲取失敗,則立即返回false
  4. public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內獲取成功,則立即返回true,否則則立即返回false

另外還可以通過availablePermits()方法得到可用的許可數目。

下面通過一個例子來看一下Semaphore的具體使用:

假若一個工廠有5台機器,但是有8個工人,一台機器同時只

能被一個工人使用,只有使用完了,其他工人才能繼續使用。那么我們就可以通過Semaphore來實現:

 1     public class Test {
 2     public static void main(String[] args) {
 3      int N = 8;   //工人數
 4      Semaphore semaphore = new Semaphore(5); //機器數目
 5      for(int i=0;i<N;i++)
 6       new Worker(i,semaphore).start();
 7     }
 8     static class Worker extends Thread{
 9      private int num;
10      private Semaphore semaphore;
11      public Worker(int num,Semaphore semaphore){
12       this.num = num;
13       this.semaphore = semaphore;
14      }
15      @Override
16      public void run() {
17       try {
18        semaphore.acquire();
19        System.out.println("工人"+this.num+"占用一個機器在生產...");
20        Thread.sleep(2000);
21        System.out.println("工人"+this.num+"釋放出機器");
22        semaphore.release();    
23       } catch (InterruptedException e) {
24        e.printStackTrace();
25       }
26      }
27     }
28     }

 

執行結果:

工人0占用一個機器在生產... 

工人1占用一個機器在生產... 

工人2占用一個機器在生產... 

工人4占用一個機器在生產... 

工人5占用一個機器在生產... 

工人0釋放出機器 工人2釋放出機器 

工人3占用一個機器在生產... 

工人7占用一個機器在生產... 

工人4釋放出機器 工人5釋放出機器 

工人1釋放出機器 工人6占用一個機器在生產... 

工人3釋放出機器 工人7釋放出機器 

工人6釋放出機器

7,Condition


在上一章“Java系列筆記(5)-線程”中,我們曾經說過,線程間通信並不是靠消息,而是靠共享內存,不過,本節要介紹一種更加高效的通信方式:Condition。

Condition 與上一章介紹的線程間通信的wait、notify等方法有相似之處,但也有不同。其相似之處在於,都建立與鎖的基礎上,

wait、notify都是在同步代碼塊中,建立在synchronized所作用的對象上。

而Condition直接作用在Lock對象上,因此建立一個Condition對象,必須通過lock.newCondition()來構造。

  1. Lock lock = new ReentrantLock();  
  2. Condition condition = lock.newCondition();

 在Condition中,用await()替換wait(),用signal()替換notify(),用signalAll()替換notifyAll(),傳統線程的通信方式,Condition都可以實現。

 1     lock.lock();    //synchronized
 2      try {  
 3        while(bool) {                
 4           condition.await();//this.wait();  
 5        }  
 6        System.out.println("this is condition test" );
 7        condition.signal();//this.notify();  
 8     } finally {  
 9        lock.unlock();  
10     }

 

而且,對於wait、notify機制而言,只能作用於當前同步代碼塊,不能建立多重通信條件,而,使用Condition機制,可以建立多重通信條件。

下面的例子,是一個很有意思的並發緩沖區,其中用Condition建立了兩個條件,一個寫條件,一個讀條件,這個例子的具體用法和意義,參考這里:http://blog.csdn.net/ghsau/article/details/7481142

 

 1 class BoundedBuffer {  
 2       final Lock lock = new ReentrantLock();//鎖對象  
 3       final Condition notFull  = lock.newCondition();//寫線程條件  
 4       final Condition notEmpty = lock.newCondition();//讀線程條件  
 5      
 6       final Object[] items = new Object[100];//緩存隊列  
 7       int putptr/*寫索引*/, takeptr/*讀索引*/, count/*隊列中存在的數據個數*/;  
 8      
 9       public void put(Object x) throws InterruptedException {  
10         lock.lock();  
11         try {  
12           while (count == items.length)//如果隊列滿了  
13             notFull.await();//阻塞寫線程  
14           items[putptr] = x;//賦值  
15           if (++putptr == items.length) putptr = 0;//如果寫索引寫到隊列的最后一個位置了,那么置為0  
16           ++count;//個數++  
17           notEmpty.signal();//喚醒讀線程  
18         } finally {  
19           lock.unlock();  
20         }  
21       }  
22      
23       public Object take() throws InterruptedException {  
24         lock.lock();  
25         try {  
26           while (count == 0)//如果隊列為空  
27             notEmpty.await();//阻塞讀線程  
28           Object x = items[takeptr];//取值  
29           if (++takeptr == items.length) takeptr = 0;//如果讀索引讀到隊列的最后一個位置了,那么置為0  
30           --count;//個數--  
31           notFull.signal();//喚醒寫線程  
32           return x;  
33         } finally {  
34           lock.unlock();  
35         }  
36       }  
37     } 

 

可見,用兩個條件,可以靈活的確定應該喚醒寫線程還是讀線程,這就是使用Condition的靈活之處。

8,Exchanger


從JDK1.5開始,Java開始提供一個叫Exchanger的工具套件,這個工具套件,可以真正用於兩個線程之間交換數據。

Exchanger類允許在2個線程間定義同步點,當2個線程到達這個點,他們相互交換數據類型,使用第一個線程的數據類型變成第二個的,然后第二個線程的數據類型變成第一個的。

Exchanger提供的方法非常簡單,其接口就是兩個方法:

 

  1. public V exchange(V x) throws InterruptedException
  2. public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException

從官方的javadoc可以知道,當一個線程到達exchange調用點時,如果它的伙伴線程此前已經調用了此方法,那么它的伙伴會被調度喚醒並與之進行 對象交換,然后各自返回。如果它的伙伴還沒到達交換點,那么當前線程將會被掛起,直至伙伴線程到達——完成交換正常返回;或者當前線程被中斷——拋出中斷 異常;又或者是等候超時——拋出超時異常。

Exchanger現在用的並不多,因為它的場景比較特定,而且,就算是真的有數據交換,用到Exchanger的地方,也可以用其它更加直觀的方式替代,比如用共享變量+鎖同步的方式,因此Exchanger在實際使用中比較少見,有興趣的同學,可以參考這一篇文章:http://lixuanbin.iteye.com/blog/2166772

參考資料


《深入理解Java虛擬機:JVM高級特效與最佳實現》

深入理解Java虛擬機筆記---原子性、可見性、有序性 :http://blog.csdn.net/xtayfjpk/article/details/41969915?utm_source=tuicool

深入淺出 Java Concurrency (1) : J.U.C的整體認識 : http://www.blogjava.net/xylz/archive/2010/06/30/324915.html

JAVA基礎之理解JNI原理:http://www.cnblogs.com/mandroid/archive/2011/06/15/2081093.html

深入淺出 Java Concurrency (7): 鎖機制 part 2 AQS:http://www.blogjava.net/xylz/archive/2010/07/06/325390.html

Java並發編程:volatile關鍵字解析:http://www.cnblogs.com/dolphin0520/p/3920373.html

java中volatile關鍵字的含義:http://www.cnblogs.com/aigongsi/archive/2012/04/01/2429166.html

Java 理論與實踐: 正確使用 Volatile 變量: http://www.ibm.com/developerworks/cn/java/j-jtp06197.html

Java Atomic: http://wsmajunfeng.iteye.com/blog/1520705

聊聊並發(二)——Java SE1.6中的Synchronized:http://www.infoq.com/cn/articles/java-se-16-synchronized

Java並發編程:深入剖析ThreadLocal》http://www.cnblogs.com/dolphin0520/p/3920407.html

ThreadLocal可能引起的內存泄露:http://www.cnblogs.com/onlywujun/p/3524675.html

ThreadLocal內存泄露分析:http://liuinsect.iteye.com/blog/1827012

CountDownLatch的介紹和使用:http://www.itzhai.com/the-introduction-and-use-of-a-countdownlatch.html

Java並發編程:CountDownLatch、CyclicBarrier和Semaphore:http://www.cnblogs.com/dolphin0520/p/3920397.html

CyclicBarrier使用詳解:http://xijunhu.iteye.com/blog/713433

Java系列筆記:http://www.cnblogs.com/skywang12345/p/java_threads_category.html

用信號量解決進程的同步與互斥探討: http://blog.jobbole.com/86709/

Java線程(九):Condition-線程通信更高效的方式:http://blog.csdn.net/ghsau/article/details/7481142

java.util.concurrent.Exchanger應用范例與原理淺析:http://lixuanbin.iteye.com/blog/2166772

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM