Java線程池Executor&ThreadPool


  java自1.5版本之后,提供線程池,供開發人員快捷方便的創建自己的多線程任務。下面簡單的線程池的方法及說明。

  1、Executor 

    線程池的頂級接口。定義了方法execute(Runnable),該方法接收一個Runnable實例,用來執行一個任務,該任務即是一個實現Runnable接口的類。

    此服務方法無返回值,原因是因為實現Runnable接口的類的run方法是無返回(void)的。

    常用方法 : void execute(execute)

    作用 : 啟動並執行線程任務

  2、ExecutorService

    繼承自Executor接口,提供了更多的方法調用,例如優雅關閉方法shutdown,有返回值的submit。

    2.1、ExecutorService生命周期

      運行 - Running 、關閉 - shuttingdown、終止 - terminated

      Running : 線程池正在執行中,活動狀態。創建后即進入此狀態

      shuttingdown : 優雅關閉,線程池正在關閉中。不再接收新的線程任務,已有的任務(正在處理的 + 隊列中阻塞的),處理完畢后,關閉線程池。

              調用shutdown()方法,即進入此狀態

      terminated : 線程池已關閉。

    2.2、submit方法

      有返回值,Future類型。重載了方法,submit(Runnable)不需要提供返回值。submit(Callable)、submit(Runnable,T)可以提供線程執行后的結果返回值。

    2.3、Future

      線程執行完畢結果。獲取線程執行結果是通過get()方法獲取。get()無參,阻塞等待線程執行結束。

      get(long timeout, TimeUnit unit)有參,阻塞等待固定時長,超時未獲取,則拋出異常。

    2.4、Callable

      類似Runnable的一個線程接口。其中的對應run的方法是call方法。此接口提供了線程執行完畢返回值。

package com.cn.cfang.executor;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
    public static void main(String[] args) throws Exception{
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        Data data = new Data();
//        Future<Data> future = executorService.submit(new Task(data), data); //runnable
        Future<Data> future = executorService.submit(new Task1(data)); //callable
        System.out.println(future.get().getName());
        executorService.shutdown();
    }
}

class Data {
    String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}

class Task implements Runnable{
    Data data;
    public Task(Data data) {
        this.data = data;
    }
    @Override
    public void run() {
         data.setName("hello world");
    }
}

class Task1 implements Callable<Data>{
    Data data;
    public Task1(Data data) {
        this.data = data;
    }
    @Override
    public Data call() throws Exception {
        data.setName("hello world");
        return data;
    }
    
}

   3、Executors工具類

    提供了很多的工廠方法用於創建線程池,返回的線程池都實現了ExecutorService接口。

線程池屬於進程級的重量級資源,默認的生命周期同JVM一致,當開啟線程池后,直到jvm關閉,是線程池的默認的生命周期。
如果手動調用shutdown方法,可優雅關閉線程池,在當前所有任務執行結束后,關閉線程池。

  4、幾種常用的線程池

    4.1、FixedThreadPool

     容量固定的線程池。使用LinkedBlockingQueue作為任務隊列,當任務數量大於線程池容量的時候,未執行的任務進入任務等待隊列LinkedBlockingQueue中,

     當線程有空閑的時候,自動從隊列中取出任務執行。

        使用場景: 大多數情況下,推薦使用的線程池。因為os系統和硬件是有線程上限限制的,不可能去無限的提供線程池操作。

    4.2、CachedThreadPool

      緩存線程池。容量 0-Integer.MAX_VALUE,自動根據任務數擴容:如果線程池中的線程數不滿足任務執行需求,則創建新的線程並添加到池中。

      生命周期默認60s,當線程空閑時長到60s的時候,自動終止銷毀釋放線程,移除線程池。

      使用場景 : 可用於測試最高負載量,用於對FixedThreadPool容量的參考。

      注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT(默認60s)不活動,其會自動被終止。 

    4.3、ScheduledThreadPool

      定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。

public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
        System.out.println(service);
        
        // 定時完成任務。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
        // runnable - 要執行的任務。
     // start_limit - 第一次執行任務的時間間隔
     // limit - 多次任務執行的時間間隔
// timeunit - 時間單位 service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } }, 0, 300, TimeUnit.MILLISECONDS); }

    4.4、SingleThreadExceutor 單一容量線程池。

    4.5、自定義線程池

      自定義線程池,可以使用ThreadPoolExecutor類來進行創建管理。線程池中,除了ForkJoinPool外,其他常用的線程池底層,都是使用ThreadPoolExecutor實現的。

      參數說明:

        corePoolSize:核心線程數,也是最少線程數。在創建線程池時,默認情況下,是不會創建線程池的,也即此時的線程池中線程數為0,直到有任務來臨時,才會去創建線程。當然,手動調用prestartCoreThread()或者prestartAllCoreThreads()方法,可以初始化創建線程池中的線程。默認情況下,當有任務來臨時,就會創建新的線程去處理執行,即使此時線程池中有空閑的線程。當線程數達到corePoolSize時,線程數不增加,此時任務會放入等待隊列BlockingQueue中。

        workQueue:阻塞隊列,用來存儲等待執行的任務資源。

          maximumPoolSize:最大線程數。當阻塞隊列滿了,開始擴充線程池中的線程數。直到達到此最大值的時候。

        handler:當線程池中的線程數等於maximumPoolSize的時候,此時再來任務的話,交由此拒絕策略執行。

        keepAliveTime:表示的線程在空閑多長時間后會被終止。默認是在線程數大於corePoolSize才生效,也可以手動設置allowCoreThreadTimeOut()方法讓線程數在不大於  corePoolSize也生效。

 public ThreadPoolExecutor(
             int corePoolSize, //核心容量,創建線程池的時候,默認有多少的線程數。也是最少線程數
                   int maximumPoolSize, //最大線程數
                   long keepAliveTime,  //線程生命周期,0為永久。當線程空閑多長時間,自動回收。
                   TimeUnit unit,  //生命周期時間單位。
                  BlockingQueue<Runnable> workQueue,  //任務阻塞隊列。
           RejectedExecutionHandler handler
      ) {     
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);       }

      簡單例子:

package com.cn.cfang.executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test2 {

    public static void main(String[] args){ 
        //創建等待隊列 
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20); 
        //創建線程池,池中保存的線程數為3,允許的最大線程數為5
        ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); 
        //創建七個任務 
        Runnable t1 = new MyThread(); 
        Runnable t2 = new MyThread(); 
        Runnable t3 = new MyThread(); 
        Runnable t4 = new MyThread(); 
        Runnable t5 = new MyThread(); 
        Runnable t6 = new MyThread(); 
        Runnable t7 = new MyThread(); 
        //每個任務會在一個線程上執行
        pool.execute(t1); 
        pool.execute(t2); 
        pool.execute(t3); 
        pool.execute(t4); 
        pool.execute(t5); 
        pool.execute(t6); 
        pool.execute(t7); 
        //關閉線程池 
        pool.shutdown(); 
    } 
}

class MyThread implements Runnable{ 
    @Override
    public void run(){ 
        System.out.println(Thread.currentThread().getName() + "正在執行。。。"); 
        try{ 
            Thread.sleep(100); 
        }catch(InterruptedException e){ 
            e.printStackTrace(); 
        } 
    } 
}

 5、forkjoin框架

    拆分合並,將一個大的任務,拆分成若干子任務,並最終匯總子任務的執行結果,得到大任務的執行結果。並行執行,采用工作竊取機制,更加有效的利用cpu資源。

    5.1、主要類

      ForkJoinPool : 用於執行Task。任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,進入隊列的頭部。

              當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他未完成工作線程的隊列的尾部獲取一個任務。

      ForkJoinTask:ForkJoin任務,提供在任務中執行fork()和join()操作的機制(二叉分邏輯),通常不直接繼承ForkJoinTask類,

             而是繼承抽象子類RecursiveTask(有返回結果) 或者 RecursiveAction (無返回結果)。

      ForkJoinWorkerThread:ForkJoinPool 內部的worker thread,用來具體執行ForkJoinTask。內部有 ForkJoinPool.WorkQueue,來保存要執行的 ForkJoinTask。

      ForkJoinPool.WorkQueue:保存要執行的ForkJoinTask。

   5.2、工作竊取機制

      1、大任務分割成N個子任務,為避免線程競爭,於是分開幾個隊列去保存這些子任務,並為每個隊列提供一個工作線程去處理其中的任務。工作線程與任務隊列一一對應。

      2、如果A線程執行完自己隊列中的所有任務,如果此時其他隊列中還有未執行的任務,則A線程會去竊取一個其他隊列的任務來執行。但是,此時兩個線程同時訪問,

        可能會產生競爭問題,所以,任務隊列設計成了雙向隊列。A線程竊取的時候,從另一端開始執行,盡可能的去避免線程競爭問題。

      3、工作竊取機制,充分的利用線程資源,並盡可能的去避免線程間的競爭問題。但是,只能是盡可能避免,並不能規避。例如,雙向隊列只有一個任務。

     5.3、簡單使用

      例:求和 0 - 10000000000L。

package com.cn.cfang.executor;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * 步驟:
 *  1、建立任務類Task,繼承RecursiveTask或者RecursiveAction。需要返回值則選用RecursiveTask,無需返回值選用RecursiveAction
 *  2、任務類Task,滿足一定的閾值,則對子任務進行計算,不滿足,則二叉分后,遞歸調用自身
 *  3、調用中,新建ForkJoinPool對象,新建任務類對象Task,將任務類對象Task放入ForkJoinPool中執行。
 *     如果需要返回值,則可以invoke或者Future-submit。
 * @author cfang
 * 2018年5月15日 上午10:51:03
 */
public class Test3 {
    
    public static void main(String[] args) throws Exception{
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinWorkTask task = new ForkJoinWorkTask(0l, 10000000l);
//        Long result = pool.invoke(task);
//        System.out.println(result);
        Future<Long> future = pool.submit(task);
        System.out.println(future.get());
    }
    
}

class ForkJoinWorkTask extends RecursiveTask<Long>{

    private static final long serialVersionUID = 1L;
    
    private Long start;    //起始
    private Long end;    //終止
    private static final Long THRESHOLD = 10000L; //子任務分割閾值
    
    public ForkJoinWorkTask(Long start, Long end){
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected Long compute() {
        Long sum = 0l;
        if(end - start <= THRESHOLD){ //足夠小的子任務,進行計算求和
            for(Long i = start; i < end; i++){
                sum += i;
            }
        }else{ //任務拆分不滿足,繼續拆分(二叉分邏輯)
            Long middle = (start + end) / 2;
            ForkJoinWorkTask rightTask = new ForkJoinWorkTask(start, middle);
            rightTask.fork();
            ForkJoinWorkTask leftTask = new ForkJoinWorkTask(middle + 1, end);
            leftTask.fork();
            sum = rightTask.join() + leftTask.join();
        }
        return sum;
    }
    
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM