Java筆記(十七) 異步任務執行服務


異步任務執行服務

一、基本原理和概念

一)基本接口

1)Runnable和Callable:表示要執行的異步任務。

2)Executor和ExecutorService:表示執行服務。

3)Future:表示異步任務的結果。

Executor接口:

public interface Executor {
    void execute(Runnable command);
}

ExecutorService擴展了Executor:

public interface ExecutorService extends Executor {
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
}

這三個submit都只是表示任務已經提交,不代表已經執行,通過Future可以查詢可以

查詢異步任務的狀態、獲取最終結果、取消任務等。

public interface Future<V> {
    //用於取消任務,如果任務還沒有開始,則不再運行,如果任務已經在執行,則不一定能
    //取消,參數mayInterruptIfRunning表示,如果任務已經在執行,是否調用interrupt
    //方法中斷線程,如果為false就不會,如果為true就會嘗試線程中斷,但中斷也不一定取消
    boolean cancel(boolean mayInterruptIfRunning);
//返回cancel方法的返回值,任務不一定被終止
boolean isCancelled();
//不管什么方式,只要任務結束,都返回true
boolean isDone(); //用於返回異步任務最終的結果,如果任務還未執行,會阻塞等待。 V get() throws InterruptedException, ExecutionException; //限定等待時間,如果超時任務還沒有結束,拋出異常TimeoutException V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

Future是一個重要的概念,是實現“任務的提交”與“任務的執行”相分離的關鍵,是其中的紐帶,

任務提交者和任務執行服務通過它隔離各自的關注點,同時進行協作。

二)基本用法

public class BasicDemon {
    static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            int sleepSeconds = new Random().nextInt(1000);
            Thread.sleep(sleepSeconds);
            return sleepSeconds;
        }
    }
    public static void main(String[] args) {
        //使用一個線程執行所有服務
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<Integer> future = executor.submit(new Task());
        //模擬執行其他任務
        try {
            Thread.sleep(1000);
            System.out.println(future.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        //關閉執行任務服務
        executor.shutdown();
    }
}
public interface ExecutorService extends Executor {
    //表示不再接收新任務,但已經提交的任務會繼續執行,即使任務還未開始
    void shutdown();
    //不接收新任務,終止已經提交但還尚未執行的任務,
    // 對於已經執行的任務,用interrupt方法嘗試中斷。
    //返回已經提交但尚未執行的任務列表
    List<Runnable> shutdownNow();
    //shutdown和shutdownNow不會阻塞等待,它們返回后不代表所有的任務都已結束
    //不過isShutdown方法會返回true。
    boolean isShutdown();
    //所有任務都結束返回true
    boolean isTerminated();
    //等待所有任務結束
    boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    //等待所有任務完成,返回Future列表中,每個Future的isDone方法都返回true,
    //但這並不代表任務執行成功,也可能是被取消了。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    //指定等待時間,如果超時后有的任務沒完成,就會被取消。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
            throws InterruptedException;
    //只要有一個任務在限時內成功返回了,它就會返回該任務的結果,其他任務被取消
    //如果沒有任務能在限時內成功返回,拋出TimeoutException,如果限時內所有的任務
    //都完成了,但都發生了異常,拋出ExecutionException.
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 
throws InterruptedException, ExecutionException, TimeoutException; }

三)基本實現原理

ExecutorService的主要實現類是ThreadPoolExecutor,它是基於線程池實現的,

ExecutorService有一個抽象實現類AbstractExecutorService

1.AbstractExecutorService 

該類提供了submit,invokeAll,invokeAny的默認實現,子類需要實現其他方法。

除了execute,其他方法都與執行服務的生命周期管理有關。submit/invokeAll/invokeAny

最終都會調用execute,我們來簡單實現它們:

public void execute(Runnable command) {
    new Thread(command).start();
}
public <T> Future<T> submit(Callable<T> task) {
    if(task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

2.FutureTask 

FutureTask實現了RunnableFuture接口。它的成員變量:

private Callable<V> callable;

整數變量state表示狀態:

private volatile int state;

取值為:

NEW = 0; //任務在運行
COMPLETING = 1; //臨時狀態,任務即將結束,在設置結果
NORMAL = 2; //任務正常執行完成
EXCEPTIONAL = 3 //任務執行拋出異常結束
CANCELLED = 4; //任務被取消 INTERRUPTING = 5; //任務在被中斷 INTERRUPTED = 6; //任務被中斷

有一個變量表示最終的執行結果或異常:

private Object outcome;

有個變量表示運行任務的線程:

private volatile Thread runner;

有個單向鏈表表示等待任務的執行結果的線程:

private volatile WaitNode waiters;

構造方法:

public FutureTask(Runnable runnable, V result) {
    //轉化為Callable
    this.callable = Executors.callable(runnable, result);
    this.state = NEW; //ensure visibility of callable
}

任務執行服務:

public void run() {
    if(state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if(c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if(ran)
                set(result);
        }
    } finally {
        //runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        //state must be re-read after nulling runner to prevent
        //leaked interrupts
        int s = state;
        if(s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);}
}

其中,set和setException除了設置結果,修改狀態外,還會調用finshCompletion,它會

喚醒所有等待結果的線程。

對於任務提交者,它通過get方法獲取結果,限時get方法的代碼為:

public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if(unit == null)
        throw new NullPointerException();
    int s = state;
    if(s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if(s == NORMAL)
        return (V)x;
    if(s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
public boolean cancel(boolean mayInterruptIfRunning) {
    if(state != NEW)
        return false;
    if(mayInterruptIfRunning) {
        if(!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        Thread t = runner;
        if(t != null)
            t.interrupt();
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    else if(!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
//喚醒所有等待結果的線程 finishCompletion();
return true; }

二、線程池

線程池是並發程序中一個非常重要的概念和技術。線程池主要由兩個概念組成:

一是任務隊列,另一個是工作者線程。工作者線程主體就是一個循環,循環從隊列

中接受任務並執行,任務隊列保存待執行的任務。線程池的優點:

1)可以重用線程,避免線程創建的開銷;

2)任務過多時,通過排隊避免創建過多線程,減少系統資源和競爭,確保任務有序完成。

Java並發包中線程池的實現類是ThreadPoolExecutor,它繼承自AbstracExecutorService,

實現了ExecutorService.

一)理解線程池

主要構造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler)

1.線程池大小

corePoolSize:核心線程個數

maximumPoolSize:最大線程個數

keepAliveTime和unit:表示當線程池中線程個數大於corePoolSize時額外空閑線程的存活時間。

如果該值為0,表示所有線程都不會超時終止。

一般情況下,有新任務到來的時候,如果當前線程個數小於corePoolSize,就會創建一個新

線程來執行該任務,需要說明的是即使其他線程是空閑着的,也會創建新線程。不過,如果

線程個數大等於corePoolSize,那就不會立即創建新線程了,它會先嘗試排隊,需要強調的是

它是嘗試排隊,而不是阻塞等待入隊,如果隊列滿了或者因為其他原因不能立即入隊,它就不

會排隊,而是檢查線程個數是否達到了maximumPoolSize,如果沒有,就會繼續創建線程,直到

線程數達到maximumPoolSize。

查看關於線程和任務數的一些動態數字:

//返回當前線程個數
public int getPoolSize()
//返回線程池曾經達到過的最大線程數
public int getLargestPoolSize()
//返回線程池創建以來所有已完成的任務數
public long getCompletedTaskCount()
//返回所有任務數,包括已完成和在排隊的
public long getTaskCount()

關於任務隊列,需要強調的是,如果用的是無界隊列,線程個數最多只能達到corePoolSize,

新的任務總會排隊,參數maximumPoolSize也就沒有意義。

2.任務拒絕策略 

如果任務隊列有界,且maximumPoolSize有限,則當隊列排滿,線程個數

也達到maximumPoolSize,這時,新任務來了就會觸發線程池任務拒絕策略。

此時,默認情況下,默認情況下提交任務的方法(executoe/submit/invokeAll等)

會拋出RejectExecutionException。不過該策略可以自定義,ThreadPoolExecutor

實現了4種處理方式:

1)ThreadPoolExecutor.AbortPolicy:默認處理方式,拋異常;

2)ThreadPoolExecutor.DiscardPolicy:靜默處理,忽略新任務,不拋異常也不執行;

3)ThreadPoolExecutor.DiscarOldestPolicy:將等待時間最長的任務扔掉,然后自己排隊;

4)ThreadPoolExecutor.CallerRunsPolicy:在任務提交者線程中執行任務,而不是交給線程池中的線程執行。

他們都實現了RejectedExecutionHandler接口:

public interface RejectedExecutionHandler {
    //當線程池不能接受任務時,調用該方法
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

默認的RejectedExecutionHandler:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

3.線程工廠  

ThreadFactory是一個接口:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

這個接口根據Runnable創建一個Thread. ThreadPoolExecutor中線程的默認實現就是Execotors類中的靜態內部類

DefaultThreadFactory,主要就是創建一個線程,給線程設置一個名稱,設置daemon屬性為false,設置線程的優先級

為標准默認優先級,線程的名稱為:pool-<線程池編號>-thread-<線程編號>。可以自定義,實現該接口。

4.關於核心線程的特殊配置 

當線程池中線程個數小等於corePoolSize時,線程池中的線程是核心線程,默認情況下:

核心線程不會預先創建,只有當有任務時才創建,核心線程不會因為空閑而終止。

ThreadPoolExecutor有如下方法,可以改變這些默認行為:

//預先創建所有核心線程
public int prestartAllCoreThreads()
//創建一個核心線程,如果所有核心線程都已經創建,則返回false
public boolean prestartCoreThread()
//如果參數為true,則keepAliveTime參數也適用於核心線程
public void allowCoreThreadTimeOut(boolean value)

二)工廠類Executors

該類提供了創建線程池的方法:

public static ExcutorService newSingleThreadExecutor() {
    return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
}

注意使用的是無界隊列

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

創建固定線程個數的線程池,使用無界隊列,線程創建后不會超時終止,

由於是無界隊列,如果排隊任務過多,可能會消耗過多內存。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L,
            TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
}

創建一個線程池,當新線程到來時,如果有空閑線程在等待任務,則其中一個空閑線程接受該任務,

否則就創建一個新線程,線程創建的總個數幾乎不受限制,對於任意一個空閑線程,如果60秒內沒有新任務,就終止。

思考,應該怎么選擇線程池?

三)線程池死鎖

自己思考。

三、定時任務的那些陷阱

在Java中主要有兩種方式實現定時任務:

1)使用java.util包中的Timer和TimeTask

2)使用java並發包中的ScheduledExecutorService

一)Timer和TimeTask

1.基本用法 

TimerTask表示一個定時任務,它是一個抽象類,實現了Runnable,具體的定時任務需要繼承

該類,實現run方法。Timer是一個具體類,它負責定時任務的調度和執行:

//在指定的絕對時間運行task
public void schedule(TimerTask task, Date time)
//在當前時間延遲delay毫秒后執行
public void schedule(TimerTask task, long delay)
//固定延時重復執行,第一次計划執行時間為firstTime,
//后一次的計划執行時間為前一次的“實際”加上period,如果由於某種原因該次任務延時了,
//則本次任務也會延時,即延時時間period始終不變。 public void schedule(TimerTask task, Date firstTime, long period) //同樣是固定延時重復執行,第一次執行時間為當前時間加上delay public void schedule(TimerTask task, long delay, long period) //固定頻率重復執行,第一次計划執行時間為firstTime //后一次的計划執行時間為前一次的計划時間加上period public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period) // public void scheduleAtFixedRate(TimerTask task, long delay, long period)

 注意固定延時和固定頻率的區別。另外需要注意,如果第一個計划執行的時間firstTime是一個過去時,則任務會

立即執行,對於固定延時的任務,下次任務會基於第一次執行時間計算,而對於固定頻率的任務,則會從firstTime

開始計算,有可能加上period還是一個過去時間,從而連續運行很多次,直到時間超過當前時間。

例子:

public class TimerFixedDelay {

    static String getNowTime() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        return format.format(new Date());
    }

    static class LongRunningTask extends TimerTask {
        public void run() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Date date = new Date();
            System.out.println("Long running task finished! And finish time is " + getNowTime() );
        }
    }

    static class DelayTask extends TimerTask {
        public void run() {
            System.out.println("Now the time is " + getNowTime());
        }
    }

    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new LongRunningTask(), 10);
        timer.schedule(new DelayTask(), 100, 1000);
        /*Long running task finished! And finish time is 2018-12-24 04:50:29
        Now the time is 2018-12-24 04:50:29
        Now the time is 2018-12-24 04:50:30
        Now the time is 2018-12-24 04:50:31
        Now the time is 2018-12-24 04:50:32
        Now the time is 2018-12-24 04:50:33
        Now the time is 2018-12-24 04:50:35*/
        
//        Timer timer1 = new Timer();
//        timer1.schedule(new LongRunningTask(), 10);
//        timer1.scheduleAtFixedRate(new DelayTask(), 100, 1000);
        /*Long running task finished! And finish time is 2018-12-24 04:48:48
        Now the time is 2018-12-24 04:48:48  
        Now the time is 2018-12-24 04:48:48  //補足了之前運行的代碼
        Now the time is 2018-12-24 04:48:48
        Now the time is 2018-12-24 04:48:48
        Now the time is 2018-12-24 04:48:48
        Now the time is 2018-12-24 04:48:48
        Now the time is 2018-12-24 04:48:49
        Now the time is 2018-12-24 04:48:50
        Now the time is 2018-12-24 04:48:51
        Now the time is 2018-12-24 04:48:52
        Now the time is 2018-12-24 04:48:53
        Now the time is 2018-12-24 04:48:54
        Now the time is 2018-12-24 04:48:55
        Now the time is 2018-12-24 04:48:56*/ 
    }
} 

2.基本原理 

Timer內部主要由任務隊列和Timer線程兩部分組成。任務隊列是一個基於

堆實現的優先級隊列,按照下次執行時間排優先級。Timer線程負責執行

所有的定時任務,注意,一個Timer對象只有一個Timer線程,所以對於上面的

例子,任務會被延遲。

Timer線程的主體是一個循環,從隊列中獲取任務,如果隊列中有任務

且計划執行時間小等於當前時間,就執行它,如果隊列中沒有任務或者

第一個任務延時還沒有到,就睡眠。如果睡眠過程中隊列上添加新任務

是第一個任務,Timer線程就會被喚醒,重新進行檢查。

 在執行任務之前,Timer線程判斷任務是否為周期任務,如果是就設置

下次執行時間並添加到優先級隊列中,對於固定延時任務,下次執行時間

為當前時間加上period,對於固定頻率任務,下次執行時間為上次計划時間加上period。

3.死循環

 定時任務不能耗時太長,更不能是無限循環。

public class EndlessTimer {
    static class LoopTask extends TimerTask {
        public void run() {
            while (true) {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class SimpleTask extends TimerTask {
        public void run() {
            System.out.println("Never happen!"); //永遠不會被執行
        }
    }

    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new LoopTask(), 100);
        timer.schedule(new SimpleTask(), 100);
    }
}

4.異常處理

在執行任何一個任務的run方法時,如果run方法拋出異常,Timer線程就會退出,

從而所有的定時任務都會被取消。所以,如果希望各個定時任務互不干擾,一定要在run方法內捕獲異常。

二)ScheduledExecutorService

1.基本用法

ScheduledExecutorService是一個接口,其用法為:

public interface ScheduledExecutorService extends ExecutorService {
    //單次執行,在指定時間delay后運行command
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
    //單次執行,在指定時間delay后運行callable
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,TimeUnit unit);
    //固定頻率重復執行
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long delay, TimeUnit unit)
    //固定延時重復執行
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

ScheduledExecutorServiced的主要實現類是SchedeuledThreadPoolExecutor,它是線程池

ThreadPoolExecutor的子類,其主要構造方法為:

public ScheduledThreadPoolExecutor(int corePoolSize)

它的任務隊列是一個無界優先級隊列。工廠類Executors也提供了一些方法,以創建SchedeuledThreadPoolExecutor:

//單線程定時任務
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newSingleThreadScheduledExecutor(
ThreadFactory threadFactory)
//多線程定時任務
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize)
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory)

與Timer中的任務類似,應該捕獲所有異常。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM