fork/join使用
ForkJoinPool是ExecultorService接口的实现,它专为可以递归分解成小块的工作而设计
fork/join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。

使用fork/join框架的第一步是编写一部分工作的代码。类似的伪代码如下:
如果(当前工作部分足够小)
直接做这项工作
其他
把当前工作分成两部分
调用这两部分并等待结果
将此代码包装在ForkJoinTask子类中,通常RecursiveTask(可以返回结果)或 RecursiveAction

关键点:分解任务fork出新任务,汇集join任务执行结果

@Service public class UserServiceForkJoin { // 本质是一个线程池,默认的线程数量:CPU的核数 ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); @Autowired private RestTemplate restTemplate; /** * 查询多个系统的数据,合并返回 */ public Object getUserInfo(String userId) throws ExecutionException, InterruptedException { // 其他例子, 查数据库的多个表数据,分多次查询 // fork/join // forkJoinPool.submit() ArrayList<String> urls = new ArrayList<>(); urls.add("http://www.tony.com/userinfo-api/get?userId=" + userId); urls.add("http://www.tony.com/integral-api/get?userId=" + userId); HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1); ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest); JSONObject result = forkJoinTask.get(); return result; } } // 任务 class HttpJsonRequest extends RecursiveTask<JSONObject> { RestTemplate restTemplate; ArrayList<String> urls; int start; int end; HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) { this.restTemplate = restTemplate; this.urls = urls; this.start = start; this.end = end; } // 就是实际去执行的一个方法入口(任务拆分) @Override protected JSONObject compute() { int count = end - start; // 代表当前这个task需要处理多少数据 // 自行根据业务场景去判断是否是大任务,是否需要拆分 if (count == 0) { String url = urls.get(start); // TODO 如果只有一个接口调用,立刻调用 long userinfoTime = System.currentTimeMillis(); String response = restTemplate.getForObject(url, String.class); JSONObject value = JSONObject.parseObject(response); System.out.println(Thread.currentThread() + " 接口调用完毕" + (System.currentTimeMillis() - userinfoTime) + " #" + url); return value; } else { // 如果是多个接口调用,拆分成子任务 7,8, 9,10 System.out.println(Thread.currentThread() + "任务拆分一次"); int x = (start + end) / 2; HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 负责处理哪一部分? httpJsonRequest.fork(); HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 负责处理哪一部分? httpJsonRequest1.fork(); // join获取处理结果 JSONObject result = new JSONObject(); result.putAll(httpJsonRequest.join()); result.putAll(httpJsonRequest1.join()); return result; } } }
@RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("开始测试"); } @After public void end() { System.out.println("结束测试"); } @Test public void ex() { int x = 0, y = 100; System.out.println("结果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin实现 @Autowired UserServiceForkJoin userServiceForkJoin; //future实现 @Autowired UserServiceFutureTask userServiceFutureTask; //倒计时实现 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 调用 long currentTimeMillis = System.currentTimeMillis(); // http 实际就是 线程 调用service Object userInfo = userServiceForkJoin.getUserInfo("chen"); System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
倒计时实现
/**
* 调用http接口
*/
@Service
public class UserServiceCountLatch {
ExecutorService executorService = Executors.newCachedThreadPool();
@Autowired
private RestTemplate restTemplate;
/**
* 查询多个系统的数据,合并返回
*/
public Object getUserInfo(String userId) throws InterruptedException {
CountDownLatch count = new CountDownLatch(2);
ArrayList<JSONObject> values = new ArrayList<>();
// 你可以封装成一个 提交URL 就能自动多线程调用的 工具
executorService.submit(() -> {
// 1. 先从调用获取用户基础信息的http接口
long userinfoTime = System.currentTimeMillis();
String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class);
JSONObject userInfo = JSONObject.parseObject(value);
System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime));
values.add(userInfo);
count.countDown();
});
executorService.submit(() -> {
// 2. 再调用获取用户积分信息的接口
long integralApiTime = System.currentTimeMillis();
String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId,
String.class);
JSONObject intergralInfo = JSONObject.parseObject(intergral);
System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime));
values.add(intergralInfo);
count.countDown();
});
count.await();// 等待计数器归零
// 3. 合并为一个json对象
JSONObject result = new JSONObject();
for (JSONObject value : values) {
result.putAll(value);
}
return result;
}
}
@RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("开始测试"); } @After public void end() { System.out.println("结束测试"); } @Test public void ex() { int x = 0, y = 100; System.out.println("结果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin实现 @Autowired UserServiceForkJoin userServiceForkJoin; //future实现 @Autowired UserServiceFutureTask userServiceFutureTask; //倒计时实现 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 调用 long currentTimeMillis = System.currentTimeMillis(); // http 实际就是 线程 调用service Object userInfo = UserServiceCountLatch.getUserInfo("chen"); System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
FutureTask实现
Future表示异步计算的结果,提供了用于检查计算是否完成。等待计算完成以及获取结果的方法
import java.util.concurrent.*; import java.util.concurrent.locks.LockSupport; // 我们想一想,这个功能怎么实现 // (jdk本质,就是利用一些底层API,为开发人员提供便利) public class NeteaseFutureTask<T> implements Runnable, Future { // 获取 线程异步执行结果 的方式 Callable<T> callable; // 业务逻辑在callable里面 T result = null; volatile String state = "NEW"; // task执行状态 LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();// 定义一个存储等待者的集合 public NeteaseFutureTask(Callable<T> callable) { this.callable = callable; } @Override public void run() { try { result = callable.call(); } catch (Exception e) { e.printStackTrace(); // result = exception } finally { state = "END"; } // 唤醒等待者 Thread waiter = waiters.poll(); while (waiter != null) { LockSupport.unpark(waiter); waiter = waiters.poll(); // 继续取出队列中的等待者 } } // 返回结果, @Override public T get() { if ("END".equals(state)) { return result; } waiters.offer(Thread.currentThread()); // 加入到等待队列,线程不继续往下执行 while (!"END".equals(state)) { LockSupport.park(); // 线程通信的知识点 } // 如果没有结束,那么调用get方法的线程,就应该进入等待 return result; } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return false; } @Override public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return null; } }
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.concurrent.*;
@Service
public class UserServiceFutureTask {
ExecutorService executorService = Executors.newCachedThreadPool();
@Autowired
private RestTemplate restTemplate;
/**
* 查询多个系统的数据,合并返回
*/
public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
// 其他例子, 查数据库的多个表数据,分多次查询
// 原味爱好
// Future < > Callable
// 1 和runnable一样的业务定义. 但是本质上是有区别的: 返回值 异常 call run.
Callable<JSONObject> callable = new Callable<JSONObject>() {
@Override
public JSONObject call() throws Exception {
// 1. 先从调用获取用户基础信息的http接口
long userinfoTime = System.currentTimeMillis();
String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class);
JSONObject userInfo = JSONObject.parseObject(value);
System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime));
return userInfo;
}
};
// 通过多线程运行callable
executorService.submit(callable);
NeteaseFutureTask<JSONObject> userInfoFutureTask = new NeteaseFutureTask<>(callable);
new Thread(userInfoFutureTask).start();
NeteaseFutureTask<JSONObject> intergralInfoTask = new NeteaseFutureTask(() -> {
// 2. 再调用获取用户积分信息的接口
long integralApiTime = System.currentTimeMillis();
String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId,
String.class);
JSONObject intergralInfo = JSONObject.parseObject(intergral);
System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime));
return intergralInfo;
});
new Thread(intergralInfoTask).start();
// 3. 合并为一个json对象
JSONObject result = new JSONObject();
result.putAll(userInfoFutureTask.get()); // 会等待任务执行结束
result.putAll(intergralInfoTask.get());
return result;
}
}
import com.study.thread.future.service.UserServiceCountLatch; import com.study.thread.future.service.UserServiceForkJoin; import com.study.thread.future.service.UserServiceFutureTask; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.study.thread.future.service.UserService; @RunWith(SpringRunner.class) @SpringBootTest(classes = WebsiteDemoApplication.class) public class WebsiteDemoApplicationTests { @Before public void start() { System.out.println("开始测试"); } @After public void end() { System.out.println("结束测试"); } @Test public void ex() { int x = 0, y = 100; System.out.println("结果" + (x + y)); int i = 1 / 0; } @Autowired UserService userService; //forkjoin实现 @Autowired UserServiceForkJoin userServiceForkJoin; //future实现 @Autowired UserServiceFutureTask userServiceFutureTask; //倒计时实现 @Autowired UserServiceCountLatch UserServiceCountLatch; @Test public void testUserSerivce() throws Exception { // 调用 long currentTimeMillis = System.currentTimeMillis(); // http 实际就是 线程 调用service Object userInfo = userServiceFutureTask.getUserInfo("chen"); System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis)); System.out.println(userInfo.toString()); } }
以上部分内容和图片来自网易云课堂
