Java的並發神器concurrent包詳解(一)


 

在JDK 1.5之前,提到並發,java程序員們一般想到的是wait()、notify()、Synchronized關鍵字等,但是並發除了要考慮競態資源、死鎖、資源公平性等問題,往往還需要考慮性能問題,在一些業務場景往往還會比較復雜,這些都給java coder們造成不小的難題。JDK 1.5的concurrent包幫我們解決了不少問題。

 

Concurrent包中包含了幾個比較常用的並發模塊,這個系列,LZ就和大家一起來學習各個模塊,Let’s Go!

 

一、線程池的基本用法

一般並發包里有三個常用的線程池實例化方法,在Executors這個工廠類中。

  • ·newFixedThreadPool(int size):

創建一個可重用固定線程數的線程池,以共享的無界隊列(LinkedBlockingQueue)方式來運行這些線程。任何時刻最多只有size大小的線程在執行各自任務。當線程池中所有線程都在運行,后續新來的線程將在無界隊列中等待,直到有空閑線程為止。如果有線程在執行任務期間終止退出,一個新的線程會替代原先的線程繼續后面任務的執行。線程池中的線程會一直存在,除非顯示的關閉某個線程。

 

ExecuterService fixedPool = Executors. newFixedThreadPool(5); //創建一個固定包含5個線程的線程池

 

  • ·newCachedThreadPool()

創建一個可以根據需要創建線程的線程池,當一個任務過來,如果線程池中有空閑可用線程,先用空閑線程,沒有就創建一個新線程。這種線程池對於執行short-lived asynchronous tasks(短期異步任務)通常會提高性能。默認空閑60s的線程會從線程池中移除,如果長時間空閑的話,這個線程池是不會占用任何資源的。但是以我個人的經驗,這種線程池要慎用,用不好經常會造成堆外內存溢出,從而造成應用反應慢,最后導致宕機。

 

ExecuterService fixedPool = Executors. newCachedThreadPool();

 

  • ·newSingleThreadExecutor()

創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。可以保證順序執行各個任務。有人說這不就是newFixedThreadPool(1)么,錯矣。看源碼就知道外部還包了一層FinalizableDelegatedExecutorService,這個是Executors的一個靜態內部類,這個內部類繼承了DelegatedExecutorService,返回的這個ExecutorService實現只包含ExecutorService接口定義的一些方法,不同於newFixedThreadPool有自己擴展的方法。源碼注釋的區別是:newSingleThreadExecutor可以保證使用其他線程時,返回的Executor不用重新配置。

ExecuterService fixedPool = Executors. newSingleThreadExecutor();

 

 

  • ·newScheduledThreadPool(int size)

創建一個線程池,它可安排在給定延遲后運行命令或者定期地執行。這是個無界大小的線程池。其中主要的執行方法是scheduleAtFixedRate和scheduleWithFixedDelay方法,最終的核心方法是delayedExecute,這兩個方法有什么區別呢,簡單來說,scheduleAtFixedRate是以任務執行時間開始為延遲時間起點的,scheduleWithFixedDelay是以任務執行結束時間為延時時間起點的。

 

 

接下來對並發包里面常用的一些類進行使用解析

 

1)  信號量 Semaphore:

一個計數信號量,維護了一個許可集。通俗的講,就是對有限共享資源記錄使用情況。

 

舉個日常生活中的例子好了:有20個人去登記信息表格,但是只有3只筆,一個人用完之后給下面一個人用,直到所有人都用完。

 

沒有並發包之前,我們是怎么設計的呢?

 

定義一個變量a,用synchronized修飾,值為3,新建20個線程模擬20個人訪問,每個人用筆之前,先看是否有空余的筆,如果有,就使用,並將變量a減少1,使用完成之后,再放回去,即將變量a加1,各個線程之間對於變量a的訪問需要互斥進行。Synchronized是一個重量級的同步操作,涉及到用戶態與核心態的切換,所以性能一般。

 

讓我們再看看信號量是怎么實現的,Semaphore實現的思想跟上面差不多。只不過Semaphore已經幫我們做了同步處理,有兩個關鍵的方法:

  • ·acquire() 獲取一個信號量許可
  • ·release() 釋放一個信號量許可

此外還有其他一些方法:availablePermits() 返回信號量可用的許可數,talk is cheap show me the code:

 

public class SemaphoreTest extends Thread{

  

   private String name;

   private Semaphore sh;

  

   public SemaphoreTest(String name,Semaphore sh){

      this.name = name;

      this.sh = sh;

   }

 

   public void run(){

      if(sh.availablePermits()>0){

         System.out.println("有筆");

      }else{

         System.out.println("筆沒了,等等");

      }

      try {

         sh.acquire();//信號量減1

         System.out.println(this.name+"號在用筆");

         Thread.sleep((long) (Math.random()*1000));

         sh.release();

         System.out.println(this.name+"號用完了");

      } catch (Exception e) {

         // TODO Auto-generated catch block

         e.printStackTrace();

      }

   }

  

   public static void main(String[] args) {

      ExecutorService es = Executors.newCachedThreadPool();

      Semaphore sh = new Semaphore(3);

      for(int i=0;i<20;i++){

         es.submit(new SemaphoreTest(i+"",sh));

      }

      es.shutdown();

   }

  

}

 

上述代碼直接運行即可。

 

 

這個信號量是如何同步資源的,就需要讀讀jdk相關源碼了。核心類是AbstractQueuedSynchronizer,該類實現了一個FIFO的列表,列表中Node表示列表節點,該Node有prev、next、Thread等屬性,表示前節點后節點以及線程等。並發包中每個需要實現個性化同步機制的都要擴展該類,以實現不同的同步功能。比如Semaphore、countDownLatch、CyclicBarrier等,具體原理要分析源碼,篇幅較長,這里不做展開,有興趣的可以研究一下,對volatile關鍵字、CAS等都會有更深的理解。

 

 

2)  CountDownLatch

用給定的計數 初始化 CountDownLatch。由於調用了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續調用都將立即返回。

比較重要的2個方法:countDown()和await()

 

下面來個常見的例子:導游帶了20人團到了飯店吃飯,要等到20個人到齊了,才開始吃飯,還要等到20人都吃完了,才可以繼續下個景點。

來看看不用並發包你會怎么實現,首先會設置20個人到來以及吃完的標志,每個線程過來更改自己的標志,主程序for或者while不停的循環監聽,制止全部到來以及吃完才繼續。如果是100人1000人呢,未免臃腫。

 

CountDownLatch的實現:

public class CountDownLautchTest {

  

   public static void main(String[] args) {

     

      ExecutorService es = Executors.newFixedThreadPool(20);

      final CountDownLatch endFlag = new CountDownLatch(20);

      final CountDownLatch startFlag = new CountDownLatch(1);

      final CountDownLatch comeFlag = new CountDownLatch(20);

 

 

      for(int i=0;i<20;i++){

         final int j = i + 1;

         Runnable person = new Runnable(){

            @Override

            public void run() {

                System.out.println(j+"號游客來了");

                comeFlag.countDown();

                try {

                   startFlag.await();

                   Thread.sleep(1000);

                } catch (InterruptedException e) {

                   // TODO Auto-generated catch block

                   e.printStackTrace();

                } finally {

                   System.out.println(j+"號吃完了");

                   endFlag.countDown();

                }

            }

           

         };

         es.submit(person);

      }

      try {

         comeFlag.await();   //保證所有人都到齊了

         System.out.println("人都齊了,大家一起吃飯");

         startFlag.countDown();//開吃

         endFlag.await();//等待所有人都吃完了

         System.out.println("全都吃完了,繼續下個景點");

      } catch (InterruptedException e) {

         // TODO Auto-generated catch block

         e.printStackTrace();

      }

      es.shutdown();

   }

 

}

 

CountDownLatch是一種不可逆的降序計數操作,所以上述代碼里面定義了2個分別表示等候以及吃飯的標志。如果需要重復計數,就需要用到下面這個類:CyclicBarrier。

 

3) CyclicBarrier

一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。

 

還是上述的例子吧,再加一點場景,吃完飯還要游玩,游玩結束返程回家。讓我們來看看CyclicBarrier是如何實現的,上代碼:

 

public class CyclicBarrierTest {

  

   public static void main(String[] args) {

      final CyclicBarrier barrier = new CyclicBarrier(20);

      final ExecutorService es = Executors.newFixedThreadPool(20);

      System.out.println("****等人齊****");

      for(int i=0;i<20;i++){

         final int j = i+1;

         Runnable person = new Runnable(){

            @Override

            public void run() {

                try {

                   System.out.println(j+"號來了");

                   if(barrier.await()==0){

                      System.out.println("****人都到齊了****");

                   }

                   System.out.println(j+"號開吃");

                   if(barrier.await()==0){

                      System.out.println("****吃完了出發****");

                   }

                   System.out.println(j+"號玩好了");

                   if(barrier.await()==0){

                      System.out.println("****游玩結束回家****");

                   }

                } catch (Exception e) {

                   // TODO Auto-generated catch block

                   e.printStackTrace();

                }

            }

           

         };

         es.submit(person);

      }

      es.shutdown();

   }

  

}

 

代碼直接運行即可,應該比較好懂吧,這里有個小竅門,代碼中if(barrier.await()==0)這個條件,有點類似於CountDownLatch的await往下執行的條件了,每次判斷都會減1,減到0,即完成所有線程的等待,繼續下面的操作。

 

結語

         關於並發包一些常用的類就介紹到這里吧,工作中具體怎么使用要具體分析各個業務場景,選擇合適的方法,隨機應變,觸類旁通吧,后續會介紹一下Lock源碼以及實現,涉及AQS的原理等。

 

未完,待續……


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM