Future任務機制和FutureTask
化繁為簡,分而治之,遞歸的分解和合並,直到任務小到可以接受的程度。
Fork/Join 框架是Java7提供的一個用於並行執行任務的框架。
是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的框架。
Fork 就是把大任務切分為若干子任務並行的執行
Join 就是合並這些子任務的執行結果,最后得到這個大任務的結果。
1 public interface Callable<V> { 2 3 // 只有一個獲得返回結果的方法,實現這個方法即可 4 5 V call() throws Exception; 6 }
Future類就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成、獲取結果。
必要時,通過get方法獲取執行結果,該方法會阻塞直到任務返回結果。
package路徑:java.util.concurrent。也是一個接口。
1 public interface Future<V> { 2 3 // 1. 如果取消成功則返回true, 取消失敗則返回false 4 // 2. mayInterruptIfRunning 表示是否允許取消正在執行卻沒有完畢的任務,如果設置true, 則取消。 5 // 3. 如果任務完成, mayInterruptIfRunning為true,還是false, 都返回false。 6 // 4. 如果取消已經完成的任務返回false 7 // 5. 如果任務正在執行,若mayInterruptIfRunning設置為true, 則返回true。若mayInterruptIfRunning設置為false, 則返回false。 8 // 6. 如果任務還沒有執行,則無論mayInterruptIfRunning設置為true還是false, 肯定返回true 9 boolean cancel(boolean mayInterruptIfRunning); 10 11 // 表示任務是否被取消成功,如果在任務正常完成前被取消,則返回ture 12 boolean isCanceled(); 13 14 // 表示任務是否已經完成, 若任務完成, 則返回true。 15 boolean isDone(); 16 17 // 用來獲取執行結果, 這個方法會產生阻塞, 會一直等到任務執行完畢才返回 18 V get() throws InterruptedException, ExecutionException; 19 20 // 用來獲取執行結果, 如果在指定時間內, 還沒獲取到結果, 就直接返回null 21 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 22 }
Future 提供了三種功能:
(1)判斷任務是否完成
(2)能夠中斷任務
(3)能夠獲取任務執行結果
FutureTask是Future接口的一個唯一實現類。
public class FutureTask implements RunnableFuture<V> { } // RunnableFuture 接口的實現 public interface RunnableFuture <V>extends Runnable, Future<V> { void run(); }
FutureTask實現了RunnableFuture,所以它既可以作為Runnable被線程執行,又可以作為Future得到Callback的返回值。
兩個構造函數:
1 public FutureTask(Callable<V> callable) { 2 // 創建一個FutureTask, 一旦運行就執行給定的Callback。 3 } 4 5 public FutureTask(Runnable runnable, V result) { 6 // 創建一個FutureTask, 一旦運行就執行給定的runnable, 並安排成功完成時,get返回給定的結果。 7 }
接下來的Fork/Join就是基於Future實現的:
1 class demo { 2 3 public static void main(String[] args) throws InterruptedException, ExecutionException { 4 5 FoundTask task1 = new FoundTask("Thread Found Name"); 6 FutureTask<String> f1 = new FutureTask<String>(task1); 7 8 new Thread(f1).start(); 9 10 System.out.printf(f1.get()); 11 12 FutureTask<Integer> f2 = new FutureTask<Integer>(new FoundRun(), 2); 13 14 new Thread(f2).start(); 15 System.out.printf("result-" + f2.get()); 16 } 17 } 18 19 class FoundTask implements Callable<String> { 20 21 private String mName = null; 22 23 FoundTask(String name) { 24 mName = name; 25 } 26 27 @Override 28 public String call() throws Exception { 29 30 Thread.sleep(1000); 31 32 System.out.printf(mName + "finish the task"); 33 34 return "result-1"; 35 } 36 } 37 38 class FoundRun implements Runnable { 39 40 @Override 41 public void run() { 42 43 try { 44 Thread.sleep(1000); 45 } catch (InterruptedException e) { 46 e.printStackTrace(); 47 } 48 49 System.out.printf("FoundRun finish"); 50 } 51 }
運行結果:
1 Thread Found Name finish the task 2 result-1 3 FoundRun finish 4 result-2
按照預想,按順序,按步驟的執行。
使用Fork/Join框架,首先要考慮到的是如何分割任務:
e.g.
1 class demo { 2 3 public static void main(String[] args) throws InterruptedException, ExecutionException { 4 5 ForkJoinPool forkJoinPool = new ForkJoinPool(); 6 7 CountTask task = new CountTask(1, 5); 8 9 Future<Integer> result = forkJoinPool.submit(task); 10 11 System.out.printf("1-5最終相加的結果:" + result.get()); 12 13 CountTask task2 = new CountTask(1, 100); 14 15 Future<Integer> result2 = forkJoinPool.submit(task2); 16 17 System.out.printf("1-100最終相加的結果:" + result2.get()); 18 19 // end 20 } 21 } 22 23 class CountTask extends RecursiveTask<Integer> { 24 25 private static final long serialVersionUID = 3336021421713606929L; 26 27 private static int splitSize = 2; 28 private int mStart, mEnd; 29 30 public CountTask(int start, int end) { 31 mStart = start; 32 mEnd = end; 33 } 34 35 36 @Override 37 protected Integer compute() { 38 int sum = 0; 39 // 如果任務已經不需要再拆分了就開始計算。 40 boolean canCompute = (mEnd - mStart) <= splitSize; 41 42 if (canCompute) { 43 44 for (int i = mStart; i <= mEnd; i++) { 45 46 sum = sum + i; 47 } 48 } else { 49 50 // 拆分兩個字任務。 51 int middle = (mStart - mEnd) / 2; 52 CountTask firstTask = new CountTask(mStart, middle); 53 CountTask secondTask = new CountTask(middle + 1, mEnd); 54 55 firstTask.fork(); // 開始執行 56 secondTask.fork(); 57 58 int firstResult = firstTask.join(); // 獲得第一個子任務結果,得不到結果,此線程不會往下面執行。 59 int secondResult = secondTask.join(); 60 61 // 合並兩個子任務的結果。 62 sum = firstResult + secondResult; 63 } 64 65 return sum; 66 } 67 }
運行結果:
1 1-5最終相加的結果:15 2 1-100最終相加的結果::5050
Fork/Join模式優缺點及應用場景。
注意:分拆的對象過多時,小心一下子把內存撐滿了,等待線程的CPU資源釋放了,但是線程對象等待時,不會被垃圾機制回收。
場景:
對於樹形結構類型的數據的處理和遍歷非常適合。
e.g. 我們要對一個靜態資源服務器的圖片文件目錄進行遍歷和分析的時候,我們需要遞歸的統計每個目錄下的文件數量,最后匯總,非常適合用分叉/結合框架來處理。
1 class demo { 2 3 public static void main(String[] args) throws InterruptedException, ExecutionException { 4 5 Integer count = new ForkJoinPool().invoke(new CountingTask(Paths.get("D://fish"))); 6 7 System.out.printf("D:盤fish下面總文件數量:" + count); 8 9 // end 10 } 11 } 12 13 // 處理單個目錄的任務 14 class CountingTask extends RecursiveTask<Integer> { 15 16 private Path mDir; 17 public CountingTask (Path dir) { 18 mDir = dir; 19 } 20 21 22 @Override 23 protected Integer compute() { 24 25 int count = 0; 26 List<CountingTask> subTasks = new ArrayList<>(); 27 28 // 讀取目錄dir的子路徑。 29 try { 30 31 DirectoryStream<Path> ds = Files.newDirectoryStream(dir); 32 33 for (Path subPath : ds) { 34 35 if (Files.isDirectory(subPath, LinkOption.NOFOLLOW_LINKS)) { 36 37 // 對每個子目錄都新建一個子任務 38 subTasks.add(new CountingTask(subPath)); 39 } else { 40 41 // 遇到文件,則計數器增加1 42 count ++; 43 } 44 } 45 46 if (!subTasks.isEmpty()) { 47 // 在當前的ForkJoinPool上調度所有的子任務 48 for (CountingTask subTask : invokeAll(subTasks)) { 49 50 count += subTask.join(); 51 } 52 } 53 54 } catch (IOException ex) { 55 return 0; 56 } 57 } 58 }
運行結果:運算速度還是非常快的,但是一旦文件多了,也是非常耗資源的,電腦就會出現卡頓的情況
1 D:盤fish下面總文件數量:7647