-
顯示鎖
Lock接口是Java 5.0新增的接口,該接口的定義如下:
12345678public
interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long
time
, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
與內置加鎖機制不同的是,Lock提供了一種無條件的、可輪詢的、定時的以及可中斷的鎖獲取操作,所有加鎖和解鎖的方法都是顯示的。ReentrantLock實現了Lock接口,與內置鎖相比,ReentrantLock有以下優勢:可以中斷獲取鎖操作,獲取鎖時候可以設置超時時間。以下代碼給出了Lock接口的標准使用形式:
1234567Lock lock = new ReentrantLock();
...
lock.lock();
try{
...
} finally {
lock.unlock();
1.1、輪詢鎖與定時鎖
可定時的與可輪詢的鎖獲取方式是由tryLock方法實現的,與無條件的鎖獲取方式相比,它具有跟完善的錯誤回復機制。tryLock方法的說明如下:
123456789101112131415161718boolean tryLock():僅在調用時鎖為空閑狀態才獲取該鎖。如果鎖可用,則獲取鎖,並立即返回值
true
。如果鎖不可用,則此方法將立即返回值
false
。
boolean tryLock(long
time
, TimeUnit unit) throws InterruptedException:
如果鎖在給定的等待時間內空閑,並且當前線程未被中斷,則獲取鎖。
如果鎖可用,則此方法將立即返回值
true
。如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在發生以下三種情況之一前,該線程將一直處於休眠狀態:
鎖由當前線程獲得;或者
其他某個線程中斷當前線程,並且支持對鎖獲取的中斷;或者
已超過指定的等待時間
如果獲得了鎖,則返回值
true
。
如果當前線程:
在進入此方法時已經設置了該線程的中斷狀態;或者
在獲取鎖時被中斷,並且支持對鎖獲取的中斷,
則將拋出 InterruptedException,並會清除當前線程的已中斷狀態。
如果超過了指定的等待時間,則將返回值
false
。如果
time
小於等於 0,該方法將完全不等待。
在內置鎖中,死鎖是一個嚴重的問題,恢復程序的唯一方法是重新啟動程序,而防止死鎖的唯一方法就是在構造程序時避免出現不一致的鎖順序,可定時的與可輪詢的鎖提供了另一種選擇:先用tryLock()嘗試獲取所有的鎖,如果不能獲取所有需要的鎖,那么釋放已經獲取的鎖,然后重新嘗試獲取所有的鎖,以下例子演示了使用tryLock避免死鎖的方法:先用tryLock來獲取兩個鎖,如果不能同時獲取,那么就回退並重新嘗試。
123456789101112131415161718192021222324252627282930public
boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException {
long fixedDelay = 1;
long randMod = 2;
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (
true
) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else
{
fromAcct.debit(amount);
toAcct.credit(amount);
return
true
;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime)
return
false
;
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}
1.2、可中斷的鎖獲取操作
lockInterruptibly方法能夠在獲得鎖的同時保持對中斷的響應,該方法說明如下:
1234567891011121314void lockInterruptibly() throws InterruptedException:
如果當前線程未被中斷,則獲取鎖。
如果鎖可用,則獲取鎖,並立即返回。
如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在發生以下兩種情況之一以前,該線程將一直處於休眠狀態:
鎖由當前線程獲得;或者
其他某個線程中斷當前線程,並且支持對鎖獲取的中斷。
如果當前線程:
在進入此方法時已經設置了該線程的中斷狀態;或者
在獲取鎖時被中斷,並且支持對鎖獲取的中斷,
則將拋出 InterruptedException,並清除當前線程的已中斷狀態。
1.3、讀-寫鎖
Java 5除了增加了Lock接口,還增加了ReadWriteLock接口,即讀寫鎖,該接口定義如下:
1234public
interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
讀寫鎖允許多個讀線程並發執行,但是不允許寫線程與讀線程並發執行,也不允許寫線程與寫線程並發執行。下面的例子使用了ReentrantReadWriteLock包裝Map,從而使他能夠在多個線程之間安全的共享:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091public
class
ReadWriteMap <K,V> {
private
final
Map<K, V> map;
private
final
ReadWriteLock lock =
new
ReentrantReadWriteLock();
private
final
Lock r = lock.readLock();
private
final
Lock w = lock.writeLock();
public
ReadWriteMap(Map<K, V> map) {
this
.map = map;
}
public
V put(K key, V value) {
w.lock();
try
{
return
map.put(key, value);
}
finally
{
w.unlock();
}
}
public
V remove(Object key) {
w.lock();
try
{
return
map.remove(key);
}
finally
{
w.unlock();
}
}
public
void
putAll(Map<?
extends
K, ?
extends
V> m) {
w.lock();
try
{
map.putAll(m);
}
finally
{
w.unlock();
}
}
public
void
clear() {
w.lock();
try
{
map.clear();
}
finally
{
w.unlock();
}
}
public
V get(Object key) {
r.lock();
try
{
return
map.get(key);
}
finally
{
r.unlock();
}
}
public
int
size() {
r.lock();
try
{
return
map.size();
}
finally
{
r.unlock();
}
}
public
boolean
isEmpty() {
r.lock();
try
{
return
map.isEmpty();
}
finally
{
r.unlock();
}
}
public
boolean
containsKey(Object key) {
r.lock();
try
{
return
map.containsKey(key);
}
finally
{
r.unlock();
}
}
public
boolean
containsValue(Object value) {
r.lock();
try
{
return
map.containsValue(value);
}
finally
{
r.unlock();
}
}
}
同步工具類
2.1、閉鎖
閉鎖是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
用給定的計數初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
下例給出了閉鎖的常見用法,TestHarness創建一定數量的線程,利用它們並發的執行指定的任務,它使用兩個閉鎖,分別表示"起始門"和"結束門"。每個線程首先要做的就是在啟動門上等待,從而確保所有線程都就緒后才開始執行,而每個線程要做的最后一件事是將調用結束門的countDown方法減1,這能使主線程高效地等待直到所有工作線程都執行完畢,因此可以統計所消耗的時間:
123456789101112131415161718192021222324252627282930public
class
TestHarness {
public
long
timeTasks(
int
nThreads,
final
Runnable task)
throws
InterruptedException {
final
CountDownLatch startGate =
new
CountDownLatch(
1
);
final
CountDownLatch endGate =
new
CountDownLatch(nThreads);
for
(
int
i =
0
; i < nThreads; i++) {
Thread t =
new
Thread() {
public
void
run() {
try
{
startGate.await();
try
{
task.run();
}
finally
{
endGate.countDown();
}
}
catch
(InterruptedException ignored) {
}
}
};
t.start();
}
long
start = System.nanoTime();
startGate.countDown();
endGate.await();
long
end = System.nanoTime();
return
end - start;
}
}
2.2、FutureTask
FutureTask表示可取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 Future 的基本實現。僅在計算完成時才能獲取結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。FutureTask的方法摘要如下:
1234567891011121314151617181920212223242526272829boolean
cancel(
boolean
mayInterruptIfRunning)
試圖取消對此任務的執行。
protected
void
done()
當此任務轉換到狀態 isDone(不管是正常地還是通過取消)時,調用受保護的方法。
V get()
throws
InterruptedException, ExecutionException
如有必要,等待計算完成,然后獲取其結果。
V get(
long
timeout, TimeUnit unit)
throws
InterruptedException, ExecutionException, TimeoutException
如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。
boolean
isCancelled()
如果在任務正常完成前將其取消,則返回
true
。
boolean
isDone()
如果任務已完成,則返回
true
。
void
run()
除非已將此 Future 取消,否則將其設置為其計算的結果。
protected
boolean
runAndReset()
執行計算而不設置其結果,然后將此 Future 重置為初始狀態,如果計算遇到異常或已取消,則該操作失敗。
protected
void
set(V v)
除非已經設置了此 Future 或已將其取消,否則將其結果設置為給定的值。
protected
void
setException(Throwable t)
除非已經設置了此 Future 或已將其取消,否則它將報告一個 ExecutionException,並將給定的 throwable 作為其原因。
FutureTask可以用來表示一些時間較長的計算,這些計算可以在使用計算結果之前啟動,以下代碼就是模擬一個高開銷的計算,我們可以先調用start()方法開始計算,然后在需要結果時,再調用get得到結果:
1234567891011121314151617181920212223242526272829303132333435public
class
Preloader {
ProductInfo loadProductInfo()
throws
DataLoadException {
return
null
;
}
private
final
FutureTask<ProductInfo> future =
new
FutureTask<ProductInfo>(
new
Callable<ProductInfo>() {
public
ProductInfo call()
throws
DataLoadException {
return
loadProductInfo();
}
});
private
final
Thread thread =
new
Thread(future);
public
void
start() {
thread.start();
}
public
ProductInfo get()
throws
DataLoadException, InterruptedException {
try
{
return
future.get();
}
catch
(ExecutionException e) {
Throwable cause = e.getCause();
if
(cause
instanceof
DataLoadException)
throw
(DataLoadException) cause;
else
throw
new
RuntimeException(e);
}
}
interface
ProductInfo {
}
}
class
DataLoadException
extends
Exception {
}
2.3、信號量
從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后等待獲取許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並采取相應的行動。
Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的線程數目。例如,下面的類使用信號量控制對內容池的訪問:
1234567891011121314151617181920212223242526272829303132333435363738394041class
Pool {
private
static
final
int
MAX_AVAILABLE =
100
;
private
final
Semaphore available =
new
Semaphore(MAX_AVAILABLE,
true
);
public
Object getItem()
throws
InterruptedException {
available.acquire();
return
getNextAvailableItem();
}
public
void
putItem(Object x) {
if
(markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected
Object[] items = ... whatever kinds of items being managed
protected
boolean
[] used =
new
boolean
[MAX_AVAILABLE];
protected
synchronized
Object getNextAvailableItem() {
for
(
int
i =
0
; i < MAX_AVAILABLE; ++i) {
if
(!used[i]) {
used[i] =
true
;
return
items[i];
}
}
return
null
;
// not reached
}
protected
synchronized
boolean
markAsUnused(Object item) {
for
(
int
i =
0
; i < MAX_AVAILABLE; ++i) {
if
(item == items[i]) {
if
(used[i]) {
used[i] =
false
;
return
true
;
}
else
return
false
;
}
}
return
false
;
}
}
獲得一項前,每個線程必須從信號量獲取許可,從而保證可以使用該項。該線程結束后,將項返回到池中並將許可返回到該信號量,從而允許其他線程獲取該項。注意,調用 acquire() 時無法保持同步鎖,因為這會阻止將項返回到池中。信號量封裝所需的同步,以限制對池的訪問,這同維持該池本身一致性所需的同步是分開的。
將信號量初始化為 1,使得它在使用時最多只有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進制信號量,因為它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,二進制信號量具有某種屬性(與很多 Lock 實現不同),即可以由線程釋放“鎖”,而不是由所有者(因為信號量沒有所有權的概念)。在某些專門的上下文(如死鎖恢復)中這會很有用。
Semaphore的構造方法可選地接受一個公平 參數。當設置為 false 時,此類不對線程獲取許可的順序做任何保證。特別地,闖入 是允許的,也就是說可以在已經等待的線程前為調用 acquire() 的線程分配一個許可,從邏輯上說,就是新線程將自己置於等待線程隊列的頭部。當公平設置為 true時,信號量保證對於任何調用獲取方法的線程而言,都按照處理它們調用這些方法的順序(即先進先出;FIFO)來選擇線程、獲得許可。注意,FIFO 排序必然應用到這些方法內的指定內部執行點。所以,可能某個線程先於另一個線程調用了acquire,但是卻在該線程之后到達排序點,並且從方法返回時也類似。還要注意,非同步的tryAcquire 方法不使用公平設置,而是使用任意可用的許可。
通常,應該將用於控制資源訪問的信號量初始化為公平的,以確保所有線程都可訪問資源。為其他的種類的同步控制使用信號量時,非公平排序的吞吐量優勢通常要比公平考慮更為重要。
Semaphore還提供便捷的方法來同時 acquire 和釋放多個許可。小心,在未將公平設置為 true 時使用這些方法會增加不確定延期的風險。
內存一致性效果:線程中調用“釋放”方法(比如 release())之前的操作 happen-before 另一線程中緊跟在成功的“獲取”方法(比如 acquire())之后的操作。
2.4、柵欄
CyclicBarrier是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier很有用。因為該 barrier在釋放等待線程后可以重用,所以稱它為循環的barrier。
CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作很有用。
示例用法:下面是一個在並行分解設計中使用barrier的例子:
123456789101112131415161718192021222324252627282930313233343536373839404142class
Solver {
final
int
N;
final
float
[][] data;
final
CyclicBarrier barrier;
class
Worker
implements
Runnable {
int
myRow;
Worker(
int
row) {
myRow = row;
}
public
void
run() {
while
(!done()) {
processRow(myRow);
try
{
barrier.await();
}
catch
(InterruptedException ex) {
return
;
}
catch
(BrokenBarrierException ex) {
return
;
}
}
}
}
public
Solver(
float
[][] matrix) {
data = matrix;
N = matrix.length;
barrier =
new
CyclicBarrier(N,
new
Runnable() {
public
void
run() {
//mergeRows(...);
}
});
for
(
int
i =
0
; i < N; ++i)
new
Thread(
new
Worker(i)).start();
waitUntilDone();
}
}
在這個例子中,每個 worker 線程處理矩陣的一行,在處理完所有的行之前,該線程將一直在屏障處等待。處理完所有的行之后,將執行所提供的 Runnable 屏障操作,並合並這些行。如果合並者確定已經找到了一個解決方案,那么 done() 將返回 true,所有的 worker 線程都將終止。
如果屏障操作在執行時不依賴於正掛起的線程,則線程組中的任何線程在獲得釋放時都能執行該操作。為方便此操作,每次調用 await() 都將返回能到達屏障處的線程的索引。然后,您可以選擇哪個線程應該執行屏障操作.
對於失敗的同步嘗試,CyclicBarrier 使用了一種要么全部要么全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致線程過早地離開了屏障點,那么在該屏障點等待的其他所有線程也將通過 BrokenBarrierException以反常的方式離開。
內存一致性效果:線程中調用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 緊跟在從另一個線程中對應 await() 成功返回的操作。