【Java多線程系列七】ExecutorService


java.util.concurrent.ExecutorService接口提供了許多線程管理的方法

Method 說明
shutdown 拒絕接收新的任務,待已提交的任務執行后關閉,且宿主線程不阻塞,若需要阻塞可借助awaitTermination實現
shutdownNow 停止所有正在執行的任務,掛起未執行的任務並關閉,且宿主線程不阻塞,若需要阻塞可借助awaitTermination實現
awaitTermination 當發生shutdown時,阻塞宿主線程直到約定的時間已過或者所有任務完成
submit 提交任務Callable/Runnable,可利用Future的get()方法使宿主線程阻塞直到任務結束后返回結果

有了以上方法,便可以基於此接口實現線程池的各種功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor為例,其參數的詳解

Name Type 說明
corePoolSize int 線程池中最小的線程數
maximumPoolSize int 線程池中最大的線程數
keepAliveTime long 線程空閑時間,若線程數大於corePoolSize,空閑時間超過該值的線程將被終止回收
unit TimeUnit keepAliveTime的時間單位
workQueue BlockingQueue<Runnable> 已提交但未執行的任務隊列
threadFactory ThreadFactory 創建新線程的工廠
handler RejectedExecutionHandler 當線程池或隊列達到上限拒絕新任務拋出異常時的處理類

同時,java.util.concurrent.Executors類提供的常用方法有

Method 說明 基類
newFixedThreadPool 線程池中含固定數量的線程 基於java.util.concurrent.ThreadPoolExecutor類
newSingleThreadExecutor 線程池中僅含一個工作線程
newCachedThreadPool 按需創建線程,若線程池中無可用線程,則創建新的線程並加入,直到線程數達到上限值(Integer.MAX_VALUE)
newWorkStealingPool 按照可用CPU數創建線程池 基於java.util.concurrent.ForkJoinPool類

java.util.concurrent.ForkJoinPool類是Fork/Join框架的實現類,Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架,該類在有遞歸實現的場景有更優異的表現。

package com.concurrent.test;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

/**
 * 測試ExecutorService
 */
public class ThreadExecutorServiceTest {
    private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";
    private static final int RESULT = 111;
    
    private static boolean submitRunable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> future = executorService.submit(new Runnable() {
            
            @Override
            public void run() {
                System.out.println("This is submitRunnable");
            }
        });
        
        return future.get() == null;
    }
    
    private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new Runnable() {
            
            @Override
            public void run() {
                System.out.println("This is submitRunnableWithResult");
            }
        }, RESULT);
        
        return future.get();
    }
    
    private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("This is submitBlockCallable");
                return RESULT;
            }
        });
        return future.get(); //阻塞
    }
    
    private static boolean submitNonBlockCallable() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<Integer> future = executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                System.out.println("This is submitNonBlockCallable");
                return RESULT;
            }
        });
        
        while (!future.isDone()) {// 非阻塞
            System.out.println(new Date());
        }
        
        return future.isDone();
    }
    
    
    private static String shutdown() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        final StringBuilder sb = new StringBuilder();
        executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(10000);
                sb.append("This is shutdown");
                return RESULT;
            }
        });
        executorService.shutdown();
        return sb.toString();
    }
    
    private static String shutdownWithAwaitTermination() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        final StringBuilder sb = new StringBuilder();
        executorService.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                Thread.sleep(10000);
                sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);
                return RESULT;
            }
        });
        
        executorService.shutdown();
        executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
        return sb.toString();
    }
    
    private static int testForkJoinPool(List<Integer> list) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(8);
        Future<Integer> future = forkJoinPool.submit(new SumTask(list));
        return future.get();
    }
    
    @Test
    public void test() throws InterruptedException, ExecutionException {
        Assert.assertTrue(submitRunable());
        Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());
        Assert.assertEquals(RESULT, submitBlockCallable().intValue());
        Assert.assertTrue(submitNonBlockCallable());
        Assert.assertTrue(shutdown().isEmpty());
        Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());
        
        Assert.assertEquals(10, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4 })));
        Assert.assertEquals(49, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })));
        Assert.assertEquals(60, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 })));
        
    }
}

 SumTask類如下:

package com.concurrent.test;

import java.util.List;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Integer> {

    /**
     * 
     */
    private static final long serialVersionUID = -5468389855825594053L;

    private List<Integer> list;
    
    public SumTask(List<Integer> list) {
        this.list = list;
    }
    
    /* 
     * Ensure it is necessary to divide the job to parts and finish them separately
     */
    @Override
    protected Integer compute() {
        int rtn , size = list.size();
        if (size < 10) {
            rtn = sum(list);
        }
        else{
            SumTask subTask1 = new SumTask(list.subList(0, size /2));
            SumTask subTask2 = new SumTask(list.subList(size /2 + 1, size));
            subTask1.fork();
            subTask2.fork();
            rtn = subTask1.join() + subTask2.join();
        }
        return rtn;
        
    }
    
    private int sum(List<Integer> list) {
        int sum = 0;
        for (Integer integer : list) {
            sum += integer;
        }
        return sum;
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM