-
顯示鎖
Lock接口是Java 5.0新增的接口,該接口的定義如下:
12345678publicinterface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(longtime, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition();}與內置加鎖機制不同的是,Lock提供了一種無條件的、可輪詢的、定時的以及可中斷的鎖獲取操作,所有加鎖和解鎖的方法都是顯示的。ReentrantLock實現了Lock接口,與內置鎖相比,ReentrantLock有以下優勢:可以中斷獲取鎖操作,獲取鎖時候可以設置超時時間。以下代碼給出了Lock接口的標准使用形式:
1234567Lock lock = new ReentrantLock();...lock.lock();try{...} finally {lock.unlock();1.1、輪詢鎖與定時鎖
可定時的與可輪詢的鎖獲取方式是由tryLock方法實現的,與無條件的鎖獲取方式相比,它具有跟完善的錯誤回復機制。tryLock方法的說明如下:
123456789101112131415161718boolean tryLock():僅在調用時鎖為空閑狀態才獲取該鎖。如果鎖可用,則獲取鎖,並立即返回值true。如果鎖不可用,則此方法將立即返回值false。boolean tryLock(longtime, TimeUnit unit) throws InterruptedException:如果鎖在給定的等待時間內空閑,並且當前線程未被中斷,則獲取鎖。如果鎖可用,則此方法將立即返回值true。如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在發生以下三種情況之一前,該線程將一直處於休眠狀態:鎖由當前線程獲得;或者其他某個線程中斷當前線程,並且支持對鎖獲取的中斷;或者已超過指定的等待時間如果獲得了鎖,則返回值true。如果當前線程:在進入此方法時已經設置了該線程的中斷狀態;或者在獲取鎖時被中斷,並且支持對鎖獲取的中斷,則將拋出 InterruptedException,並會清除當前線程的已中斷狀態。如果超過了指定的等待時間,則將返回值false。如果time小於等於 0,該方法將完全不等待。在內置鎖中,死鎖是一個嚴重的問題,恢復程序的唯一方法是重新啟動程序,而防止死鎖的唯一方法就是在構造程序時避免出現不一致的鎖順序,可定時的與可輪詢的鎖提供了另一種選擇:先用tryLock()嘗試獲取所有的鎖,如果不能獲取所有需要的鎖,那么釋放已經獲取的鎖,然后重新嘗試獲取所有的鎖,以下例子演示了使用tryLock避免死鎖的方法:先用tryLock來獲取兩個鎖,如果不能同時獲取,那么就回退並重新嘗試。
123456789101112131415161718192021222324252627282930publicboolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException {long fixedDelay = 1;long randMod = 2;long stopTime = System.nanoTime() + unit.toNanos(timeout);while (true) {if (fromAcct.lock.tryLock()) {try {if (toAcct.lock.tryLock()) {try {if (fromAcct.getBalance().compareTo(amount) < 0)throw new InsufficientFundsException();else{fromAcct.debit(amount);toAcct.credit(amount);returntrue;}} finally {toAcct.lock.unlock();}}} finally {fromAcct.lock.unlock();}}if (System.nanoTime() < stopTime)returnfalse;NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);}}1.2、可中斷的鎖獲取操作
lockInterruptibly方法能夠在獲得鎖的同時保持對中斷的響應,該方法說明如下:
1234567891011121314void lockInterruptibly() throws InterruptedException:如果當前線程未被中斷,則獲取鎖。如果鎖可用,則獲取鎖,並立即返回。如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在發生以下兩種情況之一以前,該線程將一直處於休眠狀態:鎖由當前線程獲得;或者其他某個線程中斷當前線程,並且支持對鎖獲取的中斷。如果當前線程:在進入此方法時已經設置了該線程的中斷狀態;或者在獲取鎖時被中斷,並且支持對鎖獲取的中斷,則將拋出 InterruptedException,並清除當前線程的已中斷狀態。1.3、讀-寫鎖
Java 5除了增加了Lock接口,還增加了ReadWriteLock接口,即讀寫鎖,該接口定義如下:
1234publicinterface ReadWriteLock {Lock readLock();Lock writeLock();}讀寫鎖允許多個讀線程並發執行,但是不允許寫線程與讀線程並發執行,也不允許寫線程與寫線程並發執行。下面的例子使用了ReentrantReadWriteLock包裝Map,從而使他能夠在多個線程之間安全的共享:
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091publicclassReadWriteMap <K,V> {privatefinalMap<K, V> map;privatefinalReadWriteLock lock =newReentrantReadWriteLock();privatefinalLock r = lock.readLock();privatefinalLock w = lock.writeLock();publicReadWriteMap(Map<K, V> map) {this.map = map;}publicV put(K key, V value) {w.lock();try{returnmap.put(key, value);}finally{w.unlock();}}publicV remove(Object key) {w.lock();try{returnmap.remove(key);}finally{w.unlock();}}publicvoidputAll(Map<?extendsK, ?extendsV> m) {w.lock();try{map.putAll(m);}finally{w.unlock();}}publicvoidclear() {w.lock();try{map.clear();}finally{w.unlock();}}publicV get(Object key) {r.lock();try{returnmap.get(key);}finally{r.unlock();}}publicintsize() {r.lock();try{returnmap.size();}finally{r.unlock();}}publicbooleanisEmpty() {r.lock();try{returnmap.isEmpty();}finally{r.unlock();}}publicbooleancontainsKey(Object key) {r.lock();try{returnmap.containsKey(key);}finally{r.unlock();}}publicbooleancontainsValue(Object value) {r.lock();try{returnmap.containsValue(value);}finally{r.unlock();}}}同步工具類
2.1、閉鎖
閉鎖是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。
用給定的計數初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
下例給出了閉鎖的常見用法,TestHarness創建一定數量的線程,利用它們並發的執行指定的任務,它使用兩個閉鎖,分別表示"起始門"和"結束門"。每個線程首先要做的就是在啟動門上等待,從而確保所有線程都就緒后才開始執行,而每個線程要做的最后一件事是將調用結束門的countDown方法減1,這能使主線程高效地等待直到所有工作線程都執行完畢,因此可以統計所消耗的時間:
123456789101112131415161718192021222324252627282930publicclassTestHarness {publiclongtimeTasks(intnThreads,finalRunnable task)throwsInterruptedException {finalCountDownLatch startGate =newCountDownLatch(1);finalCountDownLatch endGate =newCountDownLatch(nThreads);for(inti =0; i < nThreads; i++) {Thread t =newThread() {publicvoidrun() {try{startGate.await();try{task.run();}finally{endGate.countDown();}}catch(InterruptedException ignored) {}}};t.start();}longstart = System.nanoTime();startGate.countDown();endGate.await();longend = System.nanoTime();returnend - start;}}2.2、FutureTask
FutureTask表示可取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 Future 的基本實現。僅在計算完成時才能獲取結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。FutureTask的方法摘要如下:
1234567891011121314151617181920212223242526272829booleancancel(booleanmayInterruptIfRunning)試圖取消對此任務的執行。protectedvoiddone()當此任務轉換到狀態 isDone(不管是正常地還是通過取消)時,調用受保護的方法。V get()throwsInterruptedException, ExecutionException如有必要,等待計算完成,然后獲取其結果。V get(longtimeout, TimeUnit unit)throwsInterruptedException, ExecutionException, TimeoutException如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。booleanisCancelled()如果在任務正常完成前將其取消,則返回true。booleanisDone()如果任務已完成,則返回true。voidrun()除非已將此 Future 取消,否則將其設置為其計算的結果。protectedbooleanrunAndReset()執行計算而不設置其結果,然后將此 Future 重置為初始狀態,如果計算遇到異常或已取消,則該操作失敗。protectedvoidset(V v)除非已經設置了此 Future 或已將其取消,否則將其結果設置為給定的值。protectedvoidsetException(Throwable t)除非已經設置了此 Future 或已將其取消,否則它將報告一個 ExecutionException,並將給定的 throwable 作為其原因。FutureTask可以用來表示一些時間較長的計算,這些計算可以在使用計算結果之前啟動,以下代碼就是模擬一個高開銷的計算,我們可以先調用start()方法開始計算,然后在需要結果時,再調用get得到結果:
1234567891011121314151617181920212223242526272829303132333435publicclassPreloader {ProductInfo loadProductInfo()throwsDataLoadException {returnnull;}privatefinalFutureTask<ProductInfo> future =newFutureTask<ProductInfo>(newCallable<ProductInfo>() {publicProductInfo call()throwsDataLoadException {returnloadProductInfo();}});privatefinalThread thread =newThread(future);publicvoidstart() {thread.start();}publicProductInfo get()throwsDataLoadException, InterruptedException {try{returnfuture.get();}catch(ExecutionException e) {Throwable cause = e.getCause();if(causeinstanceofDataLoadException)throw(DataLoadException) cause;elsethrownewRuntimeException(e);}}interfaceProductInfo {}}classDataLoadExceptionextendsException {}2.3、信號量
從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后等待獲取許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並采取相應的行動。
Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的線程數目。例如,下面的類使用信號量控制對內容池的訪問:
1234567891011121314151617181920212223242526272829303132333435363738394041classPool {privatestaticfinalintMAX_AVAILABLE =100;privatefinalSemaphore available =newSemaphore(MAX_AVAILABLE,true);publicObject getItem()throwsInterruptedException {available.acquire();returngetNextAvailableItem();}publicvoidputItem(Object x) {if(markAsUnused(x))available.release();}// Not a particularly efficient data structure; just for demoprotectedObject[] items = ... whatever kinds of items being managedprotectedboolean[] used =newboolean[MAX_AVAILABLE];protectedsynchronizedObject getNextAvailableItem() {for(inti =0; i < MAX_AVAILABLE; ++i) {if(!used[i]) {used[i] =true;returnitems[i];}}returnnull;// not reached}protectedsynchronizedbooleanmarkAsUnused(Object item) {for(inti =0; i < MAX_AVAILABLE; ++i) {if(item == items[i]) {if(used[i]) {used[i] =false;returntrue;}elsereturnfalse;}}returnfalse;}}獲得一項前,每個線程必須從信號量獲取許可,從而保證可以使用該項。該線程結束后,將項返回到池中並將許可返回到該信號量,從而允許其他線程獲取該項。注意,調用 acquire() 時無法保持同步鎖,因為這會阻止將項返回到池中。信號量封裝所需的同步,以限制對池的訪問,這同維持該池本身一致性所需的同步是分開的。
將信號量初始化為 1,使得它在使用時最多只有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進制信號量,因為它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,二進制信號量具有某種屬性(與很多 Lock 實現不同),即可以由線程釋放“鎖”,而不是由所有者(因為信號量沒有所有權的概念)。在某些專門的上下文(如死鎖恢復)中這會很有用。
Semaphore的構造方法可選地接受一個公平 參數。當設置為 false 時,此類不對線程獲取許可的順序做任何保證。特別地,闖入 是允許的,也就是說可以在已經等待的線程前為調用 acquire() 的線程分配一個許可,從邏輯上說,就是新線程將自己置於等待線程隊列的頭部。當公平設置為 true時,信號量保證對於任何調用獲取方法的線程而言,都按照處理它們調用這些方法的順序(即先進先出;FIFO)來選擇線程、獲得許可。注意,FIFO 排序必然應用到這些方法內的指定內部執行點。所以,可能某個線程先於另一個線程調用了acquire,但是卻在該線程之后到達排序點,並且從方法返回時也類似。還要注意,非同步的tryAcquire 方法不使用公平設置,而是使用任意可用的許可。
通常,應該將用於控制資源訪問的信號量初始化為公平的,以確保所有線程都可訪問資源。為其他的種類的同步控制使用信號量時,非公平排序的吞吐量優勢通常要比公平考慮更為重要。
Semaphore還提供便捷的方法來同時 acquire 和釋放多個許可。小心,在未將公平設置為 true 時使用這些方法會增加不確定延期的風險。
內存一致性效果:線程中調用“釋放”方法(比如 release())之前的操作 happen-before 另一線程中緊跟在成功的“獲取”方法(比如 acquire())之后的操作。
2.4、柵欄
CyclicBarrier是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier很有用。因為該 barrier在釋放等待線程后可以重用,所以稱它為循環的barrier。
CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作很有用。
示例用法:下面是一個在並行分解設計中使用barrier的例子:
123456789101112131415161718192021222324252627282930313233343536373839404142classSolver {finalintN;finalfloat[][] data;finalCyclicBarrier barrier;classWorkerimplementsRunnable {intmyRow;Worker(introw) {myRow = row;}publicvoidrun() {while(!done()) {processRow(myRow);try{barrier.await();}catch(InterruptedException ex) {return;}catch(BrokenBarrierException ex) {return;}}}}publicSolver(float[][] matrix) {data = matrix;N = matrix.length;barrier =newCyclicBarrier(N,newRunnable() {publicvoidrun() {//mergeRows(...);}});for(inti =0; i < N; ++i)newThread(newWorker(i)).start();waitUntilDone();}}在這個例子中,每個 worker 線程處理矩陣的一行,在處理完所有的行之前,該線程將一直在屏障處等待。處理完所有的行之后,將執行所提供的 Runnable 屏障操作,並合並這些行。如果合並者確定已經找到了一個解決方案,那么 done() 將返回 true,所有的 worker 線程都將終止。
如果屏障操作在執行時不依賴於正掛起的線程,則線程組中的任何線程在獲得釋放時都能執行該操作。為方便此操作,每次調用 await() 都將返回能到達屏障處的線程的索引。然后,您可以選擇哪個線程應該執行屏障操作.
對於失敗的同步嘗試,CyclicBarrier 使用了一種要么全部要么全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致線程過早地離開了屏障點,那么在該屏障點等待的其他所有線程也將通過 BrokenBarrierException以反常的方式離開。
內存一致性效果:線程中調用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 緊跟在從另一個線程中對應 await() 成功返回的操作。
