線程池核心參數


一.線程池核心參數
public ThreadPoolExecutor(

int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);

corePoolSize:池中的線程數,即使其處於IDLE狀態

maximumPoolSize:池中允許的最大線程數

keepAliveTime:當線程數大於核心時,空閑線程在終止之前等待新任務的最長時間

unit:keepAliveTime的時間單位

workQueue:隊列,用於在執行task之前保存task

handler:當達到了線程邊界和隊列容量,無法及時處理時,reject task使用的處理程序

/**
* @param corePoolSize 線程池基本大小,核心線程池大小,活動線程小於corePoolSize則直接創建,大於等於則先加到workQueue中,
* 隊列滿了才創建新的線程。當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,
* 等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads()方法,
* 線程池會提前創建並啟動所有基本線程。
* @param maximumPoolSize 最大線程數,超過就reject;線程池允許創建的最大線程數。如果隊列滿了,
* 並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務
* @param keepAliveTime
* 線程池的工作線程空閑后,保持存活的時間。所以,如果任務很多,並且每個任務執行的時間比較短,可以調大時間,提高線程的利用率
* @param unit 線程活動保持時間的單位):可選的單位有天(DAYS)、小時(HOURS)、分鍾(MINUTES)、
* 毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒(NANOSECONDS,千分之一微秒)
* @param workQueue 工作隊列,線程池中的工作線程都是從這個工作隊列源源不斷的獲取任務進行執行
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
// threadFactory用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

 二.向線程池提交任務

可以使用兩個方法向線程池提交任務,分別為execute()和submit()方法。execute()方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute()方法輸入的任務是一個Runnable類的實例。

threadsPool.execute(new Runnable() {

@Override

public void run() {

   }

});

submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值,get()方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完。

Future<Object> future = executor.submit(harReturnValuetask);
  try
    {
        Object s = future.get();
    }catch(
    InterruptedException e)
    {
        // 處理中斷異常
    }catch(
    ExecutionException e)
    {
        // 處理無法執行任務異常
    }finally
    {
        // 關閉線程池
        executor.shutdown();
    }

三.關閉線程池

可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,shutdownNow首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。
只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown方法來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow方法。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
 
/**
 * @Author:Zach
 * @Description: 定制屬於自己的非阻塞線程池
 * @Date:Created in 15:26 2018/8/14
 * @Modified By:
 */
public class CustomThreadPoolExecutor {
    private ThreadPoolExecutor pool = null;
 
    /**
     * 線程池初始化方法
     *
     * corePoolSize 核心線程池大小----10
     * maximumPoolSize 最大線程池大小----30
     * keepAliveTime 線程池中超過corePoolSize數目的空閑線程最大存活時間----30+單位TimeUnit
     * TimeUnit keepAliveTime時間單位----TimeUnit.MINUTES
     * workQueue 阻塞隊列----new ArrayBlockingQueue<Runnable>(10)====10容量的阻塞隊列
     * threadFactory 新建線程工廠----new CustomThreadFactory()====定制的線程工廠
     * rejectedExecutionHandler 當提交任務數超過maxmumPoolSize+workQueue之和時,
     *                             即當提交第41個任務時(前面線程都沒有執行完,此測試方法中用sleep(100)),
     *                                   任務會交給RejectedExecutionHandler來處理
     */
 
    public void init() {
 
        pool = new ThreadPoolExecutor(10,30,30,
                TimeUnit.MINUTES,new ArrayBlockingQueue<Runnable>(10),new CustomThreadFactory(), new CustomRejectedExecutionHandler());
    }
 
    public void destory() {
        if(pool !=null) {
            pool.shutdownNow();
        }
    }
 
    public ExecutorService getCustomThreadPoolExecutor() {
        return this.pool;
    }
 
 
    private class CustomRejectedExecutionHandler implements  RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //記錄異常
            System.out.println("error...................");
        }
    }
 
    private class CustomThreadFactory implements ThreadFactory {
 
        private AtomicInteger count = new AtomicInteger(0);
 
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
           String threadName =  CustomThreadPoolExecutor.class.getSimpleName()+count.addAndGet(1);
 
            System.out.println(threadName);
            t.setName(threadName);
            return t;
        }
    }
 
    public static void main(String[] args){
        CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
 
        //1. 初始化
        exec.init();
 
        ExecutorService pool = exec.getCustomThreadPoolExecutor();
 
        for(int i=1;i<100;i++) {
            System.out.println("提交第"+i+"個任務");
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(">>>task is running========");
                       Thread.sleep(3000);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            });
        }
 
        //2. 銷毀----此處不能銷毀,因為任務沒有提交執行完,如果銷毀線程池,任務也就無法執行
        //exec.destory();
 
        try {
            Thread.sleep(10000);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 方法中建立一個核心線程數為30個,緩沖隊列有10個的線程池。每個線程任務,執行時會先睡眠3秒,保證提交10任務時,線程數目被占用完,再提交30任務時,阻塞隊列被占用完,,這樣提交第41個任務是,會交給CustomRejectedExecutionHandler 異常處理類來處理。
     提交任務的代碼如下:
     
     /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    /**
     public void execute(Runnable command) {
     if (command == null)
     throw new NullPointerException();
     int c = ctl.get();
     if (workerCountOf(c) < corePoolSize) {
     if (addWorker(command, true))
     return;
     c = ctl.get();
     }
     if (isRunning(c) && workQueue.offer(command)) {
     int recheck = ctl.get();
     if (! isRunning(recheck) && remove(command))
     reject(command);
     else if (workerCountOf(recheck) == 0)
     addWorker(null, false);
     }
     else if (!addWorker(command, false))
     reject(command);
     }
        注意:41以后提交的任務就不能正常處理了,因為,execute中提交到任務隊列是用的offer方法,如上面代碼,
        這個方法是非阻塞的,所以就會交給CustomRejectedExecutionHandler 來處理,
         所以對於大數據量的任務來說,這種線程池,如果不設置隊列長度會OOM,設置隊列長度,會有任務得不到處理,接下來我們構建一個阻塞的自定義線程池
     */
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM