Java8並行流使用注意事項


對於從事Java開發的童鞋來說,相信對於Java8的並行流並不陌生,沒錯,我們常常用它來執行並行任務,但是由於並行流(parallel stream)采用的是享線程池,可能會對我們的性能造成嚴重影響,那怎么處理呢?

問題

首先我們來看看具體的問題。在開發中,我們常常通過以下方法,實現並行流執行並行任務:

myList.parallelStream.map(obj -> longRunningOperation())

但是這存在一個嚴重的問題:在 JVM 的后台,使用通用的 fork/join 池來完成上述功能,該池是所有並行流共享的。默認情況,fork/join 池會為每個處理器分配一個線程。假設你有一台16核的機器,這樣你就只能創建16個線程。對 CPU 密集型的任務來說,這樣是有意義的,因為你的機器確實只能執行16個線程。但是真實情況下,不是所有的任務都是 CPU 密集型的。例如:

myList.parallelStream 

  .map(this::retrieveFromA)

  .map(this::processUsingB)

  .forEach(this::saveToC)

 

myList.parallelStream 

  .map(this::retrieveFromD)

  .map(this::processUsingE)

  .forEach(this::saveToD)

這兩個流很大程度上是受限於IO操作,所以會等待其他系統。但這兩個流使用相同的(小)線程池,因此會相互等待而被阻塞,非常不友好。比如:

 

final List<Integer> firstRange = buildIntRange(); 

   firstRange.parallelStream().forEach((number) -> {

      try {

         // do something slow

         Thread.sleep(5);

      } catch (InterruptedException e) { }

});

 

在執行期間,我獲取了一份線程dump的文件。這是相關的線程:

 

ForkJoinPool.commonPool-worker-1 

ForkJoinPool.commonPool-worker-2 

ForkJoinPool.commonPool-worker-3 

ForkJoinPool.commonPool-worker-4

現在,我要並行的執行這兩個並行流:

Runnable firstTask = () -> { 

  firstRange.parallelStream().forEach((number) -> {

    try {

      // do something slow

      Thread.sleep(5);

    } catch (InterruptedException e) { }

  });

};

Runnable secondTask = () -> { 

  secondRange.parallelStream().forEach((number) -> {

    try {

      // do something slow

      Thread.sleep(5);

    } catch (InterruptedException e) { }

  });

};

// run threads

 

這次我們再看一下線程dump文件:

 

ForkJoinPool.commonPool-worker-1 

ForkJoinPool.commonPool-worker-2 

ForkJoinPool.commonPool-worker-3 

ForkJoinPool.commonPool-worker-4

正如你所見,結果是一樣的。我們只使用了4個線程。

 

解決辦法

對於上面的問題,我們可以在JVM 后台使用 fork/join 池,在 ForkJoinTask 的文檔中,我們可以看到:

如果合適,安排一個異步執行的任務到當前正在運行的池中。如果任務不在inForkJoinPool()中,也可以調用ForkJoinPool.commonPool()獲取新的池來執行,比如:

 

ForkJoinPool forkJoinPool = new ForkJoinPool(3); 

forkJoinPool.submit(() -> { 

  firstRange.parallelStream().forEach((number) -> {

    try {

      Thread.sleep(5);

    } catch (InterruptedException e) { }

  });

});

ForkJoinPool forkJoinPool2 = new ForkJoinPool(3); 

forkJoinPool2.submit(() -> { 

  secondRange.parallelStream().forEach((number) -> {

    try {

      Thread.sleep(5);

    } catch (InterruptedException e) {

    }

  });

});

 

現在,我們再次查看線程池:

 

ForkJoinPool-1-worker-1 

ForkJoinPool-1-worker-2 

ForkJoinPool-1-worker-3 

ForkJoinPool-1-worker-4 

ForkJoinPool-2-worker-1 

ForkJoinPool-2-worker-2 

ForkJoinPool-2-worker-3 

ForkJoinPool-1-worker-4

上面這種方法為什么又正確顯示了呢?因為我們創建自己的線程池,所以可以避免共享線程池,如果有需要,甚至可以分配比處理機數量更多的線程。

ForkJoinPool forkJoinPool = new ForkJoinPool(<numThreads>);

 

以上就是Java8並行流在使用中所存在的一些問題及解決辦法,部分內容參考自一個Java教學網站,希望對Java初學者有所幫助。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM