Java多線程並發工具類


Semaphore-信號燈機制

當我們創建一個可擴展大小的線程池,並且需要在線程池內同時讓有限數目的線程並發運行時,就需要用到Semaphore(信號燈機制),Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的線程數目,它是一個計數信號量,從概念上講,信號量維護了一個許可集合,如有必要,在許可可用前會阻塞每一個acquire(),然后再獲取該許可,每個release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。        

        在線程池內創建線程並運行時,每個線程必須從信號量獲取許可,從而保證可以使用該項。該線程結束后,線程返回到池中並將許可返回到該信號量,從而允許其他線程獲取該項。注意,調用acquire() 時無法保持同步鎖定,因為這會阻止線程返回到池中。信號量封裝所需的同步,以限制對池的訪問,這同維持該池本身一致性所需的同步是分開的。下面通過一個例子加以說明: 

 
public class SemaphoreTest {  
    public static void main(String[] args) {  
        ExecutorService service = Executors.newCachedThreadPool();  
        final  Semaphore sp = new Semaphore(3);  
        for(int i=0;i<5;i++){  
            Runnable runnable = new Runnable(){  
                    public void run(){  
                    try {  
                        sp.acquire();  
                    } catch (InterruptedException e1) {  
                        e1.printStackTrace();  
                    }  
                    System.out.println("線程" + Thread.currentThread().getName() +   
                            "進入,當前已有" + (3-sp.availablePermits()) + "個並發");  
                    try {  
                        Thread.sleep((long)(Math.random()*10000));  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    System.out.println("線程" + Thread.currentThread().getName() +   
                            "即將離開");                      
                    sp.release();  
                    //下面代碼有時候執行不准確,因為其沒有和上面的代碼合成原子單元  
                    System.out.println("線程" + Thread.currentThread().getName() +   
                            "已離開,當前已有" + (3-sp.availablePermits()) + "個並發");  
                }  
            };  
            service.execute(runnable);            
        }  
    }  
}  

       該例子定義了一個newCachedThreadPool,在該Pool中利用for循環同時創建5個線程,現在通過Semaphore,創建一個只允許在線程池中有3個線程並發運行,sp.acquire()表示某個線程獲得了一個信號燈,開始運行,在運行結束時,通過sp.release()還回這個信號燈,以便剩下的線程獲得信號燈運行,sp.availablePermits()指的是當前信號燈庫中有多少個可以被使用,由於例子中定義有3個信號燈,所以3-sp.availablePermits()就代表了當前有多少個線程在並發運行,上例運行結果如下:

線程pool-1-thread-1進入,當前已有2個並發  
線程pool-1-thread-2進入,當前已有2個並發  
線程pool-1-thread-3進入,當前已有3個並發  
線程pool-1-thread-1即將離開  
線程pool-1-thread-1已離開,當前已有2個並發  
線程pool-1-thread-4進入,當前已有3個並發  
線程pool-1-thread-3即將離開  
線程pool-1-thread-3已離開,當前已有2個並發  
線程pool-1-thread-5進入,當前已有3個並發  
線程pool-1-thread-2即將離開  
線程pool-1-thread-2已離開,當前已有2個並發  
線程pool-1-thread-4即將離開  
線程pool-1-thread-4已離開,當前已有1個並發  
線程pool-1-thread-5即將離開  
線程pool-1-thread-5已離開,當前已有0個並發  

Semaphore作為互斥鎖使用:

       當信號量初始化為 1,使得它在使用時最多只有一個可用的許可,從而可用作一個相互排斥的鎖。這通常也稱為二進制信號量,因為它只能有兩種狀態:一個可用的許可,或零個可用的許可。按此方式使用時,與傳統互斥鎖最大不同就是在釋放的時候並不是必須要擁有鎖的對象釋放,也可以由其他的對象釋放,因為信號量沒有所有權的概念。在某些專門的上下文(如死鎖恢復)中這會很有用。

Semaphore的構造方法有兩種:

第一種:

Semaphore(int permits) //用給定的許可數和非公平的公平設置創建一個 Semaphore。

       第一種構造方法創建的信號燈,現在在獲取的時候是隨機的,沒有一定的順序,例如上例中,在前三個線程中的一個運行完畢以后,釋放一個信號燈,剩下的兩個線程就會隨機的一個線程得到這個信號燈而運行。

第二種:

Semaphore(int permits, boolean fair) //用給定的許可數和給定的公平設置創建一個 Semaphore  

      第二種構造方法可選地接受一個公平 參數。當設置為 false 時,此類不對線程獲取許可的順序做任何保證。特別地,闖入 是允許的,也就是說可以在已經等待的線程前為調用acquire() 的線程分配一個許可,從邏輯上說,就是新線程將自己置於等待線程隊列的頭部。當公平設置為 true 時,信號量保證對於任何調用acquire() 方法的線程而言,都按照處理它們調用這些方法的順序(即先進先出;FIFO)來選擇線程、獲得許可。注意,FIFO 排序必然應用到這些方法內的指定內部執行點。所以,可能某個線程先於另一個線程調用了acquire(),但是卻在該線程之后到達排序點,並且從方法返回時也類似。還要注意,非同步的tryAcquire() 方法不使用公平設置,而是使用任意可用的許可。
      通常,應該將用於控制資源訪問的信號量初始化為公平的,以確保所有線程都可訪問資源。為其他的種類的同步控制使用信號量時,非公平排序的吞吐量優勢通常要比公平考慮更為重要。

 

CyclicBarrier-用路障實現分階段線程並發

生活中我們常常會遇到這樣的情景:10個朋友邀約去公園玩,彼此約好上午十點在小區門口集合然后一塊租車過去,可能上午九點就會有人開始到了門口,但是因為人沒有來全,必須等剩下的人,最后等到人全后大家一塊到公園,到達公園后又約定分頭開始玩,下午6點的時候公園門口集合,然后一塊回去。在我們java編程過程中也會遇到類似的情況,要求必須幾個線程都運行完后才可以進行下一步的操作,這就用到了循環路障類--CyclicBarrier。

       CyclicBarrier一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環的 barrier。它 還支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作很有用。

      CyclicBarrier有兩種構造方法:

第一種:

CyclicBarrier(int parties) //創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,但它不會在每個 barrier 上執行預定義的操作  

第二種:

CyclicBarrier(int parties, Runnable barrierAction)   // 創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啟動,並在啟動 barrier 時執行給定的屏障操作,該操作由最后一個進入 barrier 的線程執行。  

下面通過一個實例進行說明:

public class CyclicBarrierTest {  
  
 public static void main(String[] args) {  
  ExecutorService service = Executors.newCachedThreadPool();  
  final  CyclicBarrier cb = new CyclicBarrier(3);  
  for(int i=0;i<3;i++){  
   Runnable runnable = new Runnable(){  
     public void run(){  
     try {  
      Thread.sleep((long)(Math.random()*10000));   
      System.out.println("線程" + Thread.currentThread().getName() +   
        "即將到達集合地點1,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));        
      cb.await();  
        
      Thread.sleep((long)(Math.random()*10000));   
      System.out.println("線程" + Thread.currentThread().getName() +   
        "即將到達集合地點2,當前已有" + (cb.getNumberWaiting()+1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));  
      cb.await();   
      Thread.sleep((long)(Math.random()*10000));   
      System.out.println("線程" + Thread.currentThread().getName() +   
        "即將到達集合地點3,當前已有" + (cb.getNumberWaiting() + 1) + "個已經到達," + (cb.getNumberWaiting()==2?"都到齊了,繼續走啊":"正在等候"));        
      cb.await();        
     } catch (Exception e) {  
      e.printStackTrace();  
     }      
    }  
   };  
   service.execute(runnable);  
  }  
  service.shutdown();  
 }  
}  

      該例中定義了一個循環路障類:CyclicBarrier cb = new CyclicBarrier(3);在路障中設定必須要三個線程等待( cb.await())的時候才可以往下運行,從而達到下一個集合點。該例利用Thread.sleep((long)(Math.random()*10000)); 產生一個隨機的休眠時間,用它來實現不同線程到達路障時的時間不同,到的早的線程需要等待到的晚的線程,當3個線程都cb,.await();時。路障打開,3個線程再往下運行。程序運行的結果如下:

線程pool-1-thread-2即將到達集合地點1,當前已有1個已經到達,正在等候  
線程pool-1-thread-1即將到達集合地點1,當前已有2個已經到達,正在等候  
線程pool-1-thread-3即將到達集合地點1,當前已有3個已經到達,都到齊了,繼續走啊  
線程pool-1-thread-1即將到達集合地點2,當前已有1個已經到達,正在等候  
線程pool-1-thread-2即將到達集合地點2,當前已有2個已經到達,正在等候  
線程pool-1-thread-3即將到達集合地點2,當前已有3個已經到達,都到齊了,繼續走啊  
線程pool-1-thread-3即將到達集合地點3,當前已有1個已經到達,正在等候  
線程pool-1-thread-1即將到達集合地點3,當前已有2個已經到達,正在等候  
線程pool-1-thread-2即將到達集合地點3,當前已有3個已經到達,都到齊了,繼續走啊  

       上例中是通過for循環產生了剛好3個線程,如果把產生的線程數改成4個的話,在這4個線程當中,只要有3個到達路障的時候,路障就會打開,其運行結果如下:

線程pool-1-thread-3即將到達集合地點1,當前已有1個已經到達,正在等候  
線程pool-1-thread-1即將到達集合地點1,當前已有2個已經到達,正在等候  
線程pool-1-thread-4即將到達集合地點1,當前已有3個已經到達,都到齊了,繼續走啊  
線程pool-1-thread-2即將到達集合地點1,當前已有1個已經到達,正在等候  
線程pool-1-thread-1即將到達集合地點2,當前已有2個已經到達,正在等候  
線程pool-1-thread-4即將到達集合地點2,當前已有3個已經到達,都到齊了,繼續走啊  
線程pool-1-thread-2即將到達集合地點2,當前已有1個已經到達,正在等候  
線程pool-1-thread-3即將到達集合地點2,當前已有2個已經到達,正在等候  
線程pool-1-thread-4即將到達集合地點3,當前已有3個已經到達,都到齊了,繼續走啊  
線程pool-1-thread-3即將到達集合地點3,當前已有1個已經到達,正在等候  
線程pool-1-thread-2即將到達集合地點3,當前已有2個已經到達,正在等候  
線程pool-1-thread-1即將到達集合地點3,當前已有3個已經到達,都到齊了,繼續走啊  

  

CountDownLatch-線程並發的發令槍

田徑賽百米短跑時,運動員會在起跑線做准備動作,等到發令槍一聲響,運動員就會奮力奔跑。在多線程運行時,也有這么一個發令槍--CountDownLatch,它通過控制事先定義的計數來控制線程的運行。

       CountDownLatch的構造方法如下:

CountDownLatch(int count); //構造一個用給定計數初始化的 CountDownLatch。  

       主要用到的方法有:

void await(); //使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷。  
boolean await(long timeout, TimeUnit unit); //使當前線程在鎖存器倒計數至零之前一直等待,除非線程被中斷或超出了指定的等待時間。  
void countDown(); //遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。  
long getCount();  // 返回當前計數。  
String toString(); //返回標識此鎖存器及其狀態的字符串。  

       CountDownLatch是一個同步輔助類,在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待。對於給定的計數 初始化 CountDownLatch,可以調用了 countDown() 方法,在當前計數到達零之前,await() 方法會一直受阻塞,當計數到達零時,會釋放所有等待的線程,await() 的所有后續調用都將立即返回。但這種現象只出現一次——計數無法被重置,如果需要重置計數,請考慮使用 CyclicBarrier。

       CountDownLatch 是一個通用同步工具,它有很多用途,將計數  初始化為1的 CountDownLatch可 用作一個簡單的開/關鎖存器或入口,在通過調用 countDown() 的線程打開入口前,所有調用 await() 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個線程在 N 個線程完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。

       CountDownLatch 的一個有用特性是,它不要求調用 countDown() 方法的線程等到計數到達零時才繼續,而在所有線程都能通過之前,它只是阻止任何線程繼續通過await(),它比CyclicBarrier有更大的靈活性,它可以控制不確定數目的線程,而不是像CyclicBarrier在確定數目的線程wait()時就會通過,只有當countDown()的值為0時才允許所有的線程通過。

       下面通過一個例子加以說明:

public class CountdownLatchTest {  
    public static void main(String[] args) {  
        ExecutorService service = Executors.newCachedThreadPool();  
        final CountDownLatch cdOrder = new CountDownLatch(1);  
        final CountDownLatch cdAnswer = new CountDownLatch(3);        
        for(int i=0;i<3;i++){  
            Runnable runnable = new Runnable(){  
                    public void run(){  
                    try {  
                        System.out.println("線程" + Thread.currentThread().getName() +   
                                "正准備接受命令");           
                        cdOrder.await();  
                        System.out.println("線程" + Thread.currentThread().getName() +   
                        "已接受命令");                                 
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("線程" + Thread.currentThread().getName() +   
                                "回應命令處理結果");                  
                        cdAnswer.countDown();                         
                    } catch (Exception e) {  
                        e.printStackTrace();  
                    }                 
                }  
            };  
            service.execute(runnable);  
        }         
        try {  
            Thread.sleep((long)(Math.random()*10000));  
          
            System.out.println("線程" + Thread.currentThread().getName() +   
                    "即將發布命令");                        
            cdOrder.countDown();  
            System.out.println("線程" + Thread.currentThread().getName() +   
            "已發送命令,正在等待結果");      
            cdAnswer.await();  
            System.out.println("線程" + Thread.currentThread().getName() +   
            "已收到所有響應結果");     
        } catch (Exception e) {  
            e.printStackTrace();  
        }                 
        service.shutdown();  
    }  
}  

       上例中main方法表示主線程,並且通過for循環創建3個子線程,定義了兩個CountDownLatch對象:cdOrder和cdAnswer,cdOrder計數為1,用來控制3個子線程,cdAnswer計數為3,用來控制主線程。當程序開始運行時,主線程和子線程同時開始運行,由於主線程需要sleep一段時間,所以3個子線程運行,但是碰到cdOrder.await();必須等到主線程cdOrder.countDown();將計數變成0時才可以繼續往下運行,主線程運行到cdAnswer.await();時等待,只有當三個子線程都cdAnswer.countDown();將計數變為0時主線程才可以往下運行。改程序運行結果如下:

線程pool-1-thread-2正准備接受命令  
線程pool-1-thread-3正准備接受命令  
線程pool-1-thread-1正准備接受命令  
線程main即將發布命令  
線程main已發送命令,正在等待結果  
線程pool-1-thread-3已接受命令  
線程pool-1-thread-2已接受命令  
線程pool-1-thread-1已接受命令  
線程pool-1-thread-1回應命令處理結果  
線程pool-1-thread-3回應命令處理結果  
線程pool-1-thread-2回應命令處理結果  
線程main已收到所有響應結果  

Exchanger-兄弟線程的信息交換

 如果兩個線程在運行過程中需要交換彼此的信息,比如一個數據或者使用的空間,就需要用到Exchanger這個類,Exchanger為線程交換信息提供了非常方便的途徑,它可以作為兩個線程交換對象的同步點,只有當每個線程都在進入 exchange ()方法並給出對象時,才能接受其他線程返回時給出的對象。

       Exchanger的構造方法如下:

Exchanger();  //創建一個新的 Exchanger。  

       Exchanger用到的主要方法有:

exchange(V x);  //等待另一個線程到達此交換點(除非它被中斷),然后將給定的對象傳送給該線程,並接收該線程的對象。  
exchange(V x, long timeout, TimeUnit unit);   // 等待另一個線程到達此交換點(除非它被中斷,或者超出了指定的等待時間),然后將給定的對象傳送給該線程,同時接收該線程的對象。

     下面通過例子來加以說明:

public class ExchangerTest {  
    public static void main(String[] args) {  
        ExecutorService service = Executors.newCachedThreadPool();  
        final Exchanger exchanger = new Exchanger();  
        service.execute(new Runnable(){  
            public void run() {  
                try {                 
                    String data1 = "zxx";  
                    System.out.println("線程" + Thread.currentThread().getName() +   
                    "正在把數據" + data1 +"換出去");  
                    Thread.sleep((long)(Math.random()*10000));  
                    String data2 = (String)exchanger.exchange(data1);  
                    System.out.println("線程" + Thread.currentThread().getName() +   
                    "換回的數據為" + data2);  
                }catch(Exception e){      
                }  
            }     
        });  
        service.execute(new Runnable(){  
            public void run() {  
                try {                 
                    String data1 = "lhm";  
                    System.out.println("線程" + Thread.currentThread().getName() +   
                    "正在把數據" + data1 +"換出去");  
                    Thread.sleep((long)(Math.random()*10000));                    
                    String data2 = (String)exchanger.exchange(data1);  
                    System.out.println("線程" + Thread.currentThread().getName() +   
                    "換回的數據為" + data2);  
                }catch(Exception e){      
                }                 
            }     
        });       
    }  
}  

        上例main()方法中創建了2個線程,兩個線程都有各自String類型的data,並且各自的sleep時間都不一樣。類中定義了一個Exchanger對象,作為兩個線程交換數據的通道,當其中一個線程運行exchanger.exchange();方法時,由於沒有另外一個線程還沒有開始執行這個交換方法,所以就需要等到另外一個線程也提出交換時,兩個線程才可以完成信息的交換。其運行結果如下:

線程pool-1-thread-1正在把數據zxx換出去  
線程pool-1-thread-2正在把數據lhm換出去  
線程pool-1-thread-2換回的數據為zxx  
線程pool-1-thread-1換回的數據為lhm  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM