使用ReentrantLock可以替代內置鎖,當使用內置鎖的時候,我們可以使用wait() nitify()和notifyAll()來控制線程之間的協作,那么,當我們使用ReentrantLock的時候,我們怎么來處理線程之間的寫作呢?
JDK5.0為我們提供了Condition對象來替代內置鎖的 wait(),notify()和notifyAll()方法

內置鎖的話,就只能有一個等待隊列,所有的在某個對象上執行wait()方法的線程都會被加入到該對象的等待隊列中去(線程會被掛起),需要其他的線程在同一個對象上調用notify()或者是notifyAll()方法來喚醒等待隊列中的線程
而使用Condition的話,可以使用不同的等待隊列,只需要使用lock.newCondition()即可定義一個Condition對象,每一個Condition對象上都會有一個等待隊列(底層使用AQS),調用某個Condition對象的await()方法,就可以把當前線程加入到這個Condition對象的等待隊列上
其他的線程調用同一個Condition對象的sinal()或者是signalAll()方法則會喚醒等待隊列上的線程,使其能夠繼續執行
我們以一個現實中的例子來說明若何使用ReentrantLock和Condition如何替代synchronized和wait(),notify(),notifyAll():
我們模擬兩個線程,一個線程執行登錄操作,該登錄操作會阻塞,然后等待另外一個線程將其喚醒(類似掃描登錄的場景,頁面會阻塞,等待掃碼和確認,然后頁面才會跳轉)
首先是使用內置鎖的例子:
package com.jiaoyiping.baseproject.condition;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
/**
* Created with Intellij IDEA
*
* @author: jiaoyiping
* Mail: jiaoyiping@gmail.com
* Date: 2019/04/12
* Time: 15:29
* To change this template use File | Settings | Editor | File and Code Templates
*/
//使用內置鎖來實現的等待/通知模型
public class LoginServiceUseInnerLock {
private ConcurrentHashMap<String, Result> loginMap = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
LoginServiceUseInnerLock loginService = new LoginServiceUseInnerLock();
String uuid = UUID.randomUUID().toString();
System.out.println("[" + Thread.currentThread().getName() + "] 使用的UUID是: " + uuid);
new Thread(() -> {
loginService.login(uuid, 20_000);
countDownLatch.countDown();
}, "登錄線程").start();
Thread.sleep(2_000);
new Thread(() -> {
loginService.confirm(uuid);
countDownLatch.countDown();
}, "確認線程").start();
countDownLatch.await();
System.out.println("[" + Thread.currentThread().getName() + "] 兩個線程都執行完畢了");
}
public void login(String code, int timeout) {
Result result = new Result();
result.setMessage("超時");
loginMap.put(code, result);
synchronized (result) {
try {
//超時的話,會自動返回,程序繼續
System.out.println("[" + Thread.currentThread().getName() + "] 登錄線程掛起");
result.wait(timeout);
System.out.println("[" + Thread.currentThread().getName() + "] 登錄線程繼續執行,得到的結果是:" + result.getMessage());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
loginMap.remove(code);
}
}
}
public void confirm(String code) {
assert code != null;
Result result = loginMap.get(code);
if (result == null) {
System.out.println("[" + Thread.currentThread().getName() + "] 請求不存在或者已經過期");
return;
}
result.setMessage("成功");
synchronized (result) {
//喚醒等待隊列上的線程
System.out.println("[" + Thread.currentThread().getName() + "] 確認線程開始喚醒阻塞的線程");
result.notify();
}
}
class Result implements Serializable {
private static final long serialVersionUID = -4279280559711939661L;
String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Result() {
}
public Result(String message) {
this.message = message;
}
}
使用內置鎖的時候,我們把random生成的key和一個自己定義的Result對象放置到ConcurrentHashMap中去,登錄線程調用 Result對象的wait(timeout) 方法將當前線程掛起,並加入到Result對象的等待隊列上去
確認線程根據key值,找到對應的Result對象,設置好message,然后調用Result對象的notify()方法喚醒等待隊列上的線程,登錄線程得以繼續執行
那我們如何使用ReentrantLock和Condition來重寫這個例子:
package com.jiaoyiping.baseproject.condition;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created with Intellij IDEA
*
* @author: jiaoyiping
* Mail: jiaoyiping@gmail.com
* Date: 2019/04/12
* Time: 14:56
* To change this template use File | Settings | Editor | File and Code Templates
*/
//使用ReentrantLock和Condition來實現的等待/通知模型
public class LoginServiceUseCondition {
private ReentrantLock lock = new ReentrantLock();
ConcurrentHashMap<String, Result> conditions = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
LoginServiceUseCondition loginService = new LoginServiceUseCondition();
String uuid = UUID.randomUUID().toString();
System.out.println("[" + Thread.currentThread().getName() + "] 使用的UUID是:" + uuid);
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
loginService.login(uuid, 30_000);
countDownLatch.countDown();
}, "登錄線程").start();
Thread.sleep(5_000);
new Thread(() -> {
try {
Thread.sleep(3_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
loginService.confirm(uuid);
countDownLatch.countDown();
}, "確認線程").start();
countDownLatch.await();
System.out.println("[" + Thread.currentThread().getName() + "] 兩個線程都執行完畢了,退出");
}
/**
* 過了超時時間之后,鎖會自動釋放
*
* @param code
* @param timeout
*/
public void login(String code, int timeout) {
assert code != null;
try {
lock.tryLock(timeout, TimeUnit.MILLISECONDS);
Condition condition = lock.newCondition();
Result result = new Result("超時", condition);
conditions.put(code, result);
System.out.println("[" + Thread.currentThread().getName() + "] login()的請求開始阻塞");
condition.await();
System.out.println("[" + Thread.currentThread().getName() + "] 結束等待,繼續執行,拿到的結果是" + result.getMessage());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
//確認線程(拿這個UUID,去找到對應的Condition,喚醒上邊的等待隊列,並把Condition對象移除掉)
public void confirm(String code) {
assert code != null;
Result result = conditions.get(code);
Condition condition = result.getCondition();
if (condition != null) {
try {
System.out.println("[" + Thread.currentThread().getName() + "] 找到對應的Condition對象,將其等待隊列中的線程喚醒");
lock.lock();
result.setMessage("成功");
condition.signal();
conditions.remove(code);
} finally {
lock.unlock();
}
}
}
class Result implements Serializable {
String message;
final Condition condition;
public Result(String message, Condition condition) {
this.message = message;
this.condition = condition;
}
public Result(Condition condition) {
this.condition = condition;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public Condition getCondition() {
return condition;
}
}
}
上邊的例子說明了怎么使用ReentrantLock和Condition來代替內置鎖和wait(),notify(),notifyAll()
下邊的一個來自jdk中的例子,演示了如何使用同一個ReentrantLock上的多個等待隊列的情況
來自JDK文檔中的示例(我稍加改造,加上了main方法和一些日志):
package com.jiaoyiping.baseproject.condition;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;
/**
* Created with Intellij IDEA
*
* @author: jiaoyiping
* Mail: jiaoyiping@gmail.com
* Date: 2019/04/12
* Time: 21:17
* To change this template use File | Settings | Editor | File and Code Templates
*/
public class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[10];
int putptr, takeptr, count;
public static void main(String[] args) throws InterruptedException {
BoundedBuffer boundedBuffer = new BoundedBuffer();
CountDownLatch countDownLatch = new CountDownLatch(40);
//分別啟動20個put線程和20個take線程
IntStream.rangeClosed(1, 20).forEach(i -> {
new Thread(() -> {
try {
boundedBuffer.put(new Object());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}, "put線程 - " + i).start();
});
IntStream.rangeClosed(1, 20).forEach(i -> {
new Thread(() -> {
try {
boundedBuffer.take();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}, "take線程-" + i).start();
});
countDownLatch.await();
System.out.println("[" + Thread.currentThread().getName() + "] 所有線程都執行完畢,退出");
}
public void put(Object x) throws InterruptedException {
lock.lock();
try {
//put的線程,當隊列滿的時候掛起
while (count == items.length) {
System.out.println("[" + Thread.currentThread().getName() + "] 線程掛起");
notFull.await();
}
Thread.sleep(1_000);
items[putptr] = x;
if (++putptr == items.length) {
putptr = 0;
}
++count;
System.out.println("[" + Thread.currentThread().getName() + "] 執行完畢寫操作,喚醒take線程");
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
//take的線程,當隊列為空的時候,掛起
while (count == 0) {
System.out.println("[" + Thread.currentThread().getName() + "] 線程掛起");
notEmpty.await();
}
Thread.sleep(1_000);
Object x = items[takeptr];
if (++takeptr == items.length) {
takeptr = 0;
}
--count;
System.out.println("[" + Thread.currentThread().getName() + "] 執行完畢讀操作,喚醒put線程");
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
我們看到,以上代碼中,使用到了兩個Condition:notFull和notEmpty,都是通過lock對象的newCondition()方法得來的
items被放滿之后,put的線程會在notFull的等待隊列上進行等待(執行了notFull.await()方法) put線程執行完操作之后,會調用 notEmpty.signal()來試圖喚醒在notEmpty上等待的線程(也就是給take線程發了一個信號,告訴它,items不是空的了,你可以過來take了)
當item空了之后,take線程會在notEmpty的等待隊列上進行等待(執行了notEmpty的await()方法) 當take線程執行完操作之后,會調用notFull.signal()來喚醒在notFull上等待的線程(也就是給put線程發一個信號,告訴它,items不滿了,你可以進行put操作了)
和內置方法類似,在調用await(),signal(),signalAll()等方法的時候,也必須要獲得鎖,也就是必須在 lock.lock()和lock.unlock()代碼塊兒之間才能調用這些方法,否則就會拋出IllegalMonitorStateException
