fork/join使用
ForkJoinPool是ExecultorService接口的实现,它专为可以递归分解成小块的工作而设计
fork/join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。
使用fork/join框架的第一步是编写一部分工作的代码。类似的伪代码如下:
如果(当前工作部分足够小)
直接做这项工作
其他
把当前工作分成两部分
调用这两部分并等待结果
将此代码包装在ForkJoinTask子类中,通常RecursiveTask(可以返回结果)或 RecursiveAction
关键点:分解任务fork出新任务,汇集join任务执行结果

@Service public class UserServiceForkJoin { // 本质是一个线程池,默认的线程数量:CPU的核数 ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); @Autowired private RestTemplate restTemplate; /** * 查询多个系统的数据,合并返回 */ public Object getUserInfo(String userId) throws ExecutionException, InterruptedException { // 其他例子, 查数据库的多个表数据,分多次查询 // fork/join // forkJoinPool.submit() ArrayList<String> urls = new ArrayList<>(); urls.add("http://www.tony.com/userinfo-api/get?userId=" + userId); urls.add("http://www.tony.com/integral-api/get?userId=" + userId); HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1); ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest); JSONObject result = forkJoinTask.get(); return result; } } // 任务 class HttpJsonRequest extends RecursiveTask<JSONObject> { RestTemplate restTemplate; ArrayList<String> urls; int start; int end; HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) { this.restTemplate = restTemplate; this.urls = urls; this.start = start; this.end = end; } // 就是实际去执行的一个方法入口(任务拆分) @Override protected JSONObject compute() { int count = end - start; // 代表当前这个task需要处理多少数据 // 自行根据业务场景去判断是否是大任务,是否需要拆分 if (count == 0) { String url = urls.get(start); // TODO 如果只有一个接口调用,立刻调用 long userinfoTime = System.currentTimeMillis(); String response = restTemplate.getForObject(url, String.class); JSONObject value = JSONObject.parseObject(response); System.out.println(Thread.currentThread() + " 接口调用完毕" + (System.currentTimeMillis() - userinfoTime) + " #" + url); return value; } else { // 如果是多个接口调用,拆分成子任务 7,8, 9,10 System.out.println(Thread.currentThread() + "任务拆分一次"); int x = (start + end) / 2; HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 负责处理哪一部分? httpJsonRequest.fork(); HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 负责处理哪一部分? httpJsonRequest1.fork(); // join获取处理结果 JSONObject result = new JSONObject(); result.putAll(httpJsonRequest.join()); result.putAll(httpJsonRequest1.join()); return result; } } }

@RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("开始测试"); } @After public void end() { System.out.println("结束测试"); } @Test public void ex() { int x = 0, y = 100; System.out.println("结果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin实现 @Autowired UserServiceForkJoin userServiceForkJoin; //future实现 @Autowired UserServiceFutureTask userServiceFutureTask; //倒计时实现 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 调用 long currentTimeMillis = System.currentTimeMillis(); // http 实际就是 线程 调用service Object userInfo = userServiceForkJoin.getUserInfo("chen"); System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
倒计时实现
/** * 调用http接口 */ @Service public class UserServiceCountLatch { ExecutorService executorService = Executors.newCachedThreadPool(); @Autowired private RestTemplate restTemplate; /** * 查询多个系统的数据,合并返回 */ public Object getUserInfo(String userId) throws InterruptedException { CountDownLatch count = new CountDownLatch(2); ArrayList<JSONObject> values = new ArrayList<>(); // 你可以封装成一个 提交URL 就能自动多线程调用的 工具 executorService.submit(() -> { // 1. 先从调用获取用户基础信息的http接口 long userinfoTime = System.currentTimeMillis(); String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject userInfo = JSONObject.parseObject(value); System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime)); values.add(userInfo); count.countDown(); }); executorService.submit(() -> { // 2. 再调用获取用户积分信息的接口 long integralApiTime = System.currentTimeMillis(); String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject intergralInfo = JSONObject.parseObject(intergral); System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime)); values.add(intergralInfo); count.countDown(); }); count.await();// 等待计数器归零 // 3. 合并为一个json对象 JSONObject result = new JSONObject(); for (JSONObject value : values) { result.putAll(value); } return result; } }

@RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("开始测试"); } @After public void end() { System.out.println("结束测试"); } @Test public void ex() { int x = 0, y = 100; System.out.println("结果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin实现 @Autowired UserServiceForkJoin userServiceForkJoin; //future实现 @Autowired UserServiceFutureTask userServiceFutureTask; //倒计时实现 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 调用 long currentTimeMillis = System.currentTimeMillis(); // http 实际就是 线程 调用service Object userInfo = UserServiceCountLatch.getUserInfo("chen"); System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
FutureTask实现
Future表示异步计算的结果,提供了用于检查计算是否完成。等待计算完成以及获取结果的方法

import java.util.concurrent.*; import java.util.concurrent.locks.LockSupport; // 我们想一想,这个功能怎么实现 // (jdk本质,就是利用一些底层API,为开发人员提供便利) public class NeteaseFutureTask<T> implements Runnable, Future { // 获取 线程异步执行结果 的方式 Callable<T> callable; // 业务逻辑在callable里面 T result = null; volatile String state = "NEW"; // task执行状态 LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();// 定义一个存储等待者的集合 public NeteaseFutureTask(Callable<T> callable) { this.callable = callable; } @Override public void run() { try { result = callable.call(); } catch (Exception e) { e.printStackTrace(); // result = exception } finally { state = "END"; } // 唤醒等待者 Thread waiter = waiters.poll(); while (waiter != null) { LockSupport.unpark(waiter); waiter = waiters.poll(); // 继续取出队列中的等待者 } } // 返回结果, @Override public T get() { if ("END".equals(state)) { return result; } waiters.offer(Thread.currentThread()); // 加入到等待队列,线程不继续往下执行 while (!"END".equals(state)) { LockSupport.park(); // 线程通信的知识点 } // 如果没有结束,那么调用get方法的线程,就应该进入等待 return result; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return false; } @Override public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return null; } }
import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import java.util.concurrent.*; @Service public class UserServiceFutureTask { ExecutorService executorService = Executors.newCachedThreadPool(); @Autowired private RestTemplate restTemplate; /** * 查询多个系统的数据,合并返回 */ public Object getUserInfo(String userId) throws ExecutionException, InterruptedException { // 其他例子, 查数据库的多个表数据,分多次查询 // 原味爱好 // Future < > Callable // 1 和runnable一样的业务定义. 但是本质上是有区别的: 返回值 异常 call run. Callable<JSONObject> callable = new Callable<JSONObject>() { @Override public JSONObject call() throws Exception { // 1. 先从调用获取用户基础信息的http接口 long userinfoTime = System.currentTimeMillis(); String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject userInfo = JSONObject.parseObject(value); System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime)); return userInfo; } }; // 通过多线程运行callable executorService.submit(callable); NeteaseFutureTask<JSONObject> userInfoFutureTask = new NeteaseFutureTask<>(callable); new Thread(userInfoFutureTask).start(); NeteaseFutureTask<JSONObject> intergralInfoTask = new NeteaseFutureTask(() -> { // 2. 再调用获取用户积分信息的接口 long integralApiTime = System.currentTimeMillis(); String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class); JSONObject intergralInfo = JSONObject.parseObject(intergral); System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime)); return intergralInfo; }); new Thread(intergralInfoTask).start(); // 3. 合并为一个json对象 JSONObject result = new JSONObject(); result.putAll(userInfoFutureTask.get()); // 会等待任务执行结束 result.putAll(intergralInfoTask.get()); return result; } }

import com.study.thread.future.service.UserServiceCountLatch; import com.study.thread.future.service.UserServiceForkJoin; import com.study.thread.future.service.UserServiceFutureTask; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.study.thread.future.service.UserService; @RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("开始测试"); } @After public void end() { System.out.println("结束测试"); } @Test public void ex() { int x = 0, y = 100; System.out.println("结果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin实现 @Autowired UserServiceForkJoin userServiceForkJoin; //future实现 @Autowired UserServiceFutureTask userServiceFutureTask; //倒计时实现 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 调用 long currentTimeMillis = System.currentTimeMillis(); // http 实际就是 线程 调用service Object userInfo = userServiceFutureTask.getUserInfo("chen"); System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
以上部分内容和图片来自网易云课堂