使用ForkJoinPool來多線程的拆分任務,執行任務,合並結果。


ForkJoinPool 是jdk1.7 由Doug Lea 寫的實現   遞歸調用任務拆分,合並,的線程池。

代碼示例:

package www.itbac.com;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import java.util.ArrayList;
import java.util.concurrent.*;

/**
 * 並行調用http接口
 */
@Service
public class UserServiceForkJoin {
    // 本質是一個線程池,默認的線程數量:CPU的核數
    ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
            null, true);
    @Autowired
    private RestTemplate restTemplate;

    /**
     * 查詢多個系統的數據,合並返回
     */
    public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
        // 其他例子, 查數據庫的多個表數據,分多次查詢
        // fork/join
        // forkJoinPool.submit()
        ArrayList<String> urls = new ArrayList<>();
        urls.add("http://www.itbac.com/userinfo-api/get?userId=" + userId);
        urls.add("http://www.itbac.com/integral-api/get?userId=" + userId);

        HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1);
        ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest);

        JSONObject result = forkJoinTask.get();
        return result;
    }
}

// 自定義任務類, 繼承遞歸任務。
class HttpJsonRequest extends RecursiveTask<JSONObject> {

    RestTemplate restTemplate;
    ArrayList<String> urls;
    int start;
    int end;

    HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) {
        this.restTemplate = restTemplate;
        this.urls = urls;
        this.start = start;
        this.end = end;
    }

    // 就是實際去執行的一個方法入口(任務拆分)
    @Override
    protected JSONObject compute() {
        int count = end - start; // 代表當前這個task需要處理多少數據
        // 自行根據業務場景去判斷是否是大任務,是否需要拆分
        if (count == 0) {
            String url = urls.get(start);
            // TODO 如果只有一個接口調用,立刻調用
            long userinfoTime = System.currentTimeMillis();
            String response = restTemplate.getForObject(url, String.class);
            JSONObject value = JSONObject.parseObject(response);
            System.out.println(Thread.currentThread() + " 接口調用完畢" + (System.currentTimeMillis() - userinfoTime) + " #" + url);
            return value;
        } else { // 如果是多個接口調用,拆分成子任務  7,8,   9,10
            System.out.println(Thread.currentThread() + "任務拆分一次");
            //求中間值。
            int x = (start + end) / 2;
            //任務從開始,到中間值。
            HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 負責處理哪一部分?
            //fork拆分任務。
            httpJsonRequest.fork();
            //任務從中間值+1 ,到結束。
            HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 負責處理哪一部分?
            httpJsonRequest1.fork();

            // join獲取處理結果
            JSONObject result = new JSONObject();
            
            //join合並結果。
            result.putAll(httpJsonRequest.join());
            result.putAll(httpJsonRequest1.join());
            
            return result;
        }
    }
}

就是把任務拆分,交給線程池執行,再合並。與Future的獲取返回值有點相似。只是對任務拆分做了抽象封裝。

 

特點:

線程池 ThreadPoolExecutor 中只維護了一個隊列。多線程去隊列中爭搶任務來執行。

 

而ForkJoinPool 是每一個大任務是維護一個隊列,fork拆分出的小任務也是在自己隊列中。一個線程去處理自己隊列中的任務,此時,沒有線程爭搶,效率比線程池要高。

該線程把當前自己的隊列處理完了,就去和其他線程爭搶其他隊列的任務來處理,這個術語叫工作竊取work-stealing .

 

ForkJoinPool 維護了多個隊列,ThreadPoolExecutor只維護了一個隊列,通過多個隊列來減少線程爭搶,從而提高了效率。

但是:


每個worker線程都維護一個任務隊列,ForkJoinWorkerThread中的任務隊列。當這個worker線程處理完自己隊列的任務,會隨機從其他的worker的隊列中拿走一個任務執行(工作竊取:work-stealing )。

如果所有worker線程都很忙,大家都沒有工作竊取,那就是單線程處理完整個任務隊列。對於請求方而言,本次任務拆分,並沒有提高響應的效率?

而且,如果任務拆分太細,遞歸調用太深,這個拆分,合並,的過程,也是消耗性能的。

 

結語:

 

ForkJoinPool的工作竊取帶來的性能提升偏理論,API的源碼復雜度較高,實際研發中可控性來說不如其他API ,謹慎使用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM