Java線程池ExecutorService


系統里面用到了線程池:

 private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(16, 32,
            5L, TimeUnit.MINUTES,
            new ArrayBlockingQueue<>(1000), new ThreadFactory() {

        private final ThreadGroup threadGroup = new ThreadGroup("fileTemplateMethodThreadGroup");

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(threadGroup, r, "fileTemplateMethod-thread-pool-" + threadNumber.getAndIncrement());
        }
    }, (r, executor) -> {
        if (!executor.isShutdown()) {
            /* 丟棄隊列最老的數據 */
            if (executor.getQueue().poll() != null) {
                Cat.logMetricForCount(CatConstant.METRIC_DISCARD_FILE_TASK_COUNT);
            }
            executor.execute(r);
        }
    });

我查了一下:一 Java通過Executors提供四種線程池,分別為: 
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。 
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。 
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。 
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

我奇怪我們的沒有這四個,但是有個:

 
         
private static final ExecutorService EXECUTOR_SERVICE = 
new ThreadPoolExecutor(16, 32, 5L, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1000), new ThreadFactory() {})

其實我們用的就是:newFixedThreadPool ,剛才那四個線程池 你在看一下實現,其實就是我們現在用的。

以下是newFixedThreadPool 的實現:

Executors.newFixedThreadPool()
==>
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

 

看到了吧,跟我們的寫法是一樣的,只是:用了不同的 LinkedBlockingQueue 也就是說當創建的線程超過最大線程數的時候,會放到 無界隊列里面

LinkedBlockingQueue 我們的是有界隊列:ArrayBlockingQueue

開篇前,我們先來看看不使用線程池的情況:

new Thread的弊端

執行一個異步任務你還只是如下new Thread嗎?  

new Thread(new Runnable() {
 
    @Override
    public void run() {
        // TODO Auto-generated method stub
    }
}).start();

 

那你就太out了,new Thread的弊端如下:
a. 每次new Thread新建對象性能差。
b. 線程缺乏統一管理,可能無限制新建線程,相互之間競爭,及可能占用過多系統資源導致死機或oom。
c. 缺乏更多功能,如定時執行、定期執行、線程中斷。
相比new Thread,Java提供的四種線程池的好處在於:
a. 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
b. 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
c. 提供定時執行、定期執行、單線程、並發數控制等功能。  

一 Java通過Executors提供四種線程池,分別為: 
newCachedThreadPool創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。 
newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。 
newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。 
newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。

二、 ExecutorService 的submit() 與execute()區別 
1、接收的參數不一樣 submit()可以接受runnable和callable  有返回值
execute()接受runnable 無返回值

2、submit有返回值,而execute沒有

Method submit extends base method Executor.execute by creating and returning a Future that can be used to cancel execution and/or wait for completion.

用到返回值的例子,比如說我有很多個做validation的task,我希望所有的task執行完,然后每個task告訴我它的執行結果,是成功還是失敗,如果是失敗,原因是什么。

3、submit方便Exception處理

There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will Go to the uncaught exception handler (when you don’t have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task’s return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.

意思就是如果你在你的task里會拋出checked或者unchecked exception,而你又希望外面的調用者能夠感知這些exception並做出及時的處理,那么就需要用到submit,通過捕獲Future.get拋出的異常。

復制代碼
import java.util.ArrayList;  
import java.util.List;  
import java.util.Random;  
import java.util.concurrent.Callable;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  

public class ExecutorServiceTest {  
    public static void main(String[] args) {  
        ExecutorService executorService = Executors.newCachedThreadPool();  
        List<Future<String>> resultList = new ArrayList<Future<String>>();  

        // 創建10個任務並執行  
        for (int i = 0; i < 10; i++) {  
            // 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中  
            Future<String> future = executorService.submit(new TaskWithResult(i));  
            // 將任務執行結果存儲到List中  
            resultList.add(future);  
        }  
        executorService.shutdown();  

        // 遍歷任務的結果  
        for (Future<String> fs : resultList) {  
            try {  
                System.out.println(fs.get()); // 打印各個線程(任務)執行的結果  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (ExecutionException e) {  
                executorService.shutdownNow();  
                e.printStackTrace();  
                return;  
            }  
        }  
    }  
}  

class TaskWithResult implements Callable<String> {  
    private int id;  

    public TaskWithResult(int id) {  
        this.id = id;  
    }  

    /** 
     * 任務的具體過程,一旦任務傳給ExecutorService的submit方法,則該方法自動在一個線程上執行。 
     *  
     * @return 
     * @throws Exception 
     */  
    public String call() throws Exception {  
        System.out.println("call()方法被自動調用,干活!!!             " + Thread.currentThread().getName());  
        if (new Random().nextBoolean())  
            throw new TaskException("Meet error in task." + Thread.currentThread().getName());  
        // 一個模擬耗時的操作  
        for (int i = 999999999; i > 0; i--)  
            ;  
        return "call()方法被自動調用,任務的結果是:" + id + "    " + Thread.currentThread().getName();  
    }  
}  

class TaskException extends Exception {  
    public TaskException(String message) {  
        super(message);  
    }  
}  
復制代碼

執行的結果類似於:

call()方法被自動調用,干活!!!             pool-1-thread-1 
call()方法被自動調用,干活!!!             pool-1-thread-2 
call()方法被自動調用,干活!!!             pool-1-thread-3 
call()方法被自動調用,干活!!!             pool-1-thread-5 
call()方法被自動調用,干活!!!             pool-1-thread-7 
call()方法被自動調用,干活!!!             pool-1-thread-4 
call()方法被自動調用,干活!!!             pool-1-thread-6 
call()方法被自動調用,干活!!!             pool-1-thread-7 
call()方法被自動調用,干活!!!             pool-1-thread-5 
call()方法被自動調用,干活!!!             pool-1-thread-8 
call()方法被自動調用,任務的結果是:0    pool-1-thread-1 
call()方法被自動調用,任務的結果是:1    pool-1-thread-2 
java.util.concurrent.ExecutionException: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3 
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) 
    at java.util.concurrent.FutureTask.get(FutureTask.java:83) 
    at com.cicc.pts.ExecutorServiceTest.main(ExecutorServiceTest.java:29) 
Caused by: com.cicc.pts.TaskException: Meet error in task.pool-1-thread-3 
    at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:57) 
    at com.cicc.pts.TaskWithResult.call(ExecutorServiceTest.java:1) 
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:138) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:619) 

 

 

可以看見一旦某個task出錯,其它的task就停止執行。

三、shotdown() showdownNow()區別

可以關閉 ExecutorService,這將導致其拒絕新任務。提供兩個方法來關閉 ExecutorService。 
shutdown() 方法在終止前允許執行以前提交的任務, 
shutdownNow() 方法阻止等待任務啟動並試圖停止當前正在執行的任務。在終止時執行程序沒有任務在執行,也沒有任務在等待執行,並且無法提交新任務。關閉未使用的 ExecutorService 以允許回收其資源。 
一般分兩個階段關閉 ExecutorService。第一階段調用 shutdown 拒絕傳入任務,然后調用 shutdownNow(如有必要)取消所有遺留的任務

1
2
// 啟動一次順序關閉,執行以前提交的任務,但不接受新任務。
     threadPool.shutdown();

 

四、Runnable()與Callable()區別

如果是一個多線程協作程序,比如菲波拉切數列,1,1,2,3,5,8…使用多線程來計算。 
但后者需要前者的結果,就需要用callable接口了。 
callable用法和runnable一樣,只不過調用的是call方法,該方法有一個泛型返回值類型,你可以任意指定。

runnable接口實現的沒有返回值的並發編程。 

 
這里寫圖片描述

callable實現的存在返回值的並發編程。(call的返回值String受泛型的影響) 使用Future獲取返回值。 
這里寫圖片描述

(1). newCachedThreadPool
創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。示例代碼如下:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    final int index = i;
    try {
        Thread.sleep(index * 1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
 
    cachedThreadPool.execute(new Runnable() {
 
        @Override
        public void run() {
            System.out.println(index);
        }
    });
}

 

線程池為無限大,當執行第二個任務時第一個任務已經完成,會復用執行第一個任務的線程,而不用每次新建線程。

 
(2). newFixedThreadPool
創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。示例代碼如下:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
    final int index = i;
    fixedThreadPool.execute(new Runnable() {
 
 
        @Override
        public void run() {
            try {
                System.out.println(index);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
}

 

因為線程池大小為3,每個任務輸出index后sleep 2秒,所以每兩秒打印3個數字。

定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()。可參考PreloadDataCache。
 
(3) newScheduledThreadPool
創建一個定長線程池,支持定時及周期性任務執行。延遲執行示例代碼如下:

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
 
    @Override
    public void run() {
        System.out.println("delay 3 seconds");
    }
}, 3, TimeUnit.SECONDS);

 

 

表示延遲3秒執行。
 
定期執行示例代碼如下:

scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
 
    @Override
    public void run() {
        System.out.println("delay 1 seconds, and excute every 3 seconds");
    }
}, 1, 3, TimeUnit.SECONDS);

 

 

表示延遲1秒后每3秒執行一次。
ScheduledExecutorService比Timer更安全,功能更強大,后面會有一篇單獨進行對比。
 
(4)、newSingleThreadExecutor
創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。示例代碼如下:

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
    final int index = i;
    singleThreadExecutor.execute(new Runnable() {
 
        @Override
        public void run() {
            try {
                System.out.println(index);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
}

 

 

結果依次輸出,相當於順序執行各個任務。
現行大多數GUI程序都是單線程的。Android中單線程可用於數據庫操作,文件操作,應用批量安裝,應用批量刪除等不適合並發但可能IO阻塞性及影響UI線程響應的操作。  

  總結:

    (1)使用ExecutorService的submit函數由於execute函數

    (2)異常如何處理,異常后其他task停止

參考:Java ExecutorService四種線程池的例子與說明 

參考:線程池 ExecutorService 詳細介紹以及注意點區別  

參考:Java線程池ExecutorService


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM