1.概述 在本文中,我們將了解Future。自Java 1.5以來一直存在的接口,在處理異步調用和並發處理時非常有用。 2.創建Future 簡單地說,Future類表示異步計算的未來結果 - 這個結果最終將在處理完成后出現在Future中。 讓我們看看如何編寫創建和返回Future實例的方法。 Future接口是長時間運行方法異步處理的理想選擇。這使我們能夠在等待Future封裝的任務完成時執行一些其他事情。 利用Future的異步性質的操作示例如下: 計算密集型過程(數學和科學計算) 操縱大數據結構(大數據) 遠程方法調用(下載文件,抓取HTML,Web服務)。 2.1 用FutureTask實現Future 對於我們的示例,我們將創建一個非常簡單的類來計算Integer的平方。這絕對不屬於“長期運行”方法類別,但是我們將對它進行一次Thread.sleep()調用以使其持續1秒鍾完成: public class SquareCalculator { private ExecutorService executor = Executors.newSingleThreadExecutor(); public Future<Integer> calculate(Integer input) { return executor.submit(() -> { Thread.sleep(1000); return input * input; }); } } 實際執行計算的代碼位包含在call()方法中,作為lambda表達式提供。正如你所看到的,除了之前提到的sleep()調用之外沒有什么特別之處。 當我們將注意力轉向Callable 和ExecutorService的使用時,它會變得更有趣。 Callable是一個接口,表示返回結果並具有單個call()方法的任務。在這里,我們使用lambda表達式創建了它的實例。 創建一個Callable實例並沒有把我們帶到任何地方,我們仍然必須將這個實例傳遞給一個執行器,該執行器將負責在一個新線程中啟動該任務並返回有價值的Future對象。這就是ExecutorService的用武之地。 我們可以通過幾種方式獲得ExecutorService實例,其中大部分都是由實用程序類Executors的靜態工廠方法提供的。在這個例子中,我們使用了基本的newSingleThreadExecutor(),它為我們提供了一次能夠處理單個線程的ExecutorService。 一旦我們有了一個ExecutorService對象,我們只需要調用submit()傳遞我們的Callable作為參數。submit()將負責啟動任務並返回FutureTask 對象,該對象是Future接口的實現。 3.使用Future 到目前為止,我們已經學會了如何創建Future的實例。 在本節中,我們將通過探索Future的API中的所有方法來學習如何使用此實例。 3.1 使用isDone()和get()來獲取結果 現在我們需要調用calculate()並使用返回的Future來獲得生成的Integer。Future API中的兩種方法將幫助我們完成這項任務。 Future.isDone()告訴我們執行程序是否已完成任務處理。如果任務完成,則返回 true,否則返回 false。 從計算中返回實際結果的方法是Future.get()。請注意,此方法會阻止執行,直到任務完成,但在我們的示例中,這不會成為問題,因為我們首先通過調用isDone()來檢查任務是否已完成。 通過使用這兩種方法,我們可以在等待主任務完成時運行其他一些代碼: Future<Integer> future = new SquareCalculator().calculate(10); while(!future.isDone()) { System.out.println("Calculating..."); Thread.sleep(300); } Integer result = future.get(); 在這個例子中,我們在輸出上寫一條簡單的消息,讓用戶知道程序正在執行計算。 方法get()將阻止執行,直到任務完成。但是我們不必擔心,因為我們的示例只是在確保任務完成后才調用get()。因此,在這種情況下,future.get()將始終立即返回。 值得一提的是,get()有一個重載版本,它接受超時和TimeUnit作為參數: Integer result = future.get(500, TimeUnit.MILLISECONDS); get(long,TimeUnit)和get()之間的區別在於,如果任務在指定的超時時間之前沒有返回,前者將拋出TimeoutException。 3.2 使用cancel()取消Future 假設我們已經觸發了一項任務,但由於某種原因,我們不再關心結果了。我們可以使用Future.cancel(boolean)告訴執行程序停止操作並中斷其底層線程: Future<Integer> future = new SquareCalculator().calculate(4); boolean canceled = future.cancel(true); 從上面的代碼我們的Future實例永遠不會完成它的操作。實際上,如果我們嘗試從該實例調用get(),在調用cancel()之后,結果將是CancellationException。Future.isCancelled()將告訴我們Future是否已被取消。這對於避免獲取CancellationException非常有用。 對cancel()的調用可能會失敗。在這種情況下,其返回值將為false。請注意,cancel()接受一個布爾值作為參數 - 這將控制執行此任務的線程是否應該被中斷。 4.使用線程池進行更多多線程處理 我們當前的ExecutorService是單線程的,因為它是使用Executors.newSingleThreadExecutor獲得的。要突出顯示“單線程”,讓我們同時觸發兩個計算: SquareCalculator squareCalculator = new SquareCalculator(); Future<Integer> future1 = squareCalculator.calculate(10); Future<Integer> future2 = squareCalculator.calculate(100); while (!(future1.isDone() && future2.isDone())) { System.out.println( String.format( "future1 is %s and future2 is %s", future1.isDone() ? "done" : "not done", future2.isDone() ? "done" : "not done" ) ); Thread.sleep(300); } Integer result1 = future1.get(); Integer result2 = future2.get(); System.out.println(result1 + " and " + result2); squareCalculator.shutdown(); 現在讓我們分析一下這段代碼的輸出: calculating square for: 10 future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done calculating square for: 100 future1 is done and future2 is not done future1 is done and future2 is not done future1 is done and future2 is not done 100 and 10000 很明顯,這個過程並不平行。注意第二個任務僅在第一個任務完成后才開始,使整個過程大約需要2秒鍾才能完成。 為了使我們的程序真正具有多線程,我們應該使用不同風格的ExecutorService。讓我們看一下如果我們使用工廠方法Executors.newFixedThreadPool()提供的線程池,我們的示例的行為會如何變化: public class SquareCalculator { private ExecutorService executor = Executors.newFixedThreadPool(2); //... } 通過對SquareCalculator類的簡單更改,我們現在有一個執行器,它可以使用2個同步線程。 如果我們再次運行完全相同的客戶端代碼,我們將獲得以下輸出: calculating square for: 10 calculating square for: 100 future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done future1 is not done and future2 is not done 100 and 10000 現在看起來好多了。注意2個任務如何同時開始和結束運行,整個過程大約需要1秒鍾才能完成。 還有其他工廠方法可用於創建線程池,例如Executors.newCachedThreadPool(),它們在可用時重用以前使用過的Thread,而Executors.newScheduledThreadPool() 則調度命令在給定的延遲后運行。 5. ForkJoinTask概述 ForkJoinTask是一個實現 Future的抽象類,能夠運行由 ForkJoinPool中的少量實際線程托管的大量任務。 在本節中,我們將快速介紹ForkJoinPool的主要特性。 ForkJoinTask的主要特征是它通常會產生新的子任務,作為完成其主要任務所需的工作的一部分。它通過調用fork()生成新任務,並使用join()收集所有結果,從而得到類的名稱。 有兩個實現ForkJoinTask的抽象類:RecursiveTask,它在完成時返回一個值,而RecursiveAction則不返回任何內容。顧名思義,這些類將用於遞歸任務,例如文件系統導航或復雜的數學計算。 讓我們擴展前面的例子來創建一個類,給定一個Integer,它將計算所有因子元素的和平方。因此,例如,如果我們將數字4傳遞給我們的計算器,我們應該得到4 + 3 + 2 + 1的總和為30的結果。 首先,我們需要創建RecursiveTask的具體實現並實現其compute()方法。這是我們編寫業務邏輯的地方: public class FactorialSquareCalculator extends RecursiveTask<Integer> { private Integer n; public FactorialSquareCalculator(Integer n) { this.n = n; } @Override protected Integer compute() { if (n <= 1) { return n; } FactorialSquareCalculator calculator = new FactorialSquareCalculator(n - 1); calculator.fork(); return n * n + calculator.join(); } } 注意我們如何通過在compute()中創建FactorialSquareCalculator的新實例來實現遞歸。通過調用fork(),一個非阻塞方法,我們要求ForkJoinPool啟動這個子任務的執行。 在join()方法從計算返回的結果,這是我們增加我們目前正在訪問數的平方。 現在我們只需要創建一個ForkJoinPool來處理執行和線程管理: ForkJoinPool forkJoinPool = new ForkJoinPool(); FactorialSquareCalculator calculator = new FactorialSquareCalculator(10); forkJoinPool.execute(calculator); 6.結論 在本文中,我們對Future接口進行了全面的了解,訪問了它的所有方法。我們還學習了如何利用線程池的強大功能來觸發多個並行操作。還簡要介紹了ForkJoinTask類,fork()和join()的主要方法。