拆分任务,多线程处理,等待返回总和结果(fork/join、CountDownLatch、FutureTask)


fork/join使用

ForkJoinPool是ExecultorService接口的实现,它专为可以递归分解成小块的工作而设计

fork/join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。

使用fork/join框架的第一步是编写一部分工作的代码。类似的伪代码如下:

如果(当前工作部分足够小)

  直接做这项工作

其他

  把当前工作分成两部分

调用这两部分并等待结果

将此代码包装在ForkJoinTask子类中,通常RecursiveTask(可以返回结果)或  RecursiveAction

关键点:分解任务fork出新任务,汇集join任务执行结果

 

@Service
public class UserServiceForkJoin {
    // 本质是一个线程池,默认的线程数量:CPU的核数
    ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null, true);
    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查询多个系统的数据,合并返回
     */
    public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
        // 其他例子, 查数据库的多个表数据,分多次查询
        // fork/join
        // forkJoinPool.submit()
        ArrayList<String> urls = new ArrayList<>();
        urls.add("http://www.tony.com/userinfo-api/get?userId=" + userId);
        urls.add("http://www.tony.com/integral-api/get?userId=" + userId);

        HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1);
        ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest);

        JSONObject result = forkJoinTask.get();
        return result;
    }
}

// 任务
class HttpJsonRequest extends RecursiveTask<JSONObject> {

    RestTemplate restTemplate;
    ArrayList<String> urls;
    int start;
    int end;

    HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) {
        this.restTemplate = restTemplate;
        this.urls = urls;
        this.start = start;
        this.end = end;
    }

    // 就是实际去执行的一个方法入口(任务拆分)
    @Override
    protected JSONObject compute() {
        int count = end - start; // 代表当前这个task需要处理多少数据
        // 自行根据业务场景去判断是否是大任务,是否需要拆分
        if (count == 0) {
            String url = urls.get(start);
            // TODO 如果只有一个接口调用,立刻调用
            long userinfoTime = System.currentTimeMillis();
            String response = restTemplate.getForObject(url, String.class);
            JSONObject value = JSONObject.parseObject(response);
            System.out.println(Thread.currentThread() + " 接口调用完毕" + (System.currentTimeMillis() - userinfoTime) + " #" + url);
            return value;
        } else { // 如果是多个接口调用,拆分成子任务  7,8,   9,10
            System.out.println(Thread.currentThread() + "任务拆分一次");
            int x = (start + end) / 2;
            HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 负责处理哪一部分?
            httpJsonRequest.fork();

            HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 负责处理哪一部分?
            httpJsonRequest1.fork();

            // join获取处理结果
            JSONObject result = new JSONObject();
            result.putAll(httpJsonRequest.join());
            result.putAll(httpJsonRequest1.join());
            return result;
        }
    }
}
示例代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebsiteDemoApplication.class)
public class WebsiteDemoApplicationTests {

    @Before
    public void start() {
        System.out.println("开始测试");
    }

    @After
    public void end() {
        System.out.println("结束测试");
    }

    @Test
    public void ex() {
        int x = 0, y = 100;
        System.out.println("结果" + (x + y));
        int i = 1 / 0;
    }

    @Autowired
    UserService userService;

    //forkjoin实现
    @Autowired
    UserServiceForkJoin userServiceForkJoin;
    //future实现
    @Autowired
    UserServiceFutureTask userServiceFutureTask;
    //倒计时实现
    @Autowired
    UserServiceCountLatch UserServiceCountLatch;

    @Test
    public void testUserSerivce() throws Exception {
        // 调用
        long currentTimeMillis = System.currentTimeMillis();
        // http 实际就是 线程 调用service
        Object userInfo = userServiceForkJoin.getUserInfo("chen");

        System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis));
        System.out.println(userInfo.toString());
    }

}
测试函数

倒计时实现

/**
 * 调用http接口
 */
@Service
public class UserServiceCountLatch {
    ExecutorService executorService = Executors.newCachedThreadPool();

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查询多个系统的数据,合并返回
     */
    public Object getUserInfo(String userId) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(2);
        ArrayList<JSONObject> values = new ArrayList<>();
        // 你可以封装成一个 提交URL 就能自动多线程调用的 工具
            executorService.submit(() -> {
                // 1. 先从调用获取用户基础信息的http接口
                long userinfoTime = System.currentTimeMillis();
                String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class);
                JSONObject userInfo = JSONObject.parseObject(value);
                System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime));
                values.add(userInfo);
                count.countDown();
            });
            executorService.submit(() -> {
                // 2. 再调用获取用户积分信息的接口
                long integralApiTime = System.currentTimeMillis();
                String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId,
                        String.class);
                JSONObject intergralInfo = JSONObject.parseObject(intergral);
                System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime));
                values.add(intergralInfo);
                count.countDown();
        });

        count.await();// 等待计数器归零

        // 3. 合并为一个json对象
        JSONObject result = new JSONObject();
        for (JSONObject value : values) {
            result.putAll(value);
        }
        return result;
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebsiteDemoApplication.class)
public class WebsiteDemoApplicationTests {

    @Before
    public void start() {
        System.out.println("开始测试");
    }

    @After
    public void end() {
        System.out.println("结束测试");
    }

    @Test
    public void ex() {
        int x = 0, y = 100;
        System.out.println("结果" + (x + y));
        int i = 1 / 0;
    }

    @Autowired
    UserService userService;

    //forkjoin实现
    @Autowired
    UserServiceForkJoin userServiceForkJoin;
    //future实现
    @Autowired
    UserServiceFutureTask userServiceFutureTask;
    //倒计时实现
    @Autowired
    UserServiceCountLatch UserServiceCountLatch;

    @Test
    public void testUserSerivce() throws Exception {
        // 调用
        long currentTimeMillis = System.currentTimeMillis();
        // http 实际就是 线程 调用service
        Object userInfo = UserServiceCountLatch.getUserInfo("chen");

        System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis));
        System.out.println(userInfo.toString());
    }

}
测试函数

FutureTask实现

Future表示异步计算的结果,提供了用于检查计算是否完成。等待计算完成以及获取结果的方法

import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

// 我们想一想,这个功能怎么实现
// (jdk本质,就是利用一些底层API,为开发人员提供便利)
public class NeteaseFutureTask<T> implements Runnable, Future { // 获取 线程异步执行结果 的方式
    Callable<T> callable; //  业务逻辑在callable里面
    T result = null;
    volatile String state = "NEW";  // task执行状态
    LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();// 定义一个存储等待者的集合

    public NeteaseFutureTask(Callable<T> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        try {
            result = callable.call();
        } catch (Exception e) {
            e.printStackTrace();
            // result = exception
        } finally {
            state = "END";
        }

        // 唤醒等待者
        Thread waiter = waiters.poll();
        while (waiter != null) {
            LockSupport.unpark(waiter);

            waiter = waiters.poll(); // 继续取出队列中的等待者
        }
    }

    // 返回结果,
    @Override
    public T get() {
        if ("END".equals(state)) {
            return result;
        }

        waiters.offer(Thread.currentThread()); // 加入到等待队列,线程不继续往下执行

        while (!"END".equals(state)) {
            LockSupport.park(); // 线程通信的知识点
        }
        // 如果没有结束,那么调用get方法的线程,就应该进入等待
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}
Future task
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.*;

@Service
public class UserServiceFutureTask {
    ExecutorService executorService = Executors.newCachedThreadPool();
    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查询多个系统的数据,合并返回
     */
    public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
        // 其他例子, 查数据库的多个表数据,分多次查询

        // 原味爱好
        // Future < >  Callable
        // 1 和runnable一样的业务定义.  但是本质上是有区别的:  返回值 异常 call run.
        Callable<JSONObject> callable = new Callable<JSONObject>() {
            @Override
            public JSONObject call() throws Exception {
                // 1. 先从调用获取用户基础信息的http接口
                long userinfoTime = System.currentTimeMillis();
                String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class);
                JSONObject userInfo = JSONObject.parseObject(value);
                System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime));
                return userInfo;
            }
        };

        // 通过多线程运行callable
        executorService.submit(callable);
        NeteaseFutureTask<JSONObject> userInfoFutureTask = new NeteaseFutureTask<>(callable);
        new Thread(userInfoFutureTask).start();

        NeteaseFutureTask<JSONObject> intergralInfoTask = new NeteaseFutureTask(() -> {
            // 2. 再调用获取用户积分信息的接口
            long integralApiTime = System.currentTimeMillis();
            String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId,
                    String.class);
            JSONObject intergralInfo = JSONObject.parseObject(intergral);
            System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime));
            return intergralInfo;
        });
        new Thread(intergralInfoTask).start();

        // 3. 合并为一个json对象
        JSONObject result = new JSONObject();
        result.putAll(userInfoFutureTask.get()); // 会等待任务执行结束
        result.putAll(intergralInfoTask.get());

        return result;
    }

}
import com.study.thread.future.service.UserServiceCountLatch;
import com.study.thread.future.service.UserServiceForkJoin;
import com.study.thread.future.service.UserServiceFutureTask;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.study.thread.future.service.UserService;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebsiteDemoApplication.class)
public class WebsiteDemoApplicationTests {

    @Before
    public void start() {
        System.out.println("开始测试");
    }

    @After
    public void end() {
        System.out.println("结束测试");
    }

    @Test
    public void ex() {
        int x = 0, y = 100;
        System.out.println("结果" + (x + y));
        int i = 1 / 0;
    }

    @Autowired
    UserService userService;

    //forkjoin实现
    @Autowired
    UserServiceForkJoin userServiceForkJoin;
    //future实现
    @Autowired
    UserServiceFutureTask userServiceFutureTask;
    //倒计时实现
    @Autowired
    UserServiceCountLatch UserServiceCountLatch;

    @Test
    public void testUserSerivce() throws Exception {
        // 调用
        long currentTimeMillis = System.currentTimeMillis();
        // http 实际就是 线程 调用service
        Object userInfo = userServiceFutureTask.getUserInfo("chen");

        System.out.println("getUserInfo总执行时间为" + (System.currentTimeMillis() - currentTimeMillis));
        System.out.println(userInfo.toString());
    }

}
测试代码

 

  

以上部分内容和图片来自网易云课堂


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM