下面看下JUC包下的一大並發神器ReentrantLock,是一個可重入的互斥鎖,具有比synchronized更為強大的功能。
ReentrantLock基本用法
先來看一下ReentrantLock的簡單用法
public class MyDomain1 {
private Lock lock = new ReentrantLock();
public void method1() {
System.out.println("進入method1方法");
try {
lock.lock();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " i=" + i);
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class Mythread1_1 extends Thread {
private MyDomain1 myDomain1;
public Mythread1_1(MyDomain1 myDomain1) {
this.myDomain1 = myDomain1;
}
@Override
public void run() {
myDomain1.method1();
}
}
開啟三個線程同時執行測試方法
@Test
public void test1() throws InterruptedException {
MyDomain1 myDomain1 = new MyDomain1();
Mythread1_1 a = new Mythread1_1(myDomain1);
Mythread1_1 c = new Mythread1_1(myDomain1);
Mythread1_1 d = new Mythread1_1(myDomain1);
a.start();
c.start();
d.start();
a.join();
c.join();
d.join();
}
執行結果:
進入method1方法 Thread-0 i=0 進入method1方法 進入method1方法 Thread-0 i=1 Thread-0 i=2 Thread-0 i=3 Thread-0 i=4 Thread-1 i=0 Thread-1 i=1 Thread-1 i=2 Thread-1 i=3 Thread-1 i=4 Thread-2 i=0 Thread-2 i=1 Thread-2 i=2 Thread-2 i=3 Thread-2 i=4
可以看到,代碼流程進入到lock.lock()以后沒有任何的交替打印,都是一個線程執行完后一個線程才開始執行,說明ReentrantLock具有加鎖的功能。
看下ReentrantLock源碼的構造方法:
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可以看到ReentrantLock支持兩種加鎖模式:公平鎖和非公平鎖。它是如何實現的呢?繼續往下看
我們測試用例中,默認使用的是非公平鎖的加鎖方法,看下 NonfairSync 的lock() 方法
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
第12行的 compareAndSetState方法,當第一個線程執行次方法時,會將 state 設置為1,執行成功后,exclusiveOwnerThread=線程1。
此時線程1正常執行業務,當線程2走到lock方法時,此時線程12執行compareAndSetState方法將返回false,執行 acquire(1)
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
非公平鎖實現的tryAcquire
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
此時線程2得到的state應該是1,並且 current != getExclusiveOwnerThread(),所以線程2會繼續執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)。
注意第8行到第13行,如果此時線程1已經釋放了鎖,那么線程2得到的state就是0了,它將走獲取鎖的邏輯,
第14行到第20行,這塊就是ReentrantLock支持可重入的實現,也就是如果當前執行的線程是持有鎖的線程,那么就可以獲取鎖,並將state+1。
如果線程1此時還沒有釋放鎖,那么線程2將走到等待隊列里
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
這個for循環對於線程2來說,首先再次嘗試去獲取鎖,因為此時線程1可能已經釋放鎖了,如果依舊獲取鎖失敗,則執行
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這塊代碼,最好打個斷點一步一步去執行,更容易看出每一步執行的邏輯以及值。
這個ws是節點predecessor的waitStatus,很明顯是0,所以此時把pred的waitStatus設置為Noed.SIGNAL即-1並返回false。
既然返回了false,上面的if自然不成立,再走一次for循環,還是先嘗試獲取鎖,不成功,繼續走shouldParkAfterFailedAcquire,此時waitStatus為-1,小於0,走第三行的判斷,返回true。
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
最后一步,線程2調用LockSupport的park方法。
接下來就到線程1執行完任務后,將執行unlock方法 釋放鎖
public void unlock() {
sync.release(1);
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先tryRelease(1) ,代碼邏輯比較簡單,就是將state設置0 (注意這是同一個鎖只lock一次的情況下),並將 exclusiveOwnerThread設置為null
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
當鎖釋放完成后,繼續執行release方法的 unparkSuccessor(h),
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
h的下一個Node,這個Node里面的線程就是線程2,由於這個Node不等於null,線程2最終被unpark了,線程2可以繼續運行。
有一個很重要的問題是:鎖被解了怎樣保證整個FIFO隊列減少一個Node呢?
還記得線程2被park在 acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
被阻塞的線程2是被阻塞在第14行,注意這里並沒有return語句,阻塞完成線程2繼續進行for循環。線程2所在的Node的前驅Node是p,線程2嘗試tryAcquire,成功,
然后線程2就成為了head節點了,把p的next設置為null,這樣原頭Node里面的所有對象都不指向任何塊內存空間,h屬於棧內存的內容,方法結束被自動回收,
這樣隨着方法的調用完畢,原頭Node也沒有任何的引用指向它了,這樣它就被GC自動回收了。此時,遇到一個return語句,acquireQueued方法結束,后面的Node也是一樣的原理。
至此線程2 lock方法執行完成,並成功獲取到鎖。
至此ReentrantLock的非公平鎖的加鎖與鎖釋放邏輯已經大致清楚了,那么公平鎖的加鎖過程又是如何呢?
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
1:tryAcquire(1),因為是第一個線程,所以當前status=0,嘗試獲取鎖,hasQueuedPredecessors方法也是和非公平鎖一個代碼上的區別
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平鎖獲取鎖之前首先判斷當前隊列是否存在(head==tail)[不存在],設置staus=1,獲取鎖成功。
如果等待隊列中存在等待線程,則取出第一個等待的線程(head.next),並返回第一個等待的線程是否是當前線程,
只有當等到隊列的第一個等待的線程是當前線程嘗試獲取鎖的線程,才會獲取鎖成功。
假如此時線程t2,也來獲取鎖,調用tryAcquire(1)時,因為status!=0,返回fasle,調用addWaiter(Node.EXCLUSIVE),
此時會生成一個隊列,隊列的head為 new Node(), tail為t2的Node,調用acquireQueued(t2的Node),因為此時t2所在Node的prev為head,所以會嘗試直接獲取一次鎖,
如果獲取成功,將t2的Node設置為head,如果沒有獲取鎖,shouldParkAfterFailedAcquire(),t2 Park()。
ReentrantLock持有的鎖
定義一個對象,分別有兩個測試方法,一個用ReentrantLock加鎖,一個用synchronized加鎖
public class MyDomain1 {
private Lock lock = new ReentrantLock();
public void method1() {
System.out.println("進入method1方法");
try {
lock.lock();
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + " i=" + i);
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
// 為了測試 lock 和 synchronized同步方法不是同一把鎖
public synchronized void method2() {
System.out.println("進入method2方法");
for (int j = 0; j < 5; j++) {
System.out.println(Thread.currentThread().getName() + " j=" + j);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
定義兩個線程類,分別調用一個方法
public class Mythread1_1 extends Thread {
private MyDomain1 myDomain1;
public Mythread1_1(MyDomain1 myDomain1) {
this.myDomain1 = myDomain1;
}
@Override
public void run() {
myDomain1.method1();
}
}
public class Mythread1_2 extends Thread {
private MyDomain1 myDomain1;
public Mythread1_2(MyDomain1 myDomain1) {
this.myDomain1 = myDomain1;
}
@Override
public void run() {
myDomain1.method2();
}
}
@Test
public void test1() throws InterruptedException {
MyDomain1 myDomain1 = new MyDomain1();
Mythread1_1 a = new Mythread1_1(myDomain1);
Mythread1_2 b = new Mythread1_2(myDomain1);
a.start();
b.start();
a.join();
b.join();
}
執行結果:
進入method1方法 Thread-0 i=0 進入method2方法 Thread-1 j=0 Thread-0 i=1 Thread-1 j=1 Thread-1 j=2 Thread-0 i=2 Thread-0 i=3 Thread-1 j=3 Thread-0 i=4 Thread-1 j=4
可以看到兩個線路交替打印,說明 ReentrantLock 和 synchronized同步方法不是同一把鎖
Condition
ReentrantLock實現等待/通知模型,這也是比synchronized更為強大的功能點之一。
1、一個ReentrantLock里面可以創建多個Condition實例,實現多路通知
2、notify()方法進行通知時,被通知的線程時Java虛擬機隨機選擇的,但是ReentrantLock結合Condition可以實現有選擇性地通知
3、await()和signal()之前,必須要先lock()獲得鎖,使用完畢在finally中unlock()釋放鎖,這和wait()、notify()/notifyAll()使用前必須先獲得對象鎖是一樣的
先看個示例
定義一個對象並new了兩個condition,然后分別執行await方法,再定義一個signal方法,只喚醒其中一個condition
public class MyDomain2 {
private Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
public void await() {
System.out.println("進入await方法");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " conditionA await " + System.currentTimeMillis());
conditionA.await();
System.out.println(Thread.currentThread().getName() + " conditionA await out " + System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void await2() {
System.out.println("進入await2方法");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " conditionB await " + System.currentTimeMillis());
conditionB.await();
System.out.println(Thread.currentThread().getName() + " conditionB await out " + System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void signal() {
System.out.println("進入signal方法");
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + " conditionA signal " + System.currentTimeMillis());
conditionA.signal();
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + " conditionA signal " + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
一個線程執行await方法,一個線程負責執行signal
public class Mythread2_1 extends Thread {
private MyDomain2 myDomain2;
public Mythread2_1(MyDomain2 myDomain2) {
this.myDomain2 = myDomain2;
}
@Override
public void run() {
myDomain2.await();
}
}
public class Mythread2_2 extends Thread {
private MyDomain2 myDomain2;
public Mythread2_2(MyDomain2 myDomain2) {
this.myDomain2 = myDomain2;
}
@Override
public void run() {
myDomain2.signal();
}
}
測試方法
@Test
public void test2() throws InterruptedException {
MyDomain2 myDomain2 = new MyDomain2();
Mythread2_1 a = new Mythread2_1(myDomain2);
Mythread2_2 b = new Mythread2_2(myDomain2);
a.start();
Thread.sleep(5000);
b.start();
a.join();
b.join();
}
執行結果:
進入await方法 Thread-0 conditionA await 1639549418811 進入signal方法 Thread-1 conditionA signal 1639549423817 Thread-1 conditionA signal 1639549426820 Thread-0 conditionA await out 1639549426820
可以看到進入await方法后,線程1 park住了,5秒鍾后,待signal執行完成后,線程1才開始繼續執行。
同時condition還有signalAll方法,可以喚醒同一個condition所有在等待的線程。
看過 ReentrantLock源碼的應該注意到 AbstractQueuedSynchronizer, 它也是JUC包實現的核心抽象同步器,
也是CountDownLatch、Semphore等並發類的核心組件,這個我們后續再繼續研究。
參考文獻
1:《Java並發編程的藝術》
2:《Java多線程編程核心技術》
