介紹:
a . Fork/Join為JKD1.7引入,適用於對大量數據進行拆分成多個小任務進行計算的框架,最后把所有小任務的結果匯總合並得到最終的結果
b . 相關類
public abstract class RecursiveTask<V> extends ForkJoinTask<V>; public abstract class RecursiveAction extends ForkJoinTask<Void>;
c . 其中RecursiveTask在執行有返回值的任務時使用,RecursiveAction在執行沒有返回值的任務時使用
實例代碼:
ForkJoin有參無返回值
參數:map
public class UpdatePlayersTotalTimeTask extends RecursiveAction { private static final int THRESHOLD_NUM = 30;//定義任務的切分閥值 private Map<String,String> players; private PlayerTotalTimeService playerTotalTimeService; private MongoDao mongoDao; public UpdatePlayersTotalTimeTask(Map<String,String> players, MongoDao mongoDao) { this.players = players; this.mongoDao = mongoDao; } @Override protected void compute() { boolean canCompute = players.size() <= THRESHOLD_NUM; if (canCompute) { playerTotalTimeService = new PlayerTotalTimeService(); playerTotalTimeService.updatePlayersTotalTime(players, mongoDao); } else {
// 將任務一份為二 int middle = players.size() / 2; int i = 0; Map<String, String> leftMap = new HashMap<>(); Map<String, String> rightMap = new HashMap<>(); for (Map.Entry<String, String> entry : players.entrySet()) { if (i < middle) { leftMap.put(entry.getKey(), entry.getValue()); } else { rightMap.put(entry.getKey(), entry.getValue()); } i++; } UpdatePlayersTotalTimeTask leftTask = new UpdatePlayersTotalTimeTask(leftMap, mongoDao); UpdatePlayersTotalTimeTask rightTask = new UpdatePlayersTotalTimeTask(rightMap, mongoDao); // 執行子任務 leftTask.fork(); rightTask.fork(); } }
//調用測試 public static void main(String[] args) throws InterruptedException { // 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的並行線程的ForkJoinPool ForkJoinPool forkjoinPool = new ForkJoinPool(); Map<String, String> map = new HashMap<>();
map.put("key1","value1"); MongoDao mongoDao = new MongoDaoImpl(); //生成一個計算任務 UpdatePlayersTotalTimeTask task = new UpdatePlayersTotalTimeTask(map, mongoDao); // 提交可分解的PrintTask任務 forkjoinPool.excute(task); forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞當前線程直到 ForkJoinPool 中所有的任務都執行結束 // 關閉線程池 forkjoinPool.shutdown(); } }
ForkJoin有參有返回值 (繼承RecursiveTask<T>類)
參數:set<String> 返回值:map<String,String>
1 public class CalcRoomPlayersTotalTimeTask extends RecursiveTask<Map<String,String>> { 2 private static final int THRESHOLD_NUM = 15; 3 private Set<String> roomSet; 4 private PlayerTotalTimeService playerTotalTimeService; 5 private MongoDao mongoDao; 6 7 public CalcRoomPlayersTotalTimeTask(Set<String> roomSet, MongoDao mongoDao) { 8 this.roomSet = roomSet; 9 this.mongoDao = mongoDao; 10 } 11 12 @Override 13 protected Map<String,String> compute() { 14 15 playerTotalTimeService = new PlayerTotalTimeService(); 16 //如果任務足夠小就計算任務 17 boolean canCompute = roomSet.size() <= THRESHOLD_NUM; 18 if (canCompute) { 19 20 Map<String,String> playersResult = new HashMap<>(); 21 22 playersResult = playerTotalTimeService.calcRoomPlayersTotalTime(roomSet, mongoDao); 23 24 return playersResult; 25 } else { 26 // 如果任務大於閾值,就分裂成兩個子任務計算 27 long middle = roomSet.size() / 2; 28 Set<String> leftSet = new HashSet<>(); 29 Set<String> rightSet = new HashSet<>(); 30 31 long i = 0; 32 for (String room : roomSet) { 33 if (i < middle) { 34 leftSet.add(room); 35 } else { 36 rightSet.add(room); 37 } 38 i++; 39 } 40 41 CalcRoomPlayersTotalTimeTask leftTask = new CalcRoomPlayersTotalTimeTask(leftSet,mongoDao); 42 CalcRoomPlayersTotalTimeTask rightTask = new CalcRoomPlayersTotalTimeTask(rightSet,mongoDao); 43 44 // 執行子任務 45 invokeAll(leftTask,rightTask); 46 47 HashMap<String,String> result = new HashMap<>(); 48 49 Map<String,String> leftResult = leftTask.join(); 50 Map<String,String> rightResult = rightTask.join(); 51 52 result.putAll(leftResult); 53 54 for (Map.Entry<String, String> entry : rightResult.entrySet()) { 55 boolean contains = result.containsKey(entry.getKey()); 56 if(contains){ 57 String playerTotalTimeStr = entry.getValue(); 58 Long playerTotalTimeLong = playerTotalTimeService.timeStringToLong(result.get(entry.getKey())) + playerTotalTimeService.timeStringToLong(playerTotalTimeStr); 59 playerTotalTimeStr = playerTotalTimeService.timeLongToString(playerTotalTimeLong); 60 result.put(entry.getKey(),playerTotalTimeStr); 61 }else { 62 result.put(entry.getKey(),entry.getValue()); 63 } 64 } 65 66 return result; 67 } 68 } 69 70 public static void main(String[] args) throws InterruptedException { 71 // 創建包含Runtime.getRuntime().availableProcessors()返回值作為個數的並行線程的ForkJoinPool 72 ForkJoinPool forkjoinPool = new ForkJoinPool(); 73 74 Set<String> roomSet = new HashSet<>(); 75 Map<String,String> map = new HashMap<String,String>(); 76 MongoDao mongoDao = new MongoDaoImpl(); 77 //生成一個計算任務 78 CalcRoomPlayersTotalTimeTask task = new CalcRoomPlayersTotalTimeTask(roomSet, mongoDao); 79 80 // 提交可分解的PrintTask任務
map = forkjoinPool.invoke(task); 83 forkjoinPool.awaitTermination(2, TimeUnit.SECONDS);//阻塞當前線程直到 ForkJoinPool 中所有的任務都執行結束 84 // 關閉線程池 85 forkjoinPool.shutdown(); 86 } 87 }
說明:
a .在有大量計算任務時,此框架方法可進行並行計算效率高,以上示例,可以根據具體的業務需求更改屬性及相關方法用於匹配自己的業務邏輯
b .JDK1.8后由於加入Stream流的操作,集合框架可以使用Collection<E> default Stream<E> parallelStream()的方法轉換成並行流進行計算,此時效果與Fork/Join任務同效
c .ForkJoinPool中的多種方法
1 public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task);//等待獲取結果 2 public void execute(ForkJoinTask<?> task);//異步執行 3 public <T> T invoke(ForkJoinTask<T> task);//執行,獲取Future
d .ForkJoinTask在執行的時候可能會拋出異常,但是沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,並且可以通過ForkJoinTask的getException方法獲取異常。getException方 法返回Throwable對象,如果任務被取消了則返回CancellationException。如果任務沒有完成或者沒有拋出異常則返回null。
if(task.isCompletedAbnormally()) { System.out.println(task.getException()); }