fork/join使用
ForkJoinPool是ExecultorService接口的實現,它專為可以遞歸分解成小塊的工作而設計
fork/join框架將任務分配給線程池中的工作線程,充分利用多處理器的優勢,提高程序性能。
使用fork/join框架的第一步是編寫一部分工作的代碼。類似的偽代碼如下:
如果(當前工作部分足夠小)
直接做這項工作
其他
把當前工作分成兩部分
調用這兩部分並等待結果
將此代碼包裝在ForkJoinTask子類中,通常RecursiveTask(可以返回結果)或 RecursiveAction
關鍵點:分解任務fork出新任務,匯集join任務執行結果

@Service public class UserServiceForkJoin { // 本質是一個線程池,默認的線程數量:CPU的核數 ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); @Autowired private RestTemplate restTemplate; /** * 查詢多個系統的數據,合並返回 */ public Object getUserInfo(String userId) throws ExecutionException, InterruptedException { // 其他例子, 查數據庫的多個表數據,分多次查詢 // fork/join // forkJoinPool.submit() ArrayList<String> urls = new ArrayList<>(); urls.add("http://www.tony.com/userinfo-api/get?userId=" + userId); urls.add("http://www.tony.com/integral-api/get?userId=" + userId); HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1); ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest); JSONObject result = forkJoinTask.get(); return result; } } // 任務 class HttpJsonRequest extends RecursiveTask<JSONObject> { RestTemplate restTemplate; ArrayList<String> urls; int start; int end; HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) { this.restTemplate = restTemplate; this.urls = urls; this.start = start; this.end = end; } // 就是實際去執行的一個方法入口(任務拆分) @Override protected JSONObject compute() { int count = end - start; // 代表當前這個task需要處理多少數據 // 自行根據業務場景去判斷是否是大任務,是否需要拆分 if (count == 0) { String url = urls.get(start); // TODO 如果只有一個接口調用,立刻調用 long userinfoTime = System.currentTimeMillis(); String response = restTemplate.getForObject(url, String.class); JSONObject value = JSONObject.parseObject(response); System.out.println(Thread.currentThread() + " 接口調用完畢" + (System.currentTimeMillis() - userinfoTime) + " #" + url); return value; } else { // 如果是多個接口調用,拆分成子任務 7,8, 9,10 System.out.println(Thread.currentThread() + "任務拆分一次"); int x = (start + end) / 2; HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 負責處理哪一部分? httpJsonRequest.fork(); HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 負責處理哪一部分? httpJsonRequest1.fork(); // join獲取處理結果 JSONObject result = new JSONObject(); result.putAll(httpJsonRequest.join()); result.putAll(httpJsonRequest1.join()); return result; } } }

@RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("開始測試"); } @After public void end() { System.out.println("結束測試"); } @Test public void ex() { int x = 0, y = 100; System.out.println("結果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin實現 @Autowired UserServiceForkJoin userServiceForkJoin; //future實現 @Autowired UserServiceFutureTask userServiceFutureTask; //倒計時實現 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 調用 long currentTimeMillis = System.currentTimeMillis(); // http 實際就是 線程 調用service Object userInfo = userServiceForkJoin.getUserInfo("chen"); System.out.println("getUserInfo總執行時間為" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
倒計時實現
/** * 調用http接口 */ @Service public class UserServiceCountLatch { ExecutorService executorService = Executors.newCachedThreadPool(); @Autowired private RestTemplate restTemplate; /** * 查詢多個系統的數據,合並返回 */ public Object getUserInfo(String userId) throws InterruptedException { CountDownLatch count = new CountDownLatch(2); ArrayList<JSONObject> values = new ArrayList<>(); // 你可以封裝成一個 提交URL 就能自動多線程調用的 工具 executorService.submit(() -> { // 1. 先從調用獲取用戶基礎信息的http接口 long userinfoTime = System.currentTimeMillis(); String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject userInfo = JSONObject.parseObject(value); System.out.println("userinfo-api用戶基本信息接口調用時間為" + (System.currentTimeMillis() - userinfoTime)); values.add(userInfo); count.countDown(); }); executorService.submit(() -> { // 2. 再調用獲取用戶積分信息的接口 long integralApiTime = System.currentTimeMillis(); String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject intergralInfo = JSONObject.parseObject(intergral); System.out.println("integral-api積分接口調用時間為" + (System.currentTimeMillis() - integralApiTime)); values.add(intergralInfo); count.countDown(); }); count.await();// 等待計數器歸零 // 3. 合並為一個json對象 JSONObject result = new JSONObject(); for (JSONObject value : values) { result.putAll(value); } return result; } }

@RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("開始測試"); } @After public void end() { System.out.println("結束測試"); } @Test public void ex() { int x = 0, y = 100; System.out.println("結果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin實現 @Autowired UserServiceForkJoin userServiceForkJoin; //future實現 @Autowired UserServiceFutureTask userServiceFutureTask; //倒計時實現 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 調用 long currentTimeMillis = System.currentTimeMillis(); // http 實際就是 線程 調用service Object userInfo = UserServiceCountLatch.getUserInfo("chen"); System.out.println("getUserInfo總執行時間為" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
FutureTask實現
Future表示異步計算的結果,提供了用於檢查計算是否完成。等待計算完成以及獲取結果的方法

import java.util.concurrent.*; import java.util.concurrent.locks.LockSupport; // 我們想一想,這個功能怎么實現 // (jdk本質,就是利用一些底層API,為開發人員提供便利) public class NeteaseFutureTask<T> implements Runnable, Future { // 獲取 線程異步執行結果 的方式 Callable<T> callable; // 業務邏輯在callable里面 T result = null; volatile String state = "NEW"; // task執行狀態 LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();// 定義一個存儲等待者的集合 public NeteaseFutureTask(Callable<T> callable) { this.callable = callable; } @Override public void run() { try { result = callable.call(); } catch (Exception e) { e.printStackTrace(); // result = exception } finally { state = "END"; } // 喚醒等待者 Thread waiter = waiters.poll(); while (waiter != null) { LockSupport.unpark(waiter); waiter = waiters.poll(); // 繼續取出隊列中的等待者 } } // 返回結果, @Override public T get() { if ("END".equals(state)) { return result; } waiters.offer(Thread.currentThread()); // 加入到等待隊列,線程不繼續往下執行 while (!"END".equals(state)) { LockSupport.park(); // 線程通信的知識點 } // 如果沒有結束,那么調用get方法的線程,就應該進入等待 return result; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return false; } @Override public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return null; } }
import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.concurrent.*; @Service public class UserServiceFutureTask { ExecutorService executorService = Executors.newCachedThreadPool(); @Autowired private RestTemplate restTemplate; /** * 查詢多個系統的數據,合並返回 */ public Object getUserInfo(String userId) throws ExecutionException, InterruptedException { // 其他例子, 查數據庫的多個表數據,分多次查詢 // 原味愛好 // Future < > Callable // 1 和runnable一樣的業務定義. 但是本質上是有區別的: 返回值 異常 call run. Callable<JSONObject> callable = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { // 1. 先從調用獲取用戶基礎信息的http接口 long userinfoTime = System.currentTimeMillis(); String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject userInfo = JSONObject.parseObject(value); System.out.println("userinfo-api用戶基本信息接口調用時間為" + (System.currentTimeMillis() - userinfoTime)); return userInfo; } }; // 通過多線程運行callable executorService.submit(callable); NeteaseFutureTask<JSONObject> userInfoFutureTask = new NeteaseFutureTask<>(callable); new Thread(userInfoFutureTask).start(); NeteaseFutureTask<JSONObject> intergralInfoTask = new NeteaseFutureTask(() -> { // 2. 再調用獲取用戶積分信息的接口 long integralApiTime = System.currentTimeMillis(); String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject intergralInfo = JSONObject.parseObject(intergral); System.out.println("integral-api積分接口調用時間為" + (System.currentTimeMillis() - integralApiTime)); return intergralInfo; }); new Thread(intergralInfoTask).start(); // 3. 合並為一個json對象 JSONObject result = new JSONObject(); result.putAll(userInfoFutureTask.get()); // 會等待任務執行結束 result.putAll(intergralInfoTask.get()); return result; } }

import com.study.thread.future.service.UserServiceCountLatch; import com.study.thread.future.service.UserServiceForkJoin; import com.study.thread.future.service.UserServiceFutureTask; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.study.thread.future.service.UserService; @RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("開始測試"); } @After public void end() { System.out.println("結束測試"); } @Test public void ex() { int x = 0, y = 100; System.out.println("結果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin實現 @Autowired UserServiceForkJoin userServiceForkJoin; //future實現 @Autowired UserServiceFutureTask userServiceFutureTask; //倒計時實現 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 調用 long currentTimeMillis = System.currentTimeMillis(); // http 實際就是 線程 調用service Object userInfo = userServiceFutureTask.getUserInfo("chen"); System.out.println("getUserInfo總執行時間為" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
以上部分內容和圖片來自網易雲課堂