Fork/Join框架與Java8 Stream API 之並行流的速度比較


  Fork/Join 框架有特定的ExecutorService和線程池構成。ExecutorService可以運行任務,並且這個任務會被分解成較小的任務,它們從線程池中被fork(被不同的線程執行)出來,在join(即它的所有的子任務都完成了)之前會一直等待。

  Fork/Join 使用了任務竊取來最小化線程的征用和開銷。線程池中的每條工作線程都有自己的雙端工作隊列並且會將新任務放到這個隊列中去。它從隊列的頭部讀取任務。如果隊列是空的,工作線程就嘗試從另外一個隊列的末尾獲取一個任務。竊取操作不會很頻繁,因為工作線程會采用后進先出的順序將任務放入它們的隊列中,同時工作項的規模會隨着問題分割成子問題而變小。你一開始把任務交給一個中心的工作線程,之后它會繼續將這個任務分解成更小的任務。最終所有的工作線程都只會設計很少量的同步操作。

  Stream介紹(引)

  Stream 作為 Java 8 的一大亮點,它與 java.io 包里的 InputStream 和 OutputStream 是完全不同的概念。它也不同於 StAX 對 XML 解析的 Stream,也不是 Amazon Kinesis 對大數據實時處理的 Stream。Java 8 中的 Stream 是對集合(Collection)對象功能的增強,它專注於對集合對象進行各種非常便利、高效的聚合操作(aggregate operation),或者大批量數據操作 (bulk data operation)。Stream API 借助於同樣新出現的 Lambda 表達式,極大的提高編程效率和程序可讀性。同時它提供串行和並行兩種模式進行匯聚操作,並發模式能夠充分利用多核處理器的優勢,使用 fork/join 並行方式來拆分任務和加速處理過程。通常編寫並行代碼很難而且容易出錯, 但使用 Stream API 無需編寫一行多線程的代碼,就可以很方便地寫出高性能的並發程序。所以說,Java 8 中首次出現的 java.util.stream 是一個函數式語言+多核時代綜合影響的產物。

  Stream 不是集合元素,它不是數據結構並不保存數據,它是有關算法和計算的,它更像一個高級版本的 Iterator。原始版本的 Iterator,用戶只能顯式地一個一個遍歷元素並對其執行某些操作;高級版本的 Stream,用戶只要給出需要對其包含的元素執行什么操作,比如 “過濾掉長度大於 10 的字符串”、“獲取每個字符串的首字母”等,Stream 會隱式地在內部進行遍歷,做出相應的數據轉換。

  Stream 就如同一個迭代器(Iterator),單向,不可往復,數據只能遍歷一次,遍歷過一次后即用盡了,就好比流水從面前流過,一去不復返。

  而和迭代器又不同的是,Stream 可以並行化操作,迭代器只能命令式地、串行化操作。顧名思義,當使用串行方式去遍歷時,每個 item 讀完后再讀下一個 item。而使用並行去遍歷時,數據會被分成多個段,其中每一個都在不同的線程中處理,然后將結果一起輸出。Stream 的並行操作依賴於 Java7 中引入的 Fork/Join 框架(JSR166y)來拆分任務和加速處理過程。

 

  所以說,實際上Stream並行流實際上就是一個幫你fork/join 后的API,為了驗證效率,我編寫了一個對1000_000個數進行排序的程序

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class ParallelMergeSort {

    public static void main(String[] args) {
        final int SIZE = 10000000;
        int[] list1 = new int[SIZE];
        int[] list2 = new int[SIZE];
        Integer[] list3 = new Integer[SIZE];
        
        for (int i = 0; i < list1.length; i++) {
            list1[i] = list2[i] = (int)(Math.random() * 10000000);
            list3[i] = list1[i];
        }
        
        long startTime = System.currentTimeMillis();
        parallelMergeSort(list1);
        long endTime = System.currentTimeMillis();
        System.out.println("Parallel time with " + Runtime.getRuntime().availableProcessors() + " processors is " + (endTime - startTime) + " milliseconds");
        
        startTime = System.currentTimeMillis();
        MergeSort.mergeSort(list2);
        endTime = System.currentTimeMillis();
        System.out.println("Sequent time is " + (endTime - startTime) + " milliseconds");
        
        
        List<Integer> tmp = new ArrayList<Integer>();
        Collections.addAll(tmp, list3);
        startTime = System.currentTimeMillis();
        IntStream tmp1 = tmp.stream().parallel().mapToInt(Integer::intValue).sorted();
        endTime = System.currentTimeMillis();
        System.out.println("ParallelStream time is " + (endTime - startTime) + " milliseconds");
        
        tmp1.limit(100).forEachOrdered(System.out::println);
    
        /*
        for(int i = 0; i < 100; i++) {
            System.out.println(tmp2.get(i));
        }*/
    }


    public static void parallelMergeSort(int[] list) {
        RecursiveAction mainTask = new SortTask(list);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(mainTask);
    }
    
    public static class SortTask extends RecursiveAction{
        /**
         * 
         */
        private static final long serialVersionUID = 1L;
        private final int THRESHOLD = 500;
        private int[] list;
        
        SortTask(int[] list){
            this.list = list;
        }
        @Override
        protected void compute() {
            if (list.length < THRESHOLD)
                java.util.Arrays.sort(list);
            else {
                //Obtain the first half
                int[] firstHalf = new int[list.length / 2];
                System.arraycopy(list, 0, firstHalf, 0, list.length / 2);
                
                //Obtain the second half
                int secondHalfLength = list.length - list.length / 2;
                int[] secondHalf = new int[secondHalfLength];
                System.arraycopy(list, list.length /2, secondHalf, 0, secondHalfLength);
                
                //Recursively sort the two halves
                invokeAll(new SortTask(firstHalf), new SortTask(secondHalf));
                
                //Merge firstHalf with second
                MergeSort.merge(firstHalf, secondHalf, list);
            }
        }
    }
    
    public static class MergeSort {
          /** The method for sorting the numbers */
          public static void mergeSort(int[] list) {
            if (list.length > 1) {
              // Merge sort the first half
              int[] firstHalf = new int[list.length / 2];
              System.arraycopy(list, 0, firstHalf, 0, list.length / 2);
              mergeSort(firstHalf);

              // Merge sort the second half
              int secondHalfLength = list.length - list.length / 2;
              int[] secondHalf = new int[secondHalfLength];
              System.arraycopy(list, list.length / 2,
                secondHalf, 0, secondHalfLength);
              mergeSort(secondHalf);

              // Merge firstHalf with secondHalf into list
              merge(firstHalf, secondHalf, list);
            }
          }

          /** Merge two sorted lists */
          public static void merge(int[] list1, int[] list2, int[] temp) {
            int current1 = 0; // Current index in list1
            int current2 = 0; // Current index in list2
            int current3 = 0; // Current index in temp

            while (current1 < list1.length && current2 < list2.length) {
              if (list1[current1] < list2[current2])
                temp[current3++] = list1[current1++];
              else
                temp[current3++] = list2[current2++];
            }

            while (current1 < list1.length)
              temp[current3++] = list1[current1++];

            while (current2 < list2.length)
              temp[current3++] = list2[current2++];
          }
    }
}

代碼可以看到,利用三種方法,對隨機生成的 int 數據排序

第一種是自己編寫的fork/join利用二分法排序

第二種是單線程下的二分法排序

第三種是並行流的排序

為了驗證並行流是否排序正確,輸出流前100個數

結果如圖:

但是這是為沒有收集器的情況,並行流很快的完成並且得到IntStream,加上收集器后:

可以看出,排序很快完成,在最后的類型轉換上花費了大量的時間,

而根據Stream 的介紹,實驗fork/join方法完成的時間應該不會與並行流差距太大,實際上,實驗中編寫的代碼在fork分解階段和join階段花費了大量時間,遠不如直接使用API快速

但是如果正確使用fork/join框架的話也不會很慢

但是相比單線程已經遠遠提升了效率

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM