例子1:單線程
例子2:多線程,同步求和(如果沒有計算完成,會阻塞)
例子3:多線程,異步求和(先累加已經完成的計算結果)
例子1-代碼
package cn.fansunion.executorservice; public class BasicCaculator { public static long sum(int[] numbers){ long sum = 0; for(int i=0;i<numbers.length;i++){ sum += numbers[i]; } return sum; } }
例子2-代碼
ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,返回Future。如果Executor后台線程池還沒有完成Callable的計算,這調用返回Future對象的get()方法,會阻塞直到計算完成。
package cn.fansunion.executorservice; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; //並發計算數組的和,“同步”求和 public class ConcurrentCalculator { private ExecutorService exec; //這個地方,純粹是“一廂情願”,“並行執行”不受咱們控制,取決於操作系統的“態度” private int cpuCoreNumber; private List<Future<Long>> tasks = new ArrayList<Future<Long>>(); class SumCalculator implements Callable<Long> { private int[] numbers; private int start; private int end; public SumCalculator(final int[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } public Long call() throws Exception { Long sum = 0L; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } } public ConcurrentCalculator() { cpuCoreNumber = Runtime.getRuntime().availableProcessors(); exec = Executors.newFixedThreadPool(cpuCoreNumber); } public Long sum(final int[] numbers) { // 根據CPU核心個數拆分任務,創建FutureTask並提交到Executor for (int i = 0; i < cpuCoreNumber; i++) { int increment = numbers.length / cpuCoreNumber + 1; int start = increment * i; int end = increment * i + increment; if (end > numbers.length) end = numbers.length; SumCalculator subCalc = new SumCalculator(numbers, start, end); FutureTask<Long> task = new FutureTask<Long>(subCalc); tasks.add(task); if (!exec.isShutdown()) { exec.submit(task); } } return getResult(); } /** * 迭代每個只任務,獲得部分和,相加返回 */ public Long getResult() { Long result = 0l; for (Future<Long> task : tasks) { try { // 如果計算未完成則阻塞 Long subSum = task.get(); result += subSum; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return result; } public void close() { exec.shutdown(); } }
例子3-代碼
在剛在的例子中,getResult()方法的實現過程中,迭代了FutureTask的數組,如果任務還沒有完成則當前線程會阻塞。
如果我們希望任意字任務完成后就把其結果加到result中,而不用依次等待每個任務完成,可以使CompletionService。
生產者submit()執行的任務。使用者take()已完成的任務,並按照完成這些任務的順序處理它們的結果 。也就是調用CompletionService的take方法是,會返回按完成順序放回任務的結果。
CompletionService內部維護了一個阻塞隊列BlockingQueue,如果沒有任務完成,take()方法也會阻塞。
修改剛才的例子2,使用CompletionService:
package cn.fansunion.executorservice; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; //並發計算數組的和,“異步”求和 public class ConcurrentCalculatorAsync { private ExecutorService exec; private CompletionService<Long> completionService; //這個地方,純粹是“一廂情願”,“並行執行”不受咱們控制,取決於操作系統的“態度” private int cpuCoreNumber; class SumCalculator implements Callable<Long> { private int[] numbers; private int start; private int end; public SumCalculator(final int[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } public Long call() throws Exception { Long sum = 0l; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } } public ConcurrentCalculatorAsync() { cpuCoreNumber = Runtime.getRuntime().availableProcessors(); exec = Executors.newFixedThreadPool(cpuCoreNumber); completionService = new ExecutorCompletionService<Long>(exec); } public Long sum(final int[] numbers) { // 根據CPU核心個數拆分任務,創建FutureTask並提交到Executor for (int i = 0; i < cpuCoreNumber; i++) { int increment = numbers.length / cpuCoreNumber + 1; int start = increment * i; int end = increment * i + increment; if (end > numbers.length){ end = numbers.length; } SumCalculator subCalc = new SumCalculator(numbers, start, end); if (!exec.isShutdown()) { completionService.submit(subCalc); } } return getResult(); } /** * 迭代每個只任務,獲得部分和,相加返回 */ public Long getResult() { Long result = 0l; for (int i = 0; i < cpuCoreNumber; i++) { try { Long subSum = completionService.take().get(); result += subSum; System.out.println("subSum="+subSum+",result="+result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } return result; } public void close() { exec.shutdown(); } }
運行代碼
package cn.fansunion.executorservice; import java.math.BigDecimal; //數組求和3個Demo public class ArraySumDemo { public static void main(String[] args) { int n = 200000000; int[] numbers = new int[n]; for(int i=1;i<=n;i++){ numbers[i-1]=i; } basic(numbers); long time = System.currentTimeMillis(); concurrentCaculatorAsync(numbers); long endTime=System.currentTimeMillis(); System.out.println("多核並行計算,異步相加:"+time(time,endTime)); long time2 = System.currentTimeMillis(); concurrentCaculator(numbers); long endTime2=System.currentTimeMillis(); System.out.println("多核並行計算,同步相加:"+time(time2,endTime2)); } private static void basic(int[] numbers) { long time1 = System.currentTimeMillis(); long sum=BasicCaculator.sum(numbers); long endTime1 = System.currentTimeMillis(); System.out.println("單線程:"+time(time1,endTime1)); System.out.println("Sum:"+sum); } private static double time(long time, long endTime) { long costTime = endTime-time; BigDecimal bd = new BigDecimal(costTime); //本來想着,把毫秒轉換成秒的,最后發現計算太快了 BigDecimal unit = new BigDecimal(1L); BigDecimal s= bd.divide(unit,3); return s.doubleValue(); } //並行計算,“同步”求和 private static void concurrentCaculator(int[] numbers) { ConcurrentCalculator calc = new ConcurrentCalculator(); Long sum = calc.sum(numbers); System.out.println(sum); calc.close(); } //並行計算,“異步”求和 private static void concurrentCaculatorAsync(int[] numbers) { ConcurrentCalculatorAsync calc = new ConcurrentCalculatorAsync(); Long sum = calc.sum(numbers); System.out.println("Sum:"+sum); calc.close(); } }
控制台輸出
單線程:93.0
Sum:20000000100000000
subSum=3750000175000002,result=3750000175000002
subSum=1250000075000001,result=5000000250000003
subSum=6250000275000003,result=11250000525000006
subSum=8749999574999994,result=20000000100000000
Sum:20000000100000000
多核並行計算,異步相加:786.0
20000000100000000
多核並行計算,同步相加:650.0
個人看法:3段代碼的時間僅供參考,沒有排除干擾因素。
總的來說,單線程執行更快一些,應該是由於“數組求和”本身,並不需要其它額外資源,不會阻塞。
而多線程,反而增加了“線程調度”的時間開銷。
還可以看出,CPU計算還是非常快的。“200000000”2億個整數相加,用了不到0.1秒的時間。
插曲
最開始看代碼的時候,誤解了。以為“根據CPU核心個數拆分任務”,這個時候的“多線程”就是“並行”了。
實際上,不一定,除了要看CPU的核數,還要看操作系統的分配。
// 根據CPU核心個數拆分任務,創建FutureTask並提交到Executor
for (int i = 0; i < cpuCoreNumber; i++) {
}
最開始,我還在考慮“單線程”、“多核並行+多線程並發”、“單核+多線程並發”,等好幾種情況來實現“數組求和”。
最后,感覺自己還是想多了。“並行”應該不受自己控制,只能控制是“單線程”或者“多線程”。
“java並發編程-Executor框架”這篇文章中的“例子:並行計算數組的和。” 這句話,誤導了我,根本不能保證是“並行計算”。
友情提示:網絡上的文章,僅供參考學習,需要自己的判斷。
關於Java-多核-並行-多線程,我初步認為“多線程可以並行執行,但不受我們自己的控制,取決於操作系統”。
網友的一些看法:
看法1:
java線程可以在運行在多個cpu核上嗎?
我是一直都以為這個問題的答案是肯定的,也就是說可以運行在多核上。
但是有一天見到這樣的一個理論,我就頓時毀三觀了。
JVM在操作系統中是作為一個進程的,java所有的線程都運行自這個JVM進程中,
所以說java線程某個時間只可能運行在一個核上。
這個說法對我的打擊太大了,我不能接受。於是就開始多方求證。網上搜索 和朋友一起討論,
最終證實了java線程是可以運行在多核上的,為什么呢?
下面一句話將驚醒夢中人:
現代os都將線程作為最小調度單位,進程作為資源分配的最小單位。 在windows中進程是不活動的,
只是作為線程的容器。
也就是說,java中的所有線程確實在JVM進程中,但是CPU調度的是進程中的線程。
看法2:
JAVA中的多線程能在多CPU機器上並行執行嗎?注意,我說的不是並發執行哦 。
我們用java寫一個多線程程序,就啟動了一個JVM進程,所以這些線程都是在這一個JVM進程之中的,我不知道同一時刻,能不能有多個CPU運行同一進程,進而並行執行這同一進程中的不同線程?一直很疑惑
你的思路是對的,CPU就是為了迎合操作系統的多線程從而提高系統的計算效率.但是具體分配任務到各個內核中去執行的並非JAVA與JVM而是操作系統.
也就是說,你所執行的多線程,可能會被分配到同一個CPU內核中運行.也可能非配到不同的cpu中運行.如果可以控制CPU的分配,那也應該是操作系統的api才能實現的了。
我用JAVA創建了一個線程,這時候有主線程和子線程都在運行,那意思雙核CPU有可能在同一時刻點並行運行這兩個線程咯?
我翻了好多JAVA的有關多線程的章節,似乎都沒有說道多核CPU運行JAVA多線程,貌似都是已單核為例講解的,所以我一直覺得可能都是並發的而不是並行的?
不是,你要將你的軟件線程和計算機的CPU處理線程區分開呀.簡單說,你是無法控制CPU對於任務的分配的.
更多代碼示例:
http://git.oschina.net/fansunion/Concurrent(逐步更新中)
參考資料:
java並發編程-Executor框架
http://www.iteye.com/topic/366591
java線程可以在運行在多個cpu核上嗎?
http://blog.csdn.net/maosijunzi/article/details/42527553
JAVA中的多線程能在多CPU上並行執行嗎?注意,我說的不是並發執行哦
http://zhidao.baidu.com/link?url=e11sEOSNFoLTfVyP-5FfpktIXEgbMQkbLAzvgh8mn4V16n_qQas89voj5gVhOEkho0jRA7fp_vbnElxKgeQCDrOxGkcu6xAWaUniqpcWg33