Semaphore 與ThreadPoolExecutor 的使用


 

1、 Semaphore 信號量  (阻塞)

優點:可以控制線程的數量,不會超出線程范圍

缺點:當線程死鎖時,永遠沒法釋放,導致一直阻塞

在java中,提供了信號量Semaphore的支持。

Semaphore類是一個計數信號量,必須由獲取它的線程釋放, 
通常用於限制可以訪問某些資源(物理或邏輯的)線程數目。


一個信號量有且僅有3種操作,且它們全部是原子的:初始化、增加和減少 
增加可以為一個進程解除阻塞; 
減少可以讓一個進程進入阻塞。


如何獲得Semaphore對象? 
    public Semaphore(int permits,boolean fair) 
    permits:初始化可用的許可數目。 
    fair: 若該信號量保證在征用時按FIFO 先進先出的順序授予許可,則為true,否則為false; 
    
如何從信號量獲得許可? 
    public void acquire() throws InterruptedException

如何釋放一個許可,並返回信號量? 
    public void release() 

 

使用方法:

private static final Semaphore coImpInfoMutex = new Semaphore(5);   // 定義五個信號量   

    try
{ logger.info("wait for permit"); coImpInfoMutex.acquire();      // 獲取信號new Thread(new Runnable() { @Override public void run() { try { // doing .... } finally { coImpInfoMutex.release();  // 必須釋放訊號 } } }).start(); } catch (InterruptedException e) { e.printStackTrace(); }

 

 

 

2、ThreadPoolExecutor的使用 

 

1.使用線程池的好處?

第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。

第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。

第三:提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

2.ThreadPoolExecutor的使用

A.線程池的創建

我們可以通過java.util.concurrent.ThreadPoolExecutor來創建一個線程池。

new  ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

創建線程池需要的參數介紹:

  • corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程。

  • runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。 可以選擇以下幾個阻塞隊列。

    • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。
    • SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。
    • PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。
  • maximumPoolSize(線程池最大大小):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果。

  • ThreadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。

  • RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略。

    • AbortPolicy:直接拋出異常。
    • CallerRunsPolicy:只用調用者所在線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。
    • 當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務。
  • keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。

  • TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鍾(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

B.向線程池提交任務

  提交任務有execute()和submit()兩個方法,下面看看他倆的區別:

  ①接收參數不同

  execute()的參數是Runnable,submit()參數可以是Runnable,也可以是Cable。

  ②返回值不同

  execute()沒有返回值,submit()有返回值Future。通過Future可以獲取各個線程的完成情況,是否有異常,還能試圖取消任務的執行。詳見》》》》》》》》

  execute()很好理解,下面看個使用submit()獲取返回值的例子,假設我有很多更新各種數據的task,我希望如果其中一個task失敗,其它的task就不需要執行了。那我就需要catch Future.get拋出的異常,然后終止其它task的執行,代碼如下:

 

public class ThreadPoolTest implements Runnable {   
    public void run() {   
           try{  
               System.out.println(Thread.currentThread().getName());  
               Thread.sleep(3000);  
           }catch (InterruptedException e){  
               e.printStackTrace();  
           }  
    }   
      
    public static void main(String[] args) {   
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();  
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);  
        for (int i = 0; i < 10; i++) {     
            executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));     
            int threadSize = queue.size();  
            System.out.println("線程隊列大小為-->"+threadSize);  
        }     
        executor.shutdown();    
    }  
}   

 

c.線程池的關閉

我們可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,它們的區別詳見 http://www.cnblogs.com/shamo89/p/6703563.html

可以簡單的總結為shutdown()是正常結束線程池,已經添加進去正在執行的線程正常執行,沒添加的線程不會再添加。shutdownNow()則是強制中斷線程池里的線程,但是因為是通過interuppt()來執行的,所以會有局限性,另外該方法會返回未執行的任務。

所以通常調shutdown來正常關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。

23. 線程池的分析

A.流程分析:線程池的主要工作流程如下圖:

 

從上圖我們可以看出,當提交一個新任務到線程池時,線程池的處理流程如下:

  1. 首先線程池判斷基本線程池是否已滿?沒滿,創建一個工作線程來執行任務。滿了,則進入下個流程。
  2. 其次線程池判斷工作隊列是否已滿?沒滿,則將新提交的任務存儲在工作隊列里。滿了,則進入下個流程。
  3. 最后線程池判斷整個線程池是否已滿?沒滿,則創建一個新的工作線程來執行任務,滿了,則交給飽和策略來處理這個任務。

B.源碼分析

上面的流程分析讓我們很直觀的了解了線程池的工作原理,讓我們再通過源代碼來看看是如何實現的。線程池執行任務的方法如下:

復制代碼
 1 public void execute(Runnable command) {
 2     if (command == null)
 3        throw new NullPointerException();
 4     //如果線程數小於基本線程數,則創建線程並執行當前任務
 5     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
 6     //如線程數大於等於基本線程數或線程創建失敗,則將當前任務放到工作隊列中。
 7         if (runState == RUNNING && workQueue.offer(command)) {
 8             if (runState != RUNNING || poolSize == 0)
 9                       ensureQueuedTaskHandled(command);
10         }
11     //如果線程池不處於運行中或任務無法放入隊列,並且當前線程數量小於最大允許的線程數量,
12 // 則創建一個線程執行任務。
13         else if (!addIfUnderMaximumPoolSize(command))
14         //拋出RejectedExecutionException異常
15             reject(command); // is shutdown or saturated
16     }
17 }
復制代碼

 

2.4. 合理的配置線程池

要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先級:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

任務性質不同的任務可以用不同規模的線程池分開處理。

CPU密集型任務配置盡可能小的線程,如配置Ncpu+1個線程的線程池。

IO密集型任務則由於線程並不是一直在執行任務,則配置盡可能多的線程,如2*Ncpu。

混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。

我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。

依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。

建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。

有一次我們組使用的后台任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后台任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,

任務積壓在線程池里。

如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后台任務出現問題。

當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。

2.1. 線程池的監控

通過線程池提供的參數進行監控。線程池里有一些屬性在監控線程池的時候可以使用

  • taskCount:線程池需要執行的任務數量。
  • completedTaskCount:線程池在運行過程中已完成的任務數量。小於或等於taskCount。
  • largestPoolSize:線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過。如等於線程池的最大大小,則表示線程池曾經滿了。
  • getPoolSize:線程池的線程數量。如果線程池不銷毀的話,池里的線程不會自動銷毀,所以這個大小只增不+ getActiveCount:獲取活動的線程數。

 

 

3、線程ThreadPoolTaskExecutor的使用
    

// 配合spring使用

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="30" />
        <property name="keepAliveSeconds" value="200" />
        <property name="maxPoolSize" value="50" />
    </bean>

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM