(四)juc線程高級特性——線程池 / 線程調度 / ForkJoinPool


 13. 線程池

第四種獲取線程的方法:線程池,一個 ExecutorService,它使用可能的幾個池線程之一執行每個提交的任務,通常使用 Executors 工廠方法配置。

線程池可以解決兩個不同問題:由於減少了每個任務調用的開銷,它們通常可以在執行大量異步任務時提供增強的性能,並且還可以提供綁定和管理資源(包括執行任務集時使用的線程)的方法。每個 ThreadPoolExecutor 還維護着一些基本的統計數據,如完成的任務數。

為了便於跨大量上下文使用,此類提供了很多可調整的參數和擴展鈎子 (hook)。但是,強烈建議程序員使用較為方便的 Executors 工廠方法 :

  • Executors.newCachedThreadPool()(無界線程池,可以進行自動線程回收)
  • Executors.newFixedThreadPool(int)(固定大小線程池)
  • Executors.newSingleThreadExecutor()(單個后台線程)

它們均為大多數使用場景預定義了設置。

創建包含5個線程的線程池,對變量進行增加操作

/*
 * 一、線程池:提供了一個線程隊列,隊列中保存着所有等待狀態的線程。避免了創建與銷毀額外開銷,提高了響應的速度。
 * 
 * 二、線程池的體系結構:
 *     java.util.concurrent.Executor : 負責線程的使用與調度的根接口
 *         |--**ExecutorService 子接口: 線程池的主要接口
 *             |--ThreadPoolExecutor 線程池的實現類
 *             |--ScheduledExecutorService 子接口:負責線程的調度
 *                 |--ScheduledThreadPoolExecutor :繼承 ThreadPoolExecutor, 實現 ScheduledExecutorService
 * 
 * 三、工具類 : Executors 
 * ExecutorService newFixedThreadPool() : 創建固定大小的線程池
 * ExecutorService newCachedThreadPool() : 緩存線程池,線程池的數量不固定,可以根據需求自動的更改數量。
 * ExecutorService newSingleThreadExecutor() : 創建單個線程池。線程池中只有一個線程
 * 
 * ScheduledExecutorService newScheduledThreadPool() : 創建固定大小的線程,可以延遲或定時的執行任務。
 */
public class TestThreadPool {
    
    public static void main(String[] args) throws Exception {
        //1. 創建線程池
        ExecutorService pool = Executors.newFixedThreadPool(5);

        ThreadPoolDemo tpd = new ThreadPoolDemo();
        
        //2. 為線程池中的線程分配任務,>5,可將線程池里的五個線程都給調用
        for (int i = 0; i < 10; i++) {
            pool.submit(tpd);
        }
        
        //3. 關閉線程池
        pool.shutdown();
    }    
//    new Thread(tpd).start();
//    new Thread(tpd).start();
}

class ThreadPoolDemo implements Runnable{

    private int i = 0;
    
    @Override
    public void run() {
        while(i <= 100){
            System.out.println(Thread.currentThread().getName() + " : " + i++);
        }
    }   
}

線程池結合Callable和Future創建線程

public static void main(String[] args) throws Exception {
    //1. 創建線程池
    ExecutorService pool = Executors.newFixedThreadPool(5);
    
    List<Future<Integer>> list = new ArrayList<>();
    
    for (int i = 0; i < 10; i++) {
        //Future對象用於接收Callable線程的返回值
        Future<Integer> future = pool.submit(new Callable<Integer>(){
            //線程調用方法,查詢1-100之和
            @Override
            public Integer call() throws Exception {
                int sum = 0;
                for (int i = 0; i <= 100; i++) {
                    sum += i;
                }
                return sum;
            }          
        });
        list.add(future);
    }
    //關閉線程池
    pool.shutdown();
    //遍歷結果集,會輸出10次5050
    for (Future<Integer> future : list) {
        System.out.println(future.get());
    }        
}  

14. 線程調度

接口ScheduledExecutorService 繼承自 ExecutorService接口,由ScheduledThreadPoolExecutor類(ThreadPoolExecutor類的子類)實現,可安排在給定的延遲后運行或定期執行的命令。

ScheduledExecutorService newScheduledThreadPool() : 創建固定大小的線程,可以延遲或定時的執行任務。

參考java.util.concurrent.ScheduledThreadPoolExecutor.class中schedule方法源碼

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

示例:

public class TestScheduledThreadPool {
    public static void main(String[] args) throws Exception {
    //創建ScheduledExecutorService類型的線程池對象
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 5; i++) { Future<Integer> result = pool.schedule(new Callable<Integer>(){ @Override public Integer call() throws Exception { int num = new Random().nextInt(100);//生成隨機數 System.out.println(Thread.currentThread().getName() + " : " + num); return num; } }, 1, TimeUnit.SECONDS);
System.out.println(result.get()); }
//線程池關閉 pool.shutdown(); } }

15. ForkJoinPool 分支合並框架-工作竊取

Fork/Join 框架:就是在必要的情況下,將一個大任務,進行拆分(fork)成若干個小任務(拆到不可再拆時),再將一個個的小任務運算的結果進行 join 匯總。

  

 /* @since 1.7
 *  @author Doug Lea
 */
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
...
}
  • 采用 “工作竊取”模式(work-stealing):當執行新的任務時它可以將其拆分分成更小的任務執行,並將小任務加到線程隊列中,然后再從一個隨機線程的隊列中偷一個並把它放在自己的隊列中。

                   

  • 相對於一般的線程池實現,fork/join框架的優勢體現在對其中包含的任務的處理方式上.在一般的線程池中,如果一個線程正在執行的任務由於某些原因無法繼續運行,那么該線程會處於等待狀態。而在fork/join框架實現中,如果某個子問題由於等待另外一個子問題的完成而無法繼續運行。那么處理該子問題的線程會主動尋找其他尚未運行的子問題來執行.這種方式減少了線程的等待時間,提高了性能。

jdk1.7之后提供了兩個Fork/Join 框架,兩個框架最大區別為是否有返回值

//有返回值
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {}
//無返回值
public abstract class RecursiveAction extends ForkJoinTask<Void> {}

下面為一實現示例(求兩數之間所有數之和,如1-100——>5050):

class ForkJoinSumCalculate extends RecursiveTask<Long>{

    private static final long serialVersionUID = -1812835340478767238L;
    
    private long start;
    private long end;
    
    private static final long THURSHOLD = 10000L;  //臨界值
    
    public ForkJoinSumCalculate(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long length = end - start;
        //小於臨界值,則不進行拆分,直接計算初始值到結束值之間所有數之和
        if(length <= THURSHOLD){
            long sum = 0L;
            
            for (long i = start; i <= end; i++) {
                sum += i;
            }
            
            return sum;
        }else{  //大於臨界值,取中間值進行拆分,遞歸調用
            long middle = (start + end) / 2;
            
            ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle); 
            left.fork(); //進行拆分,同時壓入線程隊列
            
            ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle+1, end);
            right.fork(); //
            
            return left.join() + right.join();
        }
    }    
}

測試1-50000000000的和:

public static void main(String[] args) {
     Instant start = Instant.now();        
     ForkJoinPool pool = new ForkJoinPool();        
     ForkJoinTask<Long> task = new ForkJoinSumCalculate(0L, 50000000000L);        
     Long sum = pool.invoke(task);        
     System.out.println(sum);        
     Instant end = Instant.now();        
     System.out.println("耗費時間為:" + Duration.between(start, end).toMillis());
 }

結果:cpu利用率達到100%,耗時19.361s

和for循環累加比較一下:

@Test
 public void test1(){
     Instant start = Instant.now();        
     long sum = 0L;        
     for (long i = 0L; i <= 50000000000L; i++) {
         sum += i;
     }        
     System.out.println(sum);        
     Instant end = Instant.now();        
     System.out.println("耗費時間為:" + Duration.between(start, end).toMillis());//35-3142-15704
 }

結果如下:耗時18.699s

由於fork/join框架在復雜邏輯時不易拆分,java8為fork/join進行了改進,代碼如下:

 //java8 新特性
 @Test
 public void test2(){
     Instant start = Instant.now();        
     Long sum = LongStream.rangeClosed(0L, 50000000000L)
                          .parallel()
                          .reduce(0L, Long::sum);        
     System.out.println(sum);        
     Instant end = Instant.now();       
     System.out.println("耗費時間為:" + Duration.between(start, end).toMillis());//1536-8118
 }

結果:耗時15.428s

 測試了幾個值,發現效率方面: java8 > for循環 > fork/join

  10000000000L
50000000000L
100000000000L
java8 3320ms 15428ms 34770ms
for 3902ms 18699ms 37858ms
fork/join 4236ms 19361ms 40977ms

  按理來說,隨着計算量的增大,fork/join的效率會超過for循環,但是在本機測試出的結果如上,fork/join框架的效率始終不如貼近底層的for循環。這方面可能一方面在於compute方法設計中long類型的裝箱拆箱存在一定時間開銷,另一方面可能由於臨界值選擇不合理,測試時選擇10000,在測試10000000000L累加時,采取四個臨界值:5000、10000、20000、100000,結果還是臨界值為10000時效率最高。還是相信眼見為實吧。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM