在Java中通常實現鎖有兩種方式,一種是synchronized關鍵字,另一種是Lock。二者其實並沒有什么必然聯系,但是各有各的特點,在使用中可以進行取舍的使用。首先我們先對比下兩者。
實現:#####
首先最大的不同:synchronized是基於JVM層面實現的,而Lock是基於JDK層面實現的。曾經反復的找過synchronized的實現,可惜最終無果。但Lock卻是基於JDK實現的,我們可以通過閱讀JDK的源碼來理解Lock的實現。
使用:#####
對於使用者的直觀體驗上Lock是比較復雜的,需要lock和realse,如果忘記釋放鎖就會產生死鎖的問題,所以,通常需要在finally中進行鎖的釋放。但是synchronized的使用十分簡單,只需要對自己的方法或者關注的同步對象或類使用synchronized關鍵字即可。但是對於鎖的粒度控制比較粗,同時對於實現一些鎖的狀態的轉移比較困難。例如:
特點:#####
| tips | synchronized | Lock |
|---|---|---|
| 鎖獲取超時 | 不支持 | 支持 |
| 獲取鎖響應中斷 | 不支持 | 支持 |
優化:#####
在JDK1.5之后synchronized引入了偏向鎖,輕量級鎖和重量級鎖,從而大大的提高了synchronized的性能,同時對於synchronized的優化也在繼續進行。期待有一天能更簡單的使用java的鎖。
在以前不了解Lock的時候,感覺Lock使用實在是太復雜,但是了解了它的實現之后就被深深吸引了。
Lock的實現主要有ReentrantLock、ReadLock和WriteLock,后兩者接觸的不多,所以簡單分析一下ReentrantLock的實現和運行機制。
ReentrantLock類在java.util.concurrent.locks包中,它的上一級的包java.util.concurrent主要是常用的並發控制類.

下面是ReentrantLock的UML圖,從圖中可以看出,ReentrantLock實現Lock接口,在ReentrantLock中引用了AbstractQueuedSynchronizer的子類,所有的同步操作都是依靠AbstractQueuedSynchronizer(隊列同步器)實現。

研究一個類,需要從一個類的靜態域,靜態類,靜態方法和成員變量開始。
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes this lock instance from a stream.
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
從上面的代碼可以看出來首先ReentrantLock是可序列化的,其次是ReentrantLock里有一個對AbstractQueuedSynchronizer的引用。
看完了成員變量和靜態域,我們需要了解下構造方法:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
從上面代碼可以看出,ReentrantLock支持兩種鎖模式,公平鎖和非公平鎖。默認的實現是非公平的。公平和非公平鎖的實現如下:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
AbstractQueuedSynchronizer 是一個抽象類,所以在使用這個同步器的時候,需要通過自己實現預期的邏輯,Sync、FairSync和NonfairSync都是ReentrantLock為了實現自己的需求而實現的內部類,之所以做成內部類,我認為是只在ReentrantLock使用上述幾個類,在外部沒有使用到。
我們着重關注默認的非公平鎖的實現:
在ReentrantLock調用lock()的時候,調用的是下面的代碼:
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}
sync的實現是NonfairSync,所以調用的是NonfairSync的lock方法:
/**
* Sync object for non-fair locks
* tips:調用Lock的時候,嘗試獲取鎖,這里采用的CAS去嘗試獲取鎖,如果獲取鎖成功
* 那么,當前線程獲取到鎖,如果失敗,調用acquire處理。
*
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
接下來看看compareAndSetState方法是怎么進行鎖的獲取操作的:
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a <tt>volatile</tt> read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that the actual
* value was not equal to the expected value.
*
* tips: 1.compareAndSetState的實現主要是通過Unsafe類實現的。
* 2.之所以命名為Unsafe,是因為這個類對於JVM來說是不安全的,我們平時也是使用不了這個類的。
* 3.Unsafe類內封裝了一些可以直接操作指定內存位置的接口,是不是感覺和C有點像了?
* 4.Unsafe類封裝了CAS操作,來達到樂觀的鎖的爭搶的效果
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
主要的說明都在方法的注釋中,接下來簡單的看一下 compareAndSwapInt的實現:
/**
* Atomically update Java variable to <tt>x</tt> if it is currently
* holding <tt>expected</tt>.
* @return <tt>true</tt> if successful
*/
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
一個native方法,沮喪.....但是從注釋看意思是,以CAS的方式將制定字段設置為指定的值。同時我們也明白了這個方法可能是用java實現不了,只能依賴JVm底層的C代碼實現。下面看看操作的stateOffset:
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
//這個方法很有意思,主要的意思是獲取AbstractQueuedSynchronizer的state成員的偏移量
//通過這個偏移量來更新state成員,另外state是volatile的來保證可見性。
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
stateOffset 是AbstractQueuedSynchronizer內部定義的一個狀態量,AbstractQueuedSynchronizer是線程的競態條件,所以只要某一個線程CAS改變狀態成功,同時在沒有釋放的情況下,其他線程必然失敗(對於Unsafe類還不是很熟悉,后面還需要系統的學習)。
對於競爭成功的線程會調用 setExclusiveOwnerThread方法:
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
/**
* Sets the thread that currently owns exclusive access. A
* <tt>null</tt> argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* <tt>volatile</tt> field accesses.
*/
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
這個實現是比較簡單的,只是獲取當前線程的引用,令AbstractOwnableSynchronizer中的exclusiveOwnerThread引用到當前線程。競爭失敗的線程,會調用acquire方法,這個方法也是ReentrantLock設計的精華之處:
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* tips:此處主要是處理沒有獲取到鎖的線程
* tryAcquire:重新進行一次鎖獲取和進行鎖重入的處理。
* addWaiter:將線程添加到等待隊列中。
* acquireQueued:自旋獲取鎖。
* selfInterrupt:中斷線程。
* 三個條件的關系為and,如果 acquireQueued返回true,那么線程被中斷selfInterrupt會中斷線程
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AbstractQueuedSynchronizer為抽象方法,調用tryAcquire時,調用的為NonfairSync的tryAcquire。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is
* implemented in subclasses, but both need nonfair
* try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire方法主要是做重入鎖的實現,synchronized本身支持鎖的重入,而ReentrantLock則是通過此處實現。在鎖狀態為0時,重新嘗試獲取鎖。如果已經被占用,那么做一次是否當前線程為占用鎖的線程的判斷,如果是一樣的那么進行計數,當然在鎖的relase過程中會進行遞減,保證鎖的正常釋放。
如果沒有重新獲取到鎖或者鎖的占用線程和當前線程是一個線程,方法返回false。那么把線程添加到等待隊列中,調用addWaiter:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這里主要是用當前線程構建一個Node的等待隊列雙向鏈表,這里addWaiter中和enq中的部分邏輯是重復的,個人感覺可能是如果能一次成功就避免了enq中的死循環。因為tail節點是volatile的同時node也是不會發生競爭的所以node.prev = pred;是安全的。但是tail的next是不斷競爭的,所以利用compareAndSetTail保證操作的串行化。接下來調用acquireQueued方法:
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
此處是做Node節點線程的自旋過程,自旋過程主要檢查當前節點是不是head節點的next節點,如果是,則嘗試獲取鎖,如果獲取成功,那么釋放當前節點,同時返回。至此一個非公平鎖的鎖獲取過程結束。
如果這里一直不斷的循環檢查,其實是很耗費性能的,JDK的實現肯定不會這么“弱智”,所以有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,這兩個方法就實現了線程的等待從而避免無限的輪詢:
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
首先,檢查一下當前Node的前置節點pred是否是SIGNAL,如果是SIGNAL,那么證明前置Node的線程已經Park了,如果waitStatus>0,那么當前節點已經Concel或者中斷。那么不斷調整當前節點的前置節點,將已經Concel的和已經中斷的線程移除隊列。如果waitStatus<0,那么設置waitStatus為SIGNAL,因為調用shouldParkAfterFailedAcquire的方法為死循環調用,所以終將返回true。接下來看parkAndCheckInterrupt方法,當shouldParkAfterFailedAcquire返回True的時候執行parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
此方法比較簡單,其實就是使當前的線程park,即暫停了線程的輪詢。當Unlock時會做后續節點的Unpark喚醒線程繼續爭搶鎖。
接下來看一下鎖的釋放過程,鎖釋放主要是通過unlock方法實現:
/**
* Attempts to release this lock.
*
* <p>If the current thread is the holder of this lock then the hold
* count is decremented. If the hold count is now zero then the lock
* is released. If the current thread is not the holder of this
* lock then {@link IllegalMonitorStateException} is thrown.
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}
主要是調用AbstractQueuedSynchronizer同步器的release方法:
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease方法為ReentrantLock中的Sync的tryRelease方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease方法主要是做了一個釋放鎖的過程,將同步狀態state -1,直到減到0為止,這主要是兼容重入鎖設計的,同時setExclusiveOwnerThread(null)清除當前占用的線程。這些head節點后的線程和新進的線程就可以開始爭搶。這里需要注意的是對於同步隊列中的線程來說在setState(c),且c為0的時候,同步隊列中的線程是沒有競爭鎖的,因為線程被park了還沒有喚醒。但是此時對於新進入的線程是有機會獲取到鎖的。
下面代碼是進行線程的喚醒:
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
因為在setState(c)釋放了鎖之后,是沒有線程競爭的,所以head是當前的head節點,先檢查當前的Node是否合法,如果合法則unpark it。開始鎖的獲取。就回到了上面的for循環執行獲取鎖邏輯:

至此鎖的釋放就結束了,可以看到ReentrantLock是一個不斷的循環的狀態模型,里面有很多東西值得我們學習和思考。
ReentrantLock具有公平和非公平兩種模式,也各有優缺點:
公平鎖是嚴格的以FIFO的方式進行鎖的競爭,但是非公平鎖是無序的鎖競爭,剛釋放鎖的線程很大程度上能比較快的獲取到鎖,隊列中的線程只能等待,所以非公平鎖可能會有“飢餓”的問題。但是重復的鎖獲取能減小線程之間的切換,而公平鎖則是嚴格的線程切換,這樣對操作系統的影響是比較大的,所以非公平鎖的吞吐量是大於公平鎖的,這也是為什么JDK將非公平鎖作為默認的實現。
最后:#####
關於並發和Lock還有很多的點還是比較模糊,我也會繼續學習,繼續總結,如果文章中有什么問題,還請各位看客及時指出,共同學習。
