JAVA多線程編中的輪詢鎖與定時鎖


  • 顯示鎖                                                                                    

          Lock接口是Java 5.0新增的接口,該接口的定義如下:

    1
    2
    3
    4
    5
    6
    7
    8
    public interface Lock {
         void lock();
         void lockInterruptibly() throws InterruptedException;
         boolean tryLock();
         boolean tryLock(long  time , TimeUnit unit) throws InterruptedException;
         void unlock();
         Condition newCondition();
    }

      與內置加鎖機制不同的是,Lock提供了一種無條件的、可輪詢的、定時的以及可中斷的鎖獲取操作,所有加鎖和解鎖的方法都是顯示的。ReentrantLock實現了Lock接口,與內置鎖相比,ReentrantLock有以下優勢:可以中斷獲取鎖操作,獲取鎖時候可以設置超時時間。以下代碼給出了Lock接口的標准使用形式:

    1
    2
    3
    4
    5
    6
    7
    Lock lock = new ReentrantLock();
    ...
    lock.lock();
    try{
         ...
    } finally {
         lock.unlock();

    1.1、輪詢鎖與定時鎖

          可定時的與可輪詢的鎖獲取方式是由tryLock方法實現的,與無條件的鎖獲取方式相比,它具有跟完善的錯誤回復機制。tryLock方法的說明如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    boolean tryLock():僅在調用時鎖為空閑狀態才獲取該鎖。如果鎖可用,則獲取鎖,並立即返回值  true 。如果鎖不可用,則此方法將立即返回值  false
    boolean tryLock(long  time , TimeUnit unit) throws InterruptedException:
      如果鎖在給定的等待時間內空閑,並且當前線程未被中斷,則獲取鎖。
      如果鎖可用,則此方法將立即返回值  true 。如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在發生以下三種情況之一前,該線程將一直處於休眠狀態:
      鎖由當前線程獲得;或者
      其他某個線程中斷當前線程,並且支持對鎖獲取的中斷;或者
      已超過指定的等待時間
      如果獲得了鎖,則返回值  true
      如果當前線程:
      在進入此方法時已經設置了該線程的中斷狀態;或者
      在獲取鎖時被中斷,並且支持對鎖獲取的中斷,
      則將拋出 InterruptedException,並會清除當前線程的已中斷狀態。
      如果超過了指定的等待時間,則將返回值  false 。如果  time 小於等於 0,該方法將完全不等待。

      在內置鎖中,死鎖是一個嚴重的問題,恢復程序的唯一方法是重新啟動程序,而防止死鎖的唯一方法就是在構造程序時避免出現不一致的鎖順序,可定時的與可輪詢的鎖提供了另一種選擇:先用tryLock()嘗試獲取所有的鎖,如果不能獲取所有需要的鎖,那么釋放已經獲取的鎖,然后重新嘗試獲取所有的鎖,以下例子演示了使用tryLock避免死鎖的方法:先用tryLock來獲取兩個鎖,如果不能同時獲取,那么就回退並重新嘗試。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount,  long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException {
         long fixedDelay = 1;
         long randMod = 2;
         long stopTime = System.nanoTime() + unit.toNanos(timeout);
         while ( true ) {
             if (fromAcct.lock.tryLock()) {
                 try {
                     if (toAcct.lock.tryLock()) {
                         try {
                             if (fromAcct.getBalance().compareTo(amount) < 0)
                                 throw new InsufficientFundsException();
                             else {
                                 fromAcct.debit(amount);
                                 toAcct.credit(amount);
                                 return true ;
                             }
                         } finally {
                             toAcct.lock.unlock();
                         }
                     }
                 } finally {
                     fromAcct.lock.unlock();
                 }
             }
             if (System.nanoTime() < stopTime)
                 return false ;
             NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
         }
    }

    1.2、可中斷的鎖獲取操作

          lockInterruptibly方法能夠在獲得鎖的同時保持對中斷的響應,該方法說明如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    void lockInterruptibly() throws InterruptedException:
    如果當前線程未被中斷,則獲取鎖。
    如果鎖可用,則獲取鎖,並立即返回。
    如果鎖不可用,出於線程調度目的,將禁用當前線程,並且在發生以下兩種情況之一以前,該線程將一直處於休眠狀態:
    鎖由當前線程獲得;或者
    其他某個線程中斷當前線程,並且支持對鎖獲取的中斷。
    如果當前線程:
    在進入此方法時已經設置了該線程的中斷狀態;或者
    在獲取鎖時被中斷,並且支持對鎖獲取的中斷,
    則將拋出 InterruptedException,並清除當前線程的已中斷狀態。

    1.3、讀-寫鎖

          Java 5除了增加了Lock接口,還增加了ReadWriteLock接口,即讀寫鎖,該接口定義如下:

    1
    2
    3
    4
    public interface ReadWriteLock {
         Lock readLock();
         Lock writeLock();
    }

      讀寫鎖允許多個讀線程並發執行,但是不允許寫線程與讀線程並發執行,也不允許寫線程與寫線程並發執行。下面的例子使用了ReentrantReadWriteLock包裝Map,從而使他能夠在多個線程之間安全的共享:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    public class ReadWriteMap <K,V> {
         private final Map<K, V> map;
         private final ReadWriteLock lock =  new ReentrantReadWriteLock();
         private final Lock r = lock.readLock();
         private final Lock w = lock.writeLock();
         public ReadWriteMap(Map<K, V> map) {
             this .map = map;
         }
         public V put(K key, V value) {
             w.lock();
             try {
                 return map.put(key, value);
             finally {
                 w.unlock();
             }
         }
         public V remove(Object key) {
             w.lock();
             try {
                 return map.remove(key);
             finally {
                 w.unlock();
             }
         }
         public void putAll(Map<?  extends K, ?  extends V> m) {
             w.lock();
             try {
                 map.putAll(m);
             finally {
                 w.unlock();
             }
         }
         public void clear() {
             w.lock();
             try {
                 map.clear();
             finally {
                 w.unlock();
             }
         }
         public V get(Object key) {
             r.lock();
             try {
                 return map.get(key);
             finally {
                 r.unlock();
             }
         }
         public int size() {
             r.lock();
             try {
                 return map.size();
             finally {
                 r.unlock();
             }
         }
         public boolean isEmpty() {
             r.lock();
             try {
                 return map.isEmpty();
             finally {
                 r.unlock();
             }
         }
         public boolean containsKey(Object key) {
             r.lock();
             try {
                 return map.containsKey(key);
             finally {
                 r.unlock();
             }
         }
         public boolean containsValue(Object value) {
             r.lock();
             try {
                 return map.containsValue(value);
             finally {
                 r.unlock();
             }
         }
    }

    同步工具類                                                                            

    2.1、閉鎖

          閉鎖是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。

          用給定的計數初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。

          下例給出了閉鎖的常見用法,TestHarness創建一定數量的線程,利用它們並發的執行指定的任務,它使用兩個閉鎖,分別表示"起始門"和"結束門"。每個線程首先要做的就是在啟動門上等待,從而確保所有線程都就緒后才開始執行,而每個線程要做的最后一件事是將調用結束門的countDown方法減1,這能使主線程高效地等待直到所有工作線程都執行完畢,因此可以統計所消耗的時間:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class TestHarness {
         public long timeTasks( int nThreads,  final Runnable task)
                 throws InterruptedException {
             final CountDownLatch startGate =  new CountDownLatch( 1 );
             final CountDownLatch endGate =  new CountDownLatch(nThreads);
             for ( int i =  0 ; i < nThreads; i++) {
                 Thread t =  new Thread() {
                     public void run() {
                         try {
                             startGate.await();
                             try {
                                 task.run();
                             finally {
                                 endGate.countDown();
                             }
                         catch (InterruptedException ignored) {
                         }
                     }
                 };
                 t.start();
             }
             long start = System.nanoTime();
             startGate.countDown();
             endGate.await();
             long end = System.nanoTime();
             return end - start;
         }
    }

    2.2、FutureTask

          FutureTask表示可取消的異步計算。利用開始和取消計算的方法、查詢計算是否完成的方法和獲取計算結果的方法,此類提供了對 Future 的基本實現。僅在計算完成時才能獲取結果;如果計算尚未完成,則阻塞 get 方法。一旦計算完成,就不能再重新開始或取消計算。FutureTask的方法摘要如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    boolean cancel( boolean mayInterruptIfRunning)
         試圖取消對此任務的執行。
    protected void done()
         當此任務轉換到狀態 isDone(不管是正常地還是通過取消)時,調用受保護的方法。
    V get()   throws InterruptedException, ExecutionException
         如有必要,等待計算完成,然后獲取其結果。
    V get( long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException
         如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。
    boolean isCancelled()
         如果在任務正常完成前將其取消,則返回  true
    boolean isDone()
         如果任務已完成,則返回  true
    void run()
         除非已將此 Future 取消,否則將其設置為其計算的結果。
    protected boolean runAndReset()
         執行計算而不設置其結果,然后將此 Future 重置為初始狀態,如果計算遇到異常或已取消,則該操作失敗。
    protected void set(V v)
         除非已經設置了此 Future 或已將其取消,否則將其結果設置為給定的值。
    protected void setException(Throwable t)
         除非已經設置了此 Future 或已將其取消,否則它將報告一個 ExecutionException,並將給定的 throwable 作為其原因。

          FutureTask可以用來表示一些時間較長的計算,這些計算可以在使用計算結果之前啟動,以下代碼就是模擬一個高開銷的計算,我們可以先調用start()方法開始計算,然后在需要結果時,再調用get得到結果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class Preloader {
         ProductInfo loadProductInfo()  throws DataLoadException {
             return null ;
         }
         private final FutureTask<ProductInfo> future =  new FutureTask<ProductInfo>(
                 new Callable<ProductInfo>() {
                     public ProductInfo call()  throws DataLoadException {
                         return loadProductInfo();
                     }
                 });
         private final Thread thread =  new Thread(future);
         public void start() {
             thread.start();
         }
         public ProductInfo get()  throws DataLoadException, InterruptedException {
             try {
                 return future.get();
             catch (ExecutionException e) {
                 Throwable cause = e.getCause();
                 if (cause  instanceof DataLoadException)
                     throw (DataLoadException) cause;
                 else
                     throw new RuntimeException(e);
             }
         }
         interface ProductInfo {
         }
    }
    class DataLoadException  extends Exception {
    }

    2.3、信號量

          從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后等待獲取許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並采取相應的行動。

          Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的線程數目。例如,下面的類使用信號量控制對內容池的訪問:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    class Pool {
        private static final int MAX_AVAILABLE =  100 ;
        private final Semaphore available =  new Semaphore(MAX_AVAILABLE,  true );
        public Object getItem()  throws InterruptedException {
          available.acquire();
          return getNextAvailableItem();
        }
        public void putItem(Object x) {
          if (markAsUnused(x))
            available.release();
        }
        // Not a particularly efficient data structure; just for demo
        protected Object[] items = ... whatever kinds of items being managed
        protected boolean [] used =  new boolean [MAX_AVAILABLE];
        protected synchronized Object getNextAvailableItem() {
          for ( int i =  0 ; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
               used[i] =  true ;
               return items[i];
            }
          }
          return null // not reached
        }
        protected synchronized boolean markAsUnused(Object item) {
          for ( int i =  0 ; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
               if (used[i]) {
                 used[i] =  false ;
                 return true ;
               else
                 return false ;
            }
          }
          return false ;
        }
      }

       獲得一項前,每個線程必須從信號量獲取許可,從而保證可以使用該項。該線程結束后,將項返回到池中並將許可返回到該信號量,從而允許其他線程獲取該項。注意,調用 acquire() 時無法保持同步鎖,因為這會阻止將項返回到池中。信號量封裝所需的同步,以限制對池的訪問,這同維持該池本身一致性所需的同步是分開的。

          將信號量初始化為 1,使得它在使用時最多只有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進制信號量,因為它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,二進制信號量具有某種屬性(與很多 Lock 實現不同),即可以由線程釋放“鎖”,而不是由所有者(因為信號量沒有所有權的概念)。在某些專門的上下文(如死鎖恢復)中這會很有用。

          Semaphore的構造方法可選地接受一個公平 參數。當設置為 false 時,此類不對線程獲取許可的順序做任何保證。特別地,闖入 是允許的,也就是說可以在已經等待的線程前為調用 acquire() 的線程分配一個許可,從邏輯上說,就是新線程將自己置於等待線程隊列的頭部。當公平設置為 true時,信號量保證對於任何調用獲取方法的線程而言,都按照處理它們調用這些方法的順序(即先進先出;FIFO)來選擇線程、獲得許可。注意,FIFO 排序必然應用到這些方法內的指定內部執行點。所以,可能某個線程先於另一個線程調用了acquire,但是卻在該線程之后到達排序點,並且從方法返回時也類似。還要注意,非同步的tryAcquire 方法不使用公平設置,而是使用任意可用的許可。

          通常,應該將用於控制資源訪問的信號量初始化為公平的,以確保所有線程都可訪問資源。為其他的種類的同步控制使用信號量時,非公平排序的吞吐量優勢通常要比公平考慮更為重要。

          Semaphore還提供便捷的方法來同時 acquire 和釋放多個許可。小心,在未將公平設置為 true 時使用這些方法會增加不確定延期的風險。

          內存一致性效果:線程中調用“釋放”方法(比如 release())之前的操作 happen-before 另一線程中緊跟在成功的“獲取”方法(比如 acquire())之后的操作。

    2.4、柵欄

          CyclicBarrier是一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier很有用。因為該 barrier在釋放等待線程后可以重用,所以稱它為循環的barrier。

          CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作很有用。

          示例用法:下面是一個在並行分解設計中使用barrier的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    class Solver {
         final int N;
         final float [][] data;
         final CyclicBarrier barrier;
         class Worker  implements Runnable {
             int myRow;
             Worker( int row) {
                 myRow = row;
             }
             public void run() {
                 while (!done()) {
                     processRow(myRow);
                     try {
                         barrier.await();
                     catch (InterruptedException ex) {
                         return ;
                     catch (BrokenBarrierException ex) {
                         return ;
                     }
                 }
             }
         }
         public Solver( float [][] matrix) {
          data = matrix;
          N = matrix.length;
          barrier =  new CyclicBarrier(N,
                          new Runnable() {
                            public void run() {
                              //mergeRows(...);
                            }
                          });
          for ( int i =  0 ; i < N; ++i)
            new Thread( new Worker(i)).start();
          waitUntilDone();
        }
    }

          在這個例子中,每個 worker 線程處理矩陣的一行,在處理完所有的行之前,該線程將一直在屏障處等待。處理完所有的行之后,將執行所提供的 Runnable 屏障操作,並合並這些行。如果合並者確定已經找到了一個解決方案,那么 done() 將返回 true,所有的 worker 線程都將終止。

          如果屏障操作在執行時不依賴於正掛起的線程,則線程組中的任何線程在獲得釋放時都能執行該操作。為方便此操作,每次調用 await() 都將返回能到達屏障處的線程的索引。然后,您可以選擇哪個線程應該執行屏障操作.

          對於失敗的同步嘗試,CyclicBarrier 使用了一種要么全部要么全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致線程過早地離開了屏障點,那么在該屏障點等待的其他所有線程也將通過 BrokenBarrierException以反常的方式離開。

          內存一致性效果:線程中調用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 緊跟在從另一個線程中對應 await() 成功返回的操作。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM