拆分任務,多線程處理,等待返回總和結果(fork/join、CountDownLatch、FutureTask)


fork/join使用

ForkJoinPool是ExecultorService接口的實現,它專為可以遞歸分解成小塊的工作而設計

fork/join框架將任務分配給線程池中的工作線程,充分利用多處理器的優勢,提高程序性能。

使用fork/join框架的第一步是編寫一部分工作的代碼。類似的偽代碼如下:

如果(當前工作部分足夠小)

  直接做這項工作

其他

  把當前工作分成兩部分

調用這兩部分並等待結果

將此代碼包裝在ForkJoinTask子類中,通常RecursiveTask(可以返回結果)或  RecursiveAction

關鍵點:分解任務fork出新任務,匯集join任務執行結果

 

@Service
public class UserServiceForkJoin {
    // 本質是一個線程池,默認的線程數量:CPU的核數
    ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null, true);
    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查詢多個系統的數據,合並返回
     */
    public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
        // 其他例子, 查數據庫的多個表數據,分多次查詢
        // fork/join
        // forkJoinPool.submit()
        ArrayList<String> urls = new ArrayList<>();
        urls.add("http://www.tony.com/userinfo-api/get?userId=" + userId);
        urls.add("http://www.tony.com/integral-api/get?userId=" + userId);

        HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1);
        ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest);

        JSONObject result = forkJoinTask.get();
        return result;
    }
}

// 任務
class HttpJsonRequest extends RecursiveTask<JSONObject> {

    RestTemplate restTemplate;
    ArrayList<String> urls;
    int start;
    int end;

    HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) {
        this.restTemplate = restTemplate;
        this.urls = urls;
        this.start = start;
        this.end = end;
    }

    // 就是實際去執行的一個方法入口(任務拆分)
    @Override
    protected JSONObject compute() {
        int count = end - start; // 代表當前這個task需要處理多少數據
        // 自行根據業務場景去判斷是否是大任務,是否需要拆分
        if (count == 0) {
            String url = urls.get(start);
            // TODO 如果只有一個接口調用,立刻調用
            long userinfoTime = System.currentTimeMillis();
            String response = restTemplate.getForObject(url, String.class);
            JSONObject value = JSONObject.parseObject(response);
            System.out.println(Thread.currentThread() + " 接口調用完畢" + (System.currentTimeMillis() - userinfoTime) + " #" + url);
            return value;
        } else { // 如果是多個接口調用,拆分成子任務  7,8,   9,10
            System.out.println(Thread.currentThread() + "任務拆分一次");
            int x = (start + end) / 2;
            HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 負責處理哪一部分?
            httpJsonRequest.fork();

            HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 負責處理哪一部分?
            httpJsonRequest1.fork();

            // join獲取處理結果
            JSONObject result = new JSONObject();
            result.putAll(httpJsonRequest.join());
            result.putAll(httpJsonRequest1.join());
            return result;
        }
    }
}
示例代碼
@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebsiteDemoApplication.class)
public class WebsiteDemoApplicationTests {

    @Before
    public void start() {
        System.out.println("開始測試");
    }

    @After
    public void end() {
        System.out.println("結束測試");
    }

    @Test
    public void ex() {
        int x = 0, y = 100;
        System.out.println("結果" + (x + y));
        int i = 1 / 0;
    }

    @Autowired
    UserService userService;

    //forkjoin實現
    @Autowired
    UserServiceForkJoin userServiceForkJoin;
    //future實現
    @Autowired
    UserServiceFutureTask userServiceFutureTask;
    //倒計時實現
    @Autowired
    UserServiceCountLatch UserServiceCountLatch;

    @Test
    public void testUserSerivce() throws Exception {
        // 調用
        long currentTimeMillis = System.currentTimeMillis();
        // http 實際就是 線程 調用service
        Object userInfo = userServiceForkJoin.getUserInfo("chen");

        System.out.println("getUserInfo總執行時間為" + (System.currentTimeMillis() - currentTimeMillis));
        System.out.println(userInfo.toString());
    }

}
測試函數

倒計時實現

/**
 * 調用http接口
 */
@Service
public class UserServiceCountLatch {
    ExecutorService executorService = Executors.newCachedThreadPool();

    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查詢多個系統的數據,合並返回
     */
    public Object getUserInfo(String userId) throws InterruptedException {
        CountDownLatch count = new CountDownLatch(2);
        ArrayList<JSONObject> values = new ArrayList<>();
        // 你可以封裝成一個 提交URL 就能自動多線程調用的 工具
            executorService.submit(() -> {
                // 1. 先從調用獲取用戶基礎信息的http接口
                long userinfoTime = System.currentTimeMillis();
                String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class);
                JSONObject userInfo = JSONObject.parseObject(value);
                System.out.println("userinfo-api用戶基本信息接口調用時間為" + (System.currentTimeMillis() - userinfoTime));
                values.add(userInfo);
                count.countDown();
            });
            executorService.submit(() -> {
                // 2. 再調用獲取用戶積分信息的接口
                long integralApiTime = System.currentTimeMillis();
                String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId,
                        String.class);
                JSONObject intergralInfo = JSONObject.parseObject(intergral);
                System.out.println("integral-api積分接口調用時間為" + (System.currentTimeMillis() - integralApiTime));
                values.add(intergralInfo);
                count.countDown();
        });

        count.await();// 等待計數器歸零

        // 3. 合並為一個json對象
        JSONObject result = new JSONObject();
        for (JSONObject value : values) {
            result.putAll(value);
        }
        return result;
    }
}
@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebsiteDemoApplication.class)
public class WebsiteDemoApplicationTests {

    @Before
    public void start() {
        System.out.println("開始測試");
    }

    @After
    public void end() {
        System.out.println("結束測試");
    }

    @Test
    public void ex() {
        int x = 0, y = 100;
        System.out.println("結果" + (x + y));
        int i = 1 / 0;
    }

    @Autowired
    UserService userService;

    //forkjoin實現
    @Autowired
    UserServiceForkJoin userServiceForkJoin;
    //future實現
    @Autowired
    UserServiceFutureTask userServiceFutureTask;
    //倒計時實現
    @Autowired
    UserServiceCountLatch UserServiceCountLatch;

    @Test
    public void testUserSerivce() throws Exception {
        // 調用
        long currentTimeMillis = System.currentTimeMillis();
        // http 實際就是 線程 調用service
        Object userInfo = UserServiceCountLatch.getUserInfo("chen");

        System.out.println("getUserInfo總執行時間為" + (System.currentTimeMillis() - currentTimeMillis));
        System.out.println(userInfo.toString());
    }

}
測試函數

FutureTask實現

Future表示異步計算的結果,提供了用於檢查計算是否完成。等待計算完成以及獲取結果的方法

import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

// 我們想一想,這個功能怎么實現
// (jdk本質,就是利用一些底層API,為開發人員提供便利)
public class NeteaseFutureTask<T> implements Runnable, Future { // 獲取 線程異步執行結果 的方式
    Callable<T> callable; //  業務邏輯在callable里面
    T result = null;
    volatile String state = "NEW";  // task執行狀態
    LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();// 定義一個存儲等待者的集合

    public NeteaseFutureTask(Callable<T> callable) {
        this.callable = callable;
    }

    @Override
    public void run() {
        try {
            result = callable.call();
        } catch (Exception e) {
            e.printStackTrace();
            // result = exception
        } finally {
            state = "END";
        }

        // 喚醒等待者
        Thread waiter = waiters.poll();
        while (waiter != null) {
            LockSupport.unpark(waiter);

            waiter = waiters.poll(); // 繼續取出隊列中的等待者
        }
    }

    // 返回結果,
    @Override
    public T get() {
        if ("END".equals(state)) {
            return result;
        }

        waiters.offer(Thread.currentThread()); // 加入到等待隊列,線程不繼續往下執行

        while (!"END".equals(state)) {
            LockSupport.park(); // 線程通信的知識點
        }
        // 如果沒有結束,那么調用get方法的線程,就應該進入等待
        return result;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }

    @Override
    public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return null;
    }
}
Future task
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.*;

@Service
public class UserServiceFutureTask {
    ExecutorService executorService = Executors.newCachedThreadPool();
    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查詢多個系統的數據,合並返回
     */
    public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
        // 其他例子, 查數據庫的多個表數據,分多次查詢

        // 原味愛好
        // Future < >  Callable
        // 1 和runnable一樣的業務定義.  但是本質上是有區別的:  返回值 異常 call run.
        Callable<JSONObject> callable = new Callable<JSONObject>() {
            @Override
            public JSONObject call() throws Exception {
                // 1. 先從調用獲取用戶基礎信息的http接口
                long userinfoTime = System.currentTimeMillis();
                String value = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId, String.class);
                JSONObject userInfo = JSONObject.parseObject(value);
                System.out.println("userinfo-api用戶基本信息接口調用時間為" + (System.currentTimeMillis() - userinfoTime));
                return userInfo;
            }
        };

        // 通過多線程運行callable
        executorService.submit(callable);
        NeteaseFutureTask<JSONObject> userInfoFutureTask = new NeteaseFutureTask<>(callable);
        new Thread(userInfoFutureTask).start();

        NeteaseFutureTask<JSONObject> intergralInfoTask = new NeteaseFutureTask(() -> {
            // 2. 再調用獲取用戶積分信息的接口
            long integralApiTime = System.currentTimeMillis();
            String intergral = restTemplate.getForObject("http://localhost:8080/Test002/TestServlet/get?userId=" + userId,
                    String.class);
            JSONObject intergralInfo = JSONObject.parseObject(intergral);
            System.out.println("integral-api積分接口調用時間為" + (System.currentTimeMillis() - integralApiTime));
            return intergralInfo;
        });
        new Thread(intergralInfoTask).start();

        // 3. 合並為一個json對象
        JSONObject result = new JSONObject();
        result.putAll(userInfoFutureTask.get()); // 會等待任務執行結束
        result.putAll(intergralInfoTask.get());

        return result;
    }

}
import com.study.thread.future.service.UserServiceCountLatch;
import com.study.thread.future.service.UserServiceForkJoin;
import com.study.thread.future.service.UserServiceFutureTask;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.study.thread.future.service.UserService;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebsiteDemoApplication.class)
public class WebsiteDemoApplicationTests {

    @Before
    public void start() {
        System.out.println("開始測試");
    }

    @After
    public void end() {
        System.out.println("結束測試");
    }

    @Test
    public void ex() {
        int x = 0, y = 100;
        System.out.println("結果" + (x + y));
        int i = 1 / 0;
    }

    @Autowired
    UserService userService;

    //forkjoin實現
    @Autowired
    UserServiceForkJoin userServiceForkJoin;
    //future實現
    @Autowired
    UserServiceFutureTask userServiceFutureTask;
    //倒計時實現
    @Autowired
    UserServiceCountLatch UserServiceCountLatch;

    @Test
    public void testUserSerivce() throws Exception {
        // 調用
        long currentTimeMillis = System.currentTimeMillis();
        // http 實際就是 線程 調用service
        Object userInfo = userServiceFutureTask.getUserInfo("chen");

        System.out.println("getUserInfo總執行時間為" + (System.currentTimeMillis() - currentTimeMillis));
        System.out.println(userInfo.toString());
    }

}
測試代碼

 

  

以上部分內容和圖片來自網易雲課堂


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM