Java 並行之 parallelStream 與 CompletableFuture 比較


1.測試代碼

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;


public class CompletableFutureTest {
    static final UserFeatureable[] UserFeatures = {new GetCareerUserFeature(), new GetTradeUserFeature(), new
            GetVipLevelUserFeature()};

    public static void main(String[] args) {
        int id = 10001;
        List<UserFeatureable> userFeatures = Arrays.asList(UserFeatures);
        long startTime = System.currentTimeMillis();
        String result = userFeatures.parallelStream().map(p -> p.getKey() + ":" + p.getValue(id)).collect(joining(","));
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("parallelStream消耗時間:%d,返回結果:%s", (endTime - startTime), result));

        startTime = System.currentTimeMillis();
        List<Future<String>> futureList = userFeatures.stream().map(
                p -> CompletableFuture.supplyAsync(() -> p.getKey() + ":" + p.getValue(id)))
                .collect(toList());
        result = futureList.stream().map(p->getVal(p,"")).collect(joining(","));
        endTime = System.currentTimeMillis();
        System.out.println(String.format("CompletableFuture的默認的ForkJoin線程池消耗時間:%d,返回結果:%s", (endTime - startTime),
                result));

        //當userFeature越多,使用自定義線程池更有利
        startTime = System.currentTimeMillis();
        futureList = userFeatures.stream().map(
                p -> CompletableFuture.supplyAsync(() -> p.getKey() + ":" + p.getValue(id),CustomThreadPool.INSTANCE))
                .collect(toList());
        result = futureList.stream().map(p->getVal(p,"")).collect(joining(","));
        endTime = System.currentTimeMillis();
        System.out.println(String.format("CompletableFuture的自定義線程池消耗時間:%d,返回結果:%s", (endTime - startTime), result));
    }

    private static <T>T getVal(Future<T> future,T defaultV){
        try {
            return future.get(2,TimeUnit.SECONDS);
        }catch (Exception ex){
            return defaultV;
        }
    }
}

/**
 * A single named attribute that can be looked up for a user.
 */
interface UserFeatureable {

    /**
     * @return the name under which this feature is reported
     */
    String getKey();

    /**
     * @param id identifier of the user to look up
     * @return this feature's value for the given user
     */
    String getValue(int id);
}

/**
 * Demo feature reported under the key {@code "career"}; the lookup is simulated
 * with a one-second sleep and always yields {@code "10"}.
 */
class GetCareerUserFeature implements UserFeatureable {

    @Override
    public String getKey() {
        return "career";
    }

    /**
     * Simulates a slow lookup by sleeping for one second.
     *
     * @param id user identifier (not used by this demo implementation)
     * @return always {@code "10"}
     */
    @Override
    public String getValue(int id) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the executing thread's owner can still
            // observe the interruption; the fixed value is returned regardless.
            Thread.currentThread().interrupt();
        }
        return "10";
    }
}

/**
 * Demo feature reported under the key {@code "trade"}; the lookup is simulated
 * with a one-second sleep and always yields {@code "5"}.
 */
class GetTradeUserFeature implements UserFeatureable {

    @Override
    public String getKey() {
        return "trade";
    }

    /**
     * Simulates a slow lookup by sleeping for one second.
     *
     * @param id user identifier (not used by this demo implementation)
     * @return always {@code "5"}
     */
    @Override
    public String getValue(int id) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the executing thread's owner can still
            // observe the interruption; the fixed value is returned regardless.
            Thread.currentThread().interrupt();
        }
        return "5";
    }
}

/**
 * Demo feature reported under the key {@code "vip"}; the lookup is simulated
 * with a one-second sleep and always yields {@code "v1"}.
 */
class GetVipLevelUserFeature implements UserFeatureable {

    @Override
    public String getKey() {
        return "vip";
    }

    /**
     * Simulates a slow lookup by sleeping for one second.
     *
     * @param id user identifier (not used by this demo implementation)
     * @return always {@code "v1"}
     */
    @Override
    public String getValue(int id) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so the executing thread's owner can still
            // observe the interruption; the fixed value is returned regardless.
            Thread.currentThread().interrupt();
        }
        return "v1";
    }
}

 

2.自定義線程池配置

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * Holder for a single shared, bounded {@link ThreadPoolExecutor} exposed as
 * {@link #INSTANCE}.
 *
 * <p>No rejection handler is supplied, so the executor's default rejection policy
 * applies once both the queue and the maximum number of threads are exhausted.
 * The pool is never shut down here; callers that need the JVM to exit must call
 * {@code INSTANCE.shutdown()} themselves.
 */
public class CustomThreadPool {
    /**
     * Default core pool size: one thread per available processor.
     */
    private static final int DEFAULT_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors();

    /**
     * Maximum pool size.
     * Rule of thumb: optimal thread count = (thread wait time / thread CPU time + 1) * CPU count.
     * With an assumed 1s wait and 0.1s CPU time: (1s/0.1s + 1) * CPU count = 11 * CPU count.
     */
    private static final int DEFAULT_MAXIMUM_POOL_SIZE = 11 * DEFAULT_CORE_POOL_SIZE;

    /**
     * How long, in seconds, an idle thread beyond the core size is kept before it is reclaimed.
     */
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 10;

    /**
     * Bounded queue of tasks waiting to be executed (capacity: twice the maximum pool size).
     * A ThreadPoolExecutor only starts threads beyond the core size once this queue is full.
     */
    private static final BlockingQueue<Runnable> WORK_QUEUE =
            new LinkedBlockingDeque<>(DEFAULT_MAXIMUM_POOL_SIZE * 2);

    /**
     * The shared executor; worker threads are named {@code task-pool-thread-N}.
     */
    public static final ThreadPoolExecutor INSTANCE = new ThreadPoolExecutor(
            DEFAULT_CORE_POOL_SIZE,
            DEFAULT_MAXIMUM_POOL_SIZE,
            DEFAULT_KEEP_ALIVE_SECONDS,
            TimeUnit.SECONDS,
            WORK_QUEUE,
            new ThreadFactoryBuilder().setNameFormat("task-pool-thread-%d").build());

}

 

3.結果

parallelStream消耗時間:2889,返回結果:career:10,trade:5,vip:v1
CompletableFuture的默認的ForkJoin線程池消耗時間:1010,返回結果:career:10,trade:5,vip:v1
CompletableFuture的自定義線程池消耗時間:1011,返回結果:career:10,trade:5,vip:v1

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM