Java異步調用模式


在長期的Java客戶端開發中,最常見的一個客戶端調用模式就是Java的異步調用。所謂異步調用其實就是實現一個可無需等待被調用函數的返回值而讓操作繼續運行的方法。在Java語言中,簡單的講就是另啟一個線程來完成調用中的部分計算,使調用繼續運行或返回,而不需要等待計算結果。但調用者仍需要取線程的計算結果。雖然在1.5以前從異步線程中取得返回結果需要自己精心設計,但從JDK1.5開始引入了Future接口(FutureTask類)從異步執行的線程中取得返回值。
Future 表示異步計算的結果,它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。FutureTask類是Future接口方法的一個基本實現,是一種可以取消的異步計算任務,計算是通過Callable接口來實現的。

FutureTask有下面幾個重要的方法:

       1. get()   阻塞一直等待執行完成拿到結果

       2. get(int timeout, TimeUnit timeUnit)  阻塞一直等待執行完成拿到結果,如果在超時時間內,沒有拿到拋出異常

       3. isCancelled()  是否被取消

       4. isDone()   是否已經完成

       5. cancel(boolean mayInterruptIfRunning)  試圖取消正在執行的任務

Callable和Runnable有幾點不同:

  • Callable規定的方法是call(),而Runnable規定的方法是run().
  • Callable的任務執行后可返回值,而Runnable的任務是不能返回值的。
  • call()方法可拋出異常,而run()方法是不能拋出異常的。
運行Callable任務可拿到一個Future對象,通過Future對象可了解任務執行情況,可取消任務的執行,還可獲取任務執行的結果。
舉一個例子說明如何使用Future對象,如下:

public class MyFutureTaskTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
            public String call() throws Exception{ //建議拋出異常
                try {
                    Thread.sleep(5* 1000);
                    return "Hello Welcome!";
                }
                catch(Exception e) {
                    throw new Exception("Callable terminated with Exception!"); // call方法可以拋出異常
                }
            }
        });
        executor.execute(future);
        long t = System.currentTimeMillis();
        try {

//            String result = future.get(3000, TimeUnit.MILLISECONDS); //取得結果,同時設置超時執行時間為5秒。
            String result = future.get(); //取得結果,同時設置超時執行時間為5秒。
            System.err.println("result is " + result + ", time is " + (System.currentTimeMillis() - t));
        } catch (InterruptedException e) {
            future.cancel(true);
            System.err.println("Interrupte time is " + (System.currentTimeMillis() - t));
        } catch (ExecutionException e) {
            future.cancel(true);
            System.err.println("Throw Exception time is " + (System.currentTimeMillis() - t));
//        } catch (TimeoutException e) {
//            future.cancel(true);
//            System.err.println("Timeout time is " + (System.currentTimeMillis() - t));
        } finally {
            executor.shutdown();
        }

    }
}

運行結果如下:

 result is Hello Welcome!, time is 5000

如果設置了超時時間,則運行結果如下:

Timeout time is 3000

 可以看出設置超時時間的影響。

再如一個多個運行任務的例子:

public class MyAsyncExample implements Callable {
    private int num;

    public MyAsyncExample(int aInt) {
        this.num = aInt;
    }

    public String call() throws Exception {
        boolean resultOk = false;
        if (num == 0) {
            resultOk = true;
        } else if (num == 1) {
            while (true) { //infinite loop
                System.out.println("looping....");
                Thread.sleep(3000);
            }
        } else {
            throw new Exception("Callable terminated with Exception!"); 
        }
        if (resultOk) {
            return "Task done.";
        } else {
            return "Task failed";
        }
    }

    public static void main(String[] args) {
        //定義幾個任務
        MyAsyncExample call1 = new MyAsyncExample(0);
        MyAsyncExample call2 = new MyAsyncExample(1);
        MyAsyncExample call3 = new MyAsyncExample(2);
        //初始任務執行工具。
        ExecutorService es = Executors.newFixedThreadPool(3);
        //執行任務,任務啟動時返回了一個Future對象,
        Future future1 = es.submit(call1);
        Future future2 = es.submit(call2);
        Future future3 = es.submit(call3);
        try {
            //任務1正常執行完畢,future1.get()會返回線程的值
            System.out.println(future1.get());
            //任務2進行一個死循環,調用future2.cancel(true)來中止此線程。
            Thread.sleep(3000);
            System.out.println("Thread 2 terminated? :" + future2.cancel(true));
            //任務3拋出異常,調用future3.get()時會引起異常的拋出
            System.out.println(future3.get());
        } catch (ExecutionException ex) {
            ex.printStackTrace();
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

 運行結果如下:

looping....
Task done.
java.util.concurrent.ExecutionException: java.lang.Exception: Callable terminated with Exception!
 at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
looping....
 at java.util.concurrent.FutureTask.get(FutureTask.java:83)
Thread 2 terminated? :true
 at org.jevo.future.sample.MyAsyncExample.main(MyAsyncExample.java:57)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:90)
Caused by: java.lang.Exception: Callable terminated with Exception!
 at org.jevo.future.sample.MyAsyncExample.call(MyAsyncExample.java:30)
 at org.jevo.future.sample.MyAsyncExample.call(MyAsyncExample.java:13)
 at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
 at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 at java.lang.Thread.run(Thread.java:662)

以上是對Future模型的例子。異步調用在Swing中應該十分廣泛,當客戶端調用一個'重'的服務端操作時,我們常采用這種方式。Swing中存在一個Future的實現——SwingWorker,這使我們十分方便地在客戶端開發中使用異步調用,詳細使用參見API文檔。下面附一個不使用Future來實現取得異步調用的代碼,如下:

public abstract class AsyncWorker {
    private Object value;  //the running result
    private boolean finished = false;

    private static class ThreadVar {
        private Thread thread;

        ThreadVar(Thread t) {
            thread = t;
        }

        synchronized Thread get() {
            return thread;
        }

        synchronized void clear() {
            thread = null;
        }
    }

    private ThreadVar threadVar;

    /**
     * 返回當前線程運行結果。
     */
    protected synchronized Object getValue() {
        return value;
    }

    /**
     * 設置當前線程運行結果
     */
    private synchronized void setValue(Object x) {
        value = x;
    }

    /**
     * 調用都創建計算邏輯,將運算結果返回
     */
    public abstract Object construct();

    public void finished() {
        finished = true;
    }

    public boolean isFinished() {
        return finished;
    }

    public void interrupt() {
        Thread t = threadVar.get();
        if (t != null) {
            t.interrupt();
        }
        threadVar.clear();
    }

    public void stop() {
        Thread t = threadVar.get();
        if(t!=null) {
            t.stop();
        }
        threadVar.clear();
    }

    /**
     * 返回 construct方法運行結果。
     */
    public Object get() {
        while (true) {
            Thread t = threadVar.get();
            if (t == null) {
                return getValue();
            }
            try {
                t.join();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }


    public AsyncWorker() {
        final Runnable doFinished = new Runnable() {
            public void run() {
                finished();
            }
        };

        Runnable doConstruct = new Runnable() {
            public void run() {
                try {
                    setValue(construct());
                }
                finally {
                    threadVar.clear();
                }

                SwingUtilities.invokeLater(doFinished);
            }
        };

        Thread t = new Thread(doConstruct);
        threadVar = new ThreadVar(t);
    }

    /**
     * Start the worker thread.
     */
    public void start() {
        finished = false;
        Thread t = threadVar.get();
        if (t != null) {
            t.start();
        }
    }

    public static void main(String[] args) {
        AsyncWorker worker = new AsyncWorker() {
            public Object construct() {
                try {
                    Thread.sleep(3*1000);
                }
                catch(Exception e){}
                return "hello world";

            }

            public void finished() {
                super.finished();
                //取線程運行返回的結果
//                Object obj = this.get();
//                System.err.println("return is " + obj);
            }
        };

        long t = System.currentTimeMillis();
        worker.start();
        Object obj = worker.get(); //取得運行結果
        System.err.println("return is " + obj + ", time = " + (System.currentTimeMillis() - t));

    }
}

在上述代碼中,調用者只需要擴展AsyncWorker類定義可計算的邏輯,並將邏輯結果返回。返回結果會保存在一變量中。當調用者調用返回結果時,如果計算還未完成,將調用Thread.join()阻塞線程,直到計算結果返回。用法上是不是與FutureTask相似?在Swing異步調用中,還需要結合等待對話框來表示計算運行進程,從而使運行界面顯示更加友好。 

再看一下線程的join方法,我們知道線程可被Object.wait、Thread.join和Thread.sleep三種方法之一阻塞,當接收到一個中斷異常(InterruptedException)時,可提早地終結被阻塞狀態。Thread.join的使用情況卻有所不同:我們對一些耗時運算,常啟用一個主線程來生成並啟動一些子線程,在子線程中進行耗時的運算,當主線程繼續處理完其他的事務后,需要調用子線程的處理結果,這個時候就要使用join();。Joint方法將使主線程等待子線程運行結束,即join()方法后的代碼,只有等到子線程運行結束后才能被執行。參考下例:

public class ChildThread extends Thread {
    public ChildThread() {
        super("ChildThread");
    }

    public void run() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " start.");
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println(threadName + " loop at " + i);
                Thread.sleep(1000);
            }
            System.out.println(threadName + " end.");
        } catch (Exception e) {
            System.out.println("Exception from " + threadName + ".run");
        }
    }
}

 

public class ParentThread extends Thread {
    ChildThread t1;

    public ParentThread(ChildThread t1) {
        super("ParentThread");
        this.t1 = t1;
    }

    public void run() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " start.");
        try {
            t1.join();   //ChildThread 線程t1結束后,才能運行此行代碼后的代碼。
            System.out.println(threadName + " end.");
        } catch (Exception e) {
            System.out.println("Exception from " + threadName + ".run");
        }
    }

    public static void main(String[] args) {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " start.");
        ChildThread t1 = new ChildThread();
        ParentThread t = new ParentThread(t1);
        try {
            t1.start();
            Thread.sleep(2000);
            t.start();
            t.join();//此處注釋后,將直接運行到結束代碼. 注釋此處代碼,比較運行結果
        } catch (Exception e) {
            System.out.println("Exception from main");
        }
        System.out.println(threadName + " end!");
    }

}

在t.join()被注釋前運行結果如下:

main start.
ChildThread start.
ChildThread loop at 0
ChildThread loop at 1
ParentThread start.
ChildThread loop at 2
ChildThread loop at 3
ChildThread loop at 4
ChildThread end.
ParentThread end.
main end!

當t.join()被注釋后運行結果如下: 

main start.
ChildThread start.
ChildThread loop at 0
ChildThread loop at 1
main end!
ParentThread start.
ChildThread loop at 2
ChildThread loop at 3
ChildThread loop at 4
ChildThread end.
ParentThread end.

可見ParentThread線程仍等待ChildThread線程運行結束后才運行完畢,而Main線程與ParentThread線程的運行並沒有保持等待。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM