java中使用線程池並發執行方法


我們在開發的過程中,會經常遇到並發執行某個方法。在網上搜索答案的時候,都似乎Thread創建線程,或者就是先給你來一套JMM,線程之間內存,消息通信機制。
這種做法很好,鞏固知識,如果現在就像要一個案例多線程執行方法,大批量的原理性介紹很費時費力,甚至會導致懷疑自己的水平。

現在有個業務需求是這樣的:我要取story和joke兩種類型里的數據。如果是串行操作就是
查詢story,然后再查詢joke。
如果數據量小沒關系,如果數據量大,查詢story要一個小時,查詢joke也要一個小時,串行操作2小時,並發操作小於1小時(實測環境)。

首先創建一個線程池

private ExecutorService cacheExecutor = Executors.newCachedThreadPool();

查詢story和查詢joke,串行操作:

List<String> storySubTypeList = countStory("story");
List<String> jokeSubTypeList = countStory("joke");

那么我們不串行了,讓他們並發執行。從直觀來看速度就是很快了,可以提升一倍。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//查詢story然后操作數據
cacheExecutor.execute(new Runnable() {
    @Override
    public void run() {
        List<String> storySubTypeList = countStory("story");
        for(int i=1; i<=6; i++) {
            //查詢方法,為了顯示直觀,這行代碼我換行了。
            log.info("call "+this.getClass()+"."
            +Thread.currentThread().getStackTrace()[1].getMethodName()
            +"inputParam:"+storySubTypeList.get(i));
            //查詢方法,為了顯示直觀,這行代碼我換行了。
            List<String> ids = 
            storySearchDao.searchIds("story", 
            storySubTypeList.get(i), Common.STORY_ID_LIST_MAXNUM, 0);

            storyTypeMap.put(storySubTypeList.get(i), ids);
        }
    }
});
//查詢joke然后操作數據
cacheExecutor.execute(new Runnable() {
    @Override
    public void run() {
         List<String> jokeSubTypeList = countStory("joke");
        for(int i=1; i<=6; i++) {
            //打印日志,為了顯示直觀,這行代碼我換行了。
            log.info("call "+this.getClass()+"."
            +Thread.currentThread().getStackTrace()[1].getMethodName()
            +"inputParam:"+jokeSubTypeList.get(i));
            //查詢方法,為了顯示直觀,這行代碼我換行了。
            List<String> ids = 
            storySearchDao.searchIds("story", 
            jokeSubTypeList.get(i), Common.STORY_ID_LIST_MAXNUM, 0);

            storyTypeMap.put(jokeSubTypeList.get(i), ids);
        }
    }
});

代碼里形如:storyTypeMap,storySearchDao,countStory,不用糾結其意思。我這里表達是如何快速實現線程池創建任務,然后使用線程。

在《Java並發編程藝術》一書中介紹到:
(下面的內容可以看,可以不看。建議看,我認為是這本書總結特別好的地方)
通過Executor框架的工具類Executors,可以創建3種類型的ThreadPoolExecutor

FixedThreadPool

FixedThreadPool被稱為可重用固定線程數的線程池。FixedThreadPool的corePoolSize和maximumPoolSize都被設置為創建FixedThreadPool時指定的參數nThreads
當線程池中的線程數大於corePoolSize時,keepAliveTime為多余的空閑線程等待新任務的最長時間,超過這個時間后多余的線程將被終止。這里把keepAliveTime設置為0L,意味着多余的空閑線程會被立即終止

 

對於圖中解釋:
1)如果當前運行的線程數少於corePoolSize,則創建新線程來執行任務。
2)在線程池完成預熱之后(當前運行的線程數等於corePoolSize),將任務加入
LinkedBlockingQueue。
3)線程執行完1中的任務后,會在循環中反復從LinkedBlockingQueue獲取任務來執行。

FixedThreadPool使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(隊列的容量為Integer.MAX_VALUE)。使用無界隊列作為工作隊列會對線程池帶來如下影響。
1)當線程池中的線程數達到corePoolSize后,新任務將在無界隊列中等待,因此線程池中
的線程數不會超過corePoolSize。
2)由於1,使用無界隊列時maximumPoolSize將是一個無效參數。
3)由於1和2,使用無界隊列時keepAliveTime將是一個無效參數。
4)由於使用無界隊列,運行中的FixedThreadPool(未執行方法shutdown()或shutdownNow())不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution方法)。

SingleThreadExecutor

SingleThreadExecutor是使用單個worker線程的Executor。FixedThreadPool相同。
SingleThreadExecutor使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(隊列的容量為Integer.MAX_VALUE)。
SingleThreadExecutor使用無界隊列作為工作隊列對線程池帶來的影響與FixedThreadPool相同

 

 

 

CachedThreadPool

CachedThreadPool是一個會根據需要創建新線程的線程池。
CachedThreadPool的corePoolSize被設置為0,即corePool為空;maximumPoolSize被設置為Integer.MAX_VALUE,即maximumPool是無界的。這里把keepAliveTime設置為60L,意味着CachedThreadPool中的空閑線程等待新任務的最長時間為60秒,
空閑線程超過60秒后將會被終止。 FixedThreadPool和SingleThreadExecutor使用無界隊列LinkedBlockingQueue作為線程池的工作隊列。CachedThreadPool使用沒有容量的SynchronousQueue作為線程池的工作隊 列,但CachedThreadPool的maximumPool是無界的。這意味着,如果主線程提交任務的速度高於maximumPool中線程處理任務的速度時,CachedThreadPool會不斷創建新線程。極端情況下,CachedThreadPool會因為創建過多線程而耗盡CPU和內存資源。

 

 

注意:實際開發中不用這個線程池的,用參數可以調整的的線程池+async執行任務,會比這種更優雅

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM