ForkJoinPool 是jdk1.7 由Doug Lea 寫的實現 遞歸調用任務拆分,合並,的線程池。
代碼示例:
package www.itbac.com; import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.ArrayList; import java.util.concurrent.*; /** * 並行調用http接口 */ @Service public class UserServiceForkJoin { // 本質是一個線程池,默認的線程數量:CPU的核數 ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); @Autowired private RestTemplate restTemplate; /** * 查詢多個系統的數據,合並返回 */ public Object getUserInfo(String userId) throws ExecutionException, InterruptedException { // 其他例子, 查數據庫的多個表數據,分多次查詢 // fork/join // forkJoinPool.submit() ArrayList<String> urls = new ArrayList<>(); urls.add("http://www.itbac.com/userinfo-api/get?userId=" + userId); urls.add("http://www.itbac.com/integral-api/get?userId=" + userId); HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1); ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest); JSONObject result = forkJoinTask.get(); return result; } } // 自定義任務類, 繼承遞歸任務。 class HttpJsonRequest extends RecursiveTask<JSONObject> { RestTemplate restTemplate; ArrayList<String> urls; int start; int end; HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) { this.restTemplate = restTemplate; this.urls = urls; this.start = start; this.end = end; } // 就是實際去執行的一個方法入口(任務拆分) @Override protected JSONObject compute() { int count = end - start; // 代表當前這個task需要處理多少數據 // 自行根據業務場景去判斷是否是大任務,是否需要拆分 if (count == 0) { String url = urls.get(start); // TODO 如果只有一個接口調用,立刻調用 long userinfoTime = System.currentTimeMillis(); String response = restTemplate.getForObject(url, String.class); JSONObject value = JSONObject.parseObject(response); System.out.println(Thread.currentThread() + " 接口調用完畢" + (System.currentTimeMillis() - userinfoTime) + " #" + url); return value; } else { // 如果是多個接口調用,拆分成子任務 7,8, 9,10 System.out.println(Thread.currentThread() + "任務拆分一次"); //求中間值。 int x = (start + end) / 2; //任務從開始,到中間值。 HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 負責處理哪一部分? //fork拆分任務。 httpJsonRequest.fork(); //任務從中間值+1 ,到結束。 HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 負責處理哪一部分? httpJsonRequest1.fork(); // join獲取處理結果 JSONObject result = new JSONObject(); //join合並結果。 result.putAll(httpJsonRequest.join()); result.putAll(httpJsonRequest1.join()); return result; } } }
就是把任務拆分,交給線程池執行,再合並。與Future的獲取返回值有點相似。只是對任務拆分做了抽象封裝。
特點:
線程池 ThreadPoolExecutor 中只維護了一個隊列。多線程去隊列中爭搶任務來執行。
而ForkJoinPool 是每一個大任務是維護一個隊列,fork拆分出的小任務也是在自己隊列中。一個線程去處理自己隊列中的任務,此時,沒有線程爭搶,效率比線程池要高。
該線程把當前自己的隊列處理完了,就去和其他線程爭搶其他隊列的任務來處理,這個術語叫工作竊取work-stealing .
ForkJoinPool 維護了多個隊列,ThreadPoolExecutor只維護了一個隊列,通過多個隊列來減少線程爭搶,從而提高了效率。
但是:
每個worker線程都維護一個任務隊列,ForkJoinWorkerThread中的任務隊列。當這個worker線程處理完自己隊列的任務,會隨機從其他的worker的隊列中拿走一個任務執行(工作竊取:work-stealing )。
如果所有worker線程都很忙,大家都沒有工作竊取,那就是單線程處理完整個任務隊列。對於請求方而言,本次任務拆分,並沒有提高響應的效率?
而且,如果任務拆分太細,遞歸調用太深,這個拆分,合並,的過程,也是消耗性能的。
結語:
ForkJoinPool的工作竊取帶來的性能提升偏理論,API的源碼復雜度較高,實際研發中可控性來說不如其他API ,謹慎使用。