上節,我們講了AQS的阻塞與釋放實現原理,線程間通信(Condition)的原理。這次,我們就講講基於AQS實現的ReentrantLock(重入鎖)。
1. 介紹
結合上面的ReentrantLock類圖,ReentrantLock實現了Lock接口,它的內部類Sync繼承自AQS,絕大部分使用AQS的子類需要自定義的方法存在Sync中。而ReentrantLock有公平與非公平的區別,即'是否先阻塞就先獲取資源',它的主要實現就是FairSync與NonfairSync,后面會從源碼角度看看它們的區別。
2. 源碼剖析
Sync是ReentrantLock控制同步的基礎。它的子類分為了公平與非公平。使用AQS的state代表獲取鎖的數量
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
...
}
我們可以看出內部類Sync是一個抽象類,繼承它的子類(FairSync與NonfairSync)需要實現抽象方法lock。
下面我們先從非公平鎖的角度來看看獲取資源與釋放資源的原理
故事就從就兩個變量開始:
// 獲取一個非公平的獨占鎖
/**
* public ReentrantLock() {
* sync = new ReentrantLock.NonfairSync();
* }
*/
private Lock lock = new ReentrantLock();
// 獲取條件變量
private Condition condition = lock.newCondition();
2.1 上鎖(獲取資源)
lock.lock()
public void lock() {
sync.lock();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 獲取資源
final void lock() {
// 若此時沒有線程獲取到資源,直接設置當前線程獨占訪問資源。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// AQS的方法
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 實現在父類Sync中
return nonfairTryAcquire(acquires);
}
}
AQS的acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上一節已經剖析過AQS的acquire的整個流程了,就差子類如何去實現tryAcquire了。
// Sync實現的非公平的tryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 此時若沒有線程獲取到資源,當前線程就直接占用該資源
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 若當前線程已經占用了該資源,可以再次獲取該資源 ->這個行為就是可重入鎖的支撐
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
嘗試獲取資源的過程是非常簡單的,這里再貼一下acquire的流程圖
2.2 釋放資源
lock.unlock();
public void unlock() {
// AQS的方法
sync.release(1);
}
AQS的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release的流程已經剖析過了,接下來看看tryRelease的實現
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 這里可以看出若沒有持有鎖,就釋放資源,就會報錯
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease的實現也很簡單,這里再貼一下release的流程圖
2.3 公平鎖與非公平鎖的區別
公平鎖與非公平鎖,即'是否先阻塞就先獲取資源', ReentrantLock中公平與否的控制就在tryAcquire中。下面我們看看,公平鎖的tryAcquire
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// (2.3.1)
// sync queue中是否存在前驅結點
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
區別在代碼(2.3.1)
hasQueuedPredecessors
判斷當前線程的前面有無其他線程排隊;若當前線程在隊列頭部或者隊列為空返回false
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
結合下面的入隊代碼(enq), 我們分析hasQueuedPredecessors為true的情況:
- h != t ,表示此時queue不為空; (s = h.next) == null, 表示另一個結點已經運行了下面的步驟(2),還沒來得及運行步驟(3)。簡言之,就是B線程想要獲取鎖的同時,A線程獲取鎖失敗剛好在入隊(B入隊的同時,之前占有的資源的線程,剛好釋放資源)
- h != t 且 (s = h.next) != null,表示此時至少有一個結點在sync queue中;s.thread != Thread.currentThread(),這個情況比較復雜,設想一下有這三個結點 A -> B C, A此時獲取到資源,而B此時因為獲取資源失敗正在sync queue阻塞,C還沒有獲取資源(還沒有執行tryAcquire)。
時刻一:A釋放資源成功后(執行tryRelease成功),B此時還沒有成功獲取資源(C執行s = h.next時,B還在sync queue中且是老二)
時刻二: C此時執行hasQueuedPredecessors,s.thread != Thread.currentThread()成立,此時s.thread表示的是B
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) // (1) 第一次初始化
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // (2) 設置queue的tail
t.next = node; // (3)
return t;
}
}
}
}
Note that 1. because cancellations due to interrupts and timeouts may occur at any time, a true return does not guarantee that some other thread will acquire before the current thread(虛假true). 2. Likewise, it is possible for another thread to win a race to enqueue after this method has returned false, due to the queue being empty(虛假false).
這位大佬對hasQueuedPredecessors進行詳細的分析,他文中解釋了虛假true以及虛假false。我這里簡單解釋一下:
- 虛假true, 當兩個線程都執行tryAcquire,都執行到hasQueuedPredecessors,都返回true,但是只有一個線程執行compareAndSetState(0, acquires)成功
- 虛假false,當一個線程A執行doAcquireInterruptibly,發生了中斷,還沒有清除掉該結點時;此時,線程B執行hasQueuedPredecessors時,返回true
3. 總結
本文介紹了利用AQS實現的鎖ReetrantLock
- 講解了tryRelease與tryAcquire的實現原理
- 說了說鎖的公平與否的實現,是否在意當前線程是否有其他線程排隊
- 分析了一下hasQueuedPredecessors的幾種情況