線程同步是指兩個並發執行的線程在同一時間不同時執行某一部分的程序。同步問題在生活中也很常見,就比如在麥當勞點餐,假設只有一個服務員能夠提供點餐服務。每個服務員在同一時刻只能接待一個顧客的點餐,那么除了正在接待的顧客,其他人只能等待排隊。當一個點餐服務完成之后,其他顧客就可以上去進行點餐。
從這個例子中可以看到如下幾個關注點:
- 點餐服務為臨界區域(critical area),其可同時進行的數量,即為有多少人可進入臨界區域。
- 排隊即為對目前暫時無法取得點餐服務的人的一種處理方式。這種處理方式的特性有公平性(按次序),效率性(接手最快為最好)等。
- 顧客進行排隊和從隊伍中叫一個顧客來進行服務即為睡眠(park)和喚醒(unpark)機制。
並發中線程同步是重點需關注的問題,線程同步自然也有一定的模式,DougLea就寫出了一個簡單的框架AQS用來支持一大類線程同步工具,如ReentrantLock,CountdownLatch,Semphaore等。
AQS是concurrent包中的一系列同步工具的基礎實現,其提供了狀態位,線程阻塞-喚醒方法,CAS操作。基本原理就是根據狀態位來控制線程的入隊阻塞、出隊喚醒來解決同步問題。
入隊:
出隊:
二、代碼分析
下面以ReentrantLock來說明AQS的組成構件的工作情況:
在ReentrantLock中封裝了一個同步器Sync,繼承了AbstractQueuedSynchronizer,根據對臨界區的訪問的公平性要求不同,又分為NonfairSync和FairSync。為了簡化起見,就取最簡單的NonFairSync作為例子來說明:
1. 對於臨界區的控制:
java.util.concurrent.locks.ReentrantLock.NonfairSync
1: final void lock() {
2:
3: if (compareAndSetState(0, 1))
4:
5: setExclusiveOwnerThread(Thread.currentThread());
6:
7: else
8:
9: acquire(1);
10:
11: }
12:
從以上代碼可以看出,其主要目的是采用cas比較臨界區的狀態。
1.1. 如果為0,將其設置為1,並記錄當前線程(當前線程可進入臨界區);
1.2. 如果為1,嘗試獲取臨界區控制
java.util.concurrent.locks.AbstractQueuedSynchronizer
1: public final void acquire(int arg) {
2:
3: if (!tryAcquire(arg) &&
4:
5: acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
6:
7: selfInterrupt();
8:
9: }
10:
1.2.1. NonFairLock的tryAcquire實現為:
1: final boolean nonfairTryAcquire(int acquires) {
2:
3: final Thread current = Thread.currentThread();
4:
5: int c = getState();
6:
7: if (c == 0) {
8:
9: if (compareAndSetState(0, acquires)) {
10:
11: setExclusiveOwnerThread(current);
12:
13: return true;
14:
15: }
16:
17: }
18:
19: else if (current == getExclusiveOwnerThread()) {
20:
21: int nextc = c + acquires;
22:
23: if (nextc < 0) // overflow
24:
25: throw new Error("Maximum lock count exceeded");
26:
27: setState(nextc);
28:
29: return true;
30:
31: }
32:
33: return false;
34:
35: }
36:
上述代碼主要是針對大部分線程進入臨界區工作時間不會很長而進行的性能優化,第一次嘗試失敗了,極有可能過一會兒鎖就釋放了,因此重新去嘗試獲取鎖。
1.2.2. 以下這段代碼是鎖的精華部分
java.util.concurrent.locks.AbstractQueuedSynchronizer
1: final boolean acquireQueued(final Node node, int arg) {
2:
3: try {
4:
5: boolean interrupted = false;
6:
7: for (;;) {
8:
9: final Node p = node.predecessor();
10:
11: if (p == head && tryAcquire(arg)) {
12:
13: setHead(node);
14:
15: p.next = null; // help GC
16:
17: return interrupted;
18:
19: }
20:
21: if (shouldParkAfterFailedAcquire(p, node) &&
22:
23: parkAndCheckInterrupt())
24:
25: interrupted = true;
26:
27: }
28:
29: } catch (RuntimeException ex) {
30:
31: cancelAcquire(node);
32:
33: throw ex;
34:
35: }
36:
37: }
38:
在無限循環中完成了對線程的阻塞和喚醒。阻塞在parkAndCheckInterrupt()喚醒后從此處進行釋放。
算法過程:
- 從加入隊列的node開始反向查找,將前一個元素賦值給p;
- 如果p是head,那么試着再獲得一次鎖tryAcquire(arg),成功則將head指針往后移動,並跳出循環;
- 如果上一步驟嘗試失敗,那么進行測試是否要park ,如果狀態為0,將其標記為SIGNAL,並返回false;
- 再重復檢查一次,發現其頭部的waitStatus為-1.Node.signal。確認需要park successor; 進行parkAndCheckInterrupt()將當前線程阻塞。
2. 對於臨界區的釋放
2.1. java.util.concurrent.locks.AbstractQueuedSynchronizer
1: public final boolean release(int arg) {
2:
3: if (tryRelease(arg)) {
4:
5: Node h = head;
6:
7: if (h != null && h.waitStatus != 0)
8:
9: unparkSuccessor(h);
10:
11: return true;
12:
13: }
14:
15: return false;
16:
17: }
18:
2.1.1. java.util.concurrent.locks.ReentrantLock.Sync
1: protected final boolean tryRelease(int releases) {
2:
3: int c = getState() - releases;
4:
5: if (Thread.currentThread() != getExclusiveOwnerThread())
6:
7: throw new IllegalMonitorStateException();
8:
9: boolean free = false;
10:
11: if (c == 0) {
12:
13: free = true;
14:
15: setExclusiveOwnerThread(null);
16:
17: }
18:
19: setState(c);
20:
21: return free;
22:
23: }
24:
將state進行變化-releases,檢查當前線程是否是拿住鎖的線程,否則擲出異常.如果為0,將持有鎖線程標記為null。
從ReentrantLock例子可以看出AQS的工作原理,更為精妙的是,在這幾個基本機制作用下衍生了許多種並發工具,以后的介紹中可以看到。
參考文獻:
The java.util.concurrent Synchronizer Framework----Doug Lea