【Java並發編程六】線程池


一、概述

  在執行並發任務時,我們可以把任務傳遞給一個線程池,來替代為每個並發執行的任務都啟動一個新的線程,只要池里有空閑的線程,任務就會分配一個線程執行。在線程池的內部,任務被插入一個阻塞隊列(BlockingQueue),線程池里的線程會去取這個隊列里的任務。

  利用線程池有三個好處:

  1. 降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗
  2. 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行
  3. 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控

二、線程池的使用

  1、創建線程池

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, 
  TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

  創建一個線程池需要的幾個參數:

  • corePoolSize:線程池的基本大小,當提交一個任務到線程池,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於corePoolSize時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程。
  • maximumPoolSize:線程池最大大小,線程池允許創建的最大線程數,如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。
  • keepAliveTime:線程活動保持時間,線程池的工作線程空閑后,保持存活的時間。
  • unit:線程活動保持時間的單位
  • workQueue:任務隊列,用於保存等待執行的任務的阻塞隊列,可以選擇以下幾個隊列:

1、ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。

2、LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。

3、SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法Executors.newCachedThreadPool使用了這個隊列。

4、PriorityBlockingQueue:一個具有優先級得無限阻塞隊列。

  • threadFactory:用於設置創建線程的工廠,可以通過線程工程給每個創建出來的線程設置更有意義的名字。
  • handler:飽和策略,當線程池和隊列都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。

  此外,我們還可以通過調用Ececutors中的某個靜態工廠方法來創建一個線程池(它們的內部實現原理都是相同的,僅僅是使用了不同的工作隊列或線程池大小):

  newFixedThreadPool:創建一個定長的線程池,每當提交一個任務就創建一個線程,直到達到池的最大長度,這時線程池會保持長度不在變化

  newCachedThreadPool:創建一個可緩存的線程池,如果當前的線程池的長度超過了處理的需要時,它可以靈活的回收空閑的線程,當需求增加時,它可以靈活的添加新的線程,並不會對池的長度做任何限制

  newSingleThreadPool:創建一個單線程化的executor,它只會創建唯一的工作者線程來執行任務

  newScheduledThreadPool:創建一個定長的線程池,而且支持定時的以及周期性的任務執行,類似於Timer

  2、向線程池提交任務

  可以使用execute向線程池提交任務:

public class Test2
{
    public static void main(String[] args)
    {
        BlockingQueue<Runnable> workQueue=new LinkedBlockingDeque<Runnable>();
        ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(2, 3, 60, TimeUnit.SECONDS, workQueue);
        poolExecutor.execute(new Task1());
        poolExecutor.execute(new Task2());
     poolExecutor.shutdown(); } }
class Task1 implements Runnable { public void run() { System.out.println("執行任務1"); } } class Task2 implements Runnable { public void run() { System.out.println("執行任務2"); } }

  也可以使用submit方法來提交任務,它會返回一個future,我們可以通過這個future來判斷任務是否執行成功,通過future的get方法獲取返回值,get方法會阻塞直到任務完成。

public class Test3
{
    public static void main(String[] args) throws InterruptedException, ExecutionException
    {
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<String>> resultList = new ArrayList<Future<String>>();   
        // 創建10個任務並執行
        for (int i = 0; i < 10; i++)
        {
            // 使用ExecutorService執行Callable類型的任務,並將結果保存在future變量中
            Future<String> future = executorService.submit(new TaskWithResult(i));
            resultList.add(future);
        }
        for (Future<String> future : resultList)
        {
            while (!future.isDone());// Future返回如果沒有完成,則一直循環等待,直到Future返回完成
            {
                System.out.println(future.get()); // 打印各個線程(任務)執行的結果
            }
        }
        executorService.shutdown();
    }
}
class TaskWithResult implements Callable<String>
{
    private int id;
    public TaskWithResult(int id)
    {
        this.id = id;
    }
    public String call() throws Exception
    {
        return "執行結果"+id;
    }
}

 

  3、線程關閉

  可以通過調用線程池的shutdown或shutdownNow方法來關閉線程池,但是它們的實現原理不同,shutdown的原理是只是將線程池的狀態設置成SHUTDOWN狀態,然后中斷所有沒有正在執行任務的線程。shutdownNow的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。shutdownNow會首先將線程池的狀態設置成STOP,然后嘗試停止所有的正在執行或暫停任務的線程,並返回等待執行任務的列表。

  只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當所有的任務都已關閉后,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於我們應該調用哪一種方法來關閉線程池,應該由提交到線程池的任務特性決定,通常調用shutdown來關閉線程池,如果任務不一定要執行完,則可以調用shutdownNow。

三、線程池的執行過程

  整個ThreadExecutor的任務處理經過下面4個步驟,如下圖所示:

  

  1、如果當前的線程數<corePoolSize,提交的Runnable任務,會直接作為new Thread的參數,立即執行,當提交的任務數超過了corePoolSize,就進入第二部操作

  2、將當前的任務提交到BlockingQueue阻塞隊列中,如果Block Queue是個有界隊列,當隊列滿了之后就進入第三步

  3、如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,執行對應的runnable任務

  4、如果第三步也無法處理,就會用RejectedExecutionHandler來做拒絕處理

四、執行定時及周期性任務

  Timer工具管理任務的定時以及周期性執行。示例代碼如下:

public class TimerTest
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務1執行時間:"+sdf.format(new Date()));
                try
                {
                    Thread.sleep(3000);//模擬任務1執行時間為3秒
                }
                catch (InterruptedException e)
                {
                    // TODO 自動生成的 catch 塊
                    e.printStackTrace();
                }
            }
        };
        System.out.println("當前時間:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),4000); //間隔4秒周期性執行
    }
}

  執行結果:

  

  可以看到上述任務1以4秒為間隔周期性執行。但是Timer存在一些缺陷,主要是下面兩個方面的問題:

  缺陷1Timer只創建唯一的線程的來執行所有的Timer任務,如果一個time任務的執行很耗時,會導致其他的TimeTask的時效准確性出問題。看下面的例子:

public class TimerTest
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務1執行時間:"+sdf.format(new Date()));
                try
                {
                    Thread.sleep(10000);
                }
                catch (InterruptedException e)
                {
                    // TODO 自動生成的 catch 塊
                    e.printStackTrace();
                }
            }
        };
        TimerTask timerTask2=new TimerTask()
        {
            @Override
            public void run()
            {
                
                System.out.println("任務2執行時間:"+sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),1000); //間隔1秒周期性執行
        timer.schedule(timerTask2, new Date(),4000); //間隔4秒周期性執行
    }
}

  執行結果:

  

  缺陷2:如果TimeTask拋出未檢查的異常,Timer將產生無法預料的行為。Timer線程並不捕獲線程,所有TimerTask拋出的未檢查的異常會終止timer線程。看下面的代碼:

public class TimerTest2
{
    final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 
    public static void main(String[] args)
    {
        TimerTask timerTask1=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務1執行時間:"+sdf.format(new Date()));
                throw new RuntimeException();
            }
        };
        
        TimerTask timerTask2=new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務2執行時間:"+sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:"+sdf.format(new Date()));
        Timer timer=new Timer();
        timer.schedule(timerTask1, new Date(),1000); //周期1秒執行任務1
        timer.schedule(timerTask2, new Date() ,3000); //周期3秒執行任務2
        
    }
}

  執行結果為:

  

  

  針對上述的兩個問題,我們可以使用ScheduledThreadPoolExecutor來作為Timer的替代。

  針對問題1,有下面代碼:

public class ScheduledThreadPoolExecutorTest
{
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args)
    {
        TimerTask timerTask1 = new TimerTask()
        {

            @Override
            public void run()
            {
                System.out.println("任務1執行時間:" + sdf.format(new Date()));
                try
                {
                    Thread.sleep(10000);
                } catch (InterruptedException e)
                {
                    // TODO 自動生成的 catch 塊
                    e.printStackTrace();
                }
            }
        };
        TimerTask timerTask2 = new TimerTask()
        {
            @Override
            public void run()
            {
                System.out.println("任務2執行時間:" + sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:" + sdf.format(new Date()));
        ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
        poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000,TimeUnit.MILLISECONDS);
        poolExecutor.scheduleAtFixedRate(timerTask2,  0, 4000,TimeUnit.MILLISECONDS);
    }
}

  執行的結果為:

  

  針對問題2,有下面代碼:

public class ScheduledThreadPoolExecutorTest2
{
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args)
    {
        TimerTask timerTask1 = new TimerTask()
        {

            @Override
            public void run()
            {
                System.out.println("任務1執行時間:" + sdf.format(new Date()));
                throw new RuntimeException();
            }
        };
        TimerTask timerTask2 = new TimerTask()
        {
            @Override
            public void run()
            {

                System.out.println("任務2執行時間:" + sdf.format(new Date()));
            }
        };
        System.out.println("當前時間:" + sdf.format(new Date()));
        ScheduledThreadPoolExecutor poolExecutor=new ScheduledThreadPoolExecutor(2);
        poolExecutor.scheduleAtFixedRate(timerTask1, 0, 1000, TimeUnit.MILLISECONDS);  
        poolExecutor.scheduleAtFixedRate(timerTask2, 0, 2000, TimeUnit.MILLISECONDS);
    }
}

  執行結果為:

  

五、參考資料

  1、Java並發編程實踐

打賞

免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2021 CODEPRJ.COM