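CompletionService lets you keep submitting new tasks asynchronously while you process the results of tasks that have already completed, so that executing tasks and handling their results are decoupled. Today we will learn how to use CompletionService by working through some examples.
Basic usage of CompletionService
Use submit() to execute tasks and take() to retrieve completed tasks, then process their results in the order in which the tasks finished.
1. The submit method of CompletionService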
    import java.util.concurrent.*;

    public class CompletionServiceTest {
        public static void main(String[] args) throws Exception {
            ExecutorService service = Executors.newFixedThreadPool(5);
            CompletionService<String> completionService = new ExecutorCompletionService<String>(service);
            // Submit five tasks that sleep for 0, 1, 2, 3 and 4 seconds.
            for (int i = 0; i < 5; i++) {
                completionService.submit(new ReturnAfterSleepCallable(i));
            }
            System.out.println("after submit");
            for (int i = 0; i < 5; i++) {
                // take() blocks until the next task has completed.
                System.out.println("result: " + completionService.take().get());
            }
            System.out.println("after get");
            service.shutdown();
        }

        private static class ReturnAfterSleepCallable implements Callable<String> {
            int sleep;

            public ReturnAfterSleepCallable(int sleep) {
                this.sleep = sleep;
            }

            @Override
            public String call() throws Exception {
                TimeUnit.SECONDS.sleep(sleep);
                return System.currentTimeMillis() + ",sleep=" + String.valueOf(sleep);
            }
        }
    }
The output is as follows:

    after submit
    result: 1501052486631,sleep=0
    result: 1501052487632,sleep=1
    result: 1501052488632,sleep=2
    result: 1501052489632,sleep=3
    result: 1501052490633,sleep=4
    after get

The official documentation says:
Submits a value-returning task for execution and returns a Future representing the pending results of the task. Upon completion, this task may be taken or polled.
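As a small illustration of the "returns a Future" part of that description, here is a minimal sketch of the two submit overloads declared on CompletionService: submit(Callable) and submit(Runnable, result). The class name SubmitOverloadsDemo and the method runBoth are made up for this example and do not come from the original post.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original article.
public class SubmitOverloadsDemo {

    // Submits one Callable and one Runnable and returns both results.
    public static String[] runBoth() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> completionService =
                    new ExecutorCompletionService<>(executor);

            // submit(Callable): the Future carries the value returned by call().
            Callable<String> valueTask = () -> "from callable";
            Future<String> fromCallable = completionService.submit(valueTask);

            // submit(Runnable, result): the Future carries the fixed result passed in.
            Runnable sideEffectTask = () -> { };
            Future<String> fromRunnable =
                    completionService.submit(sideEffectTask, "from runnable");

            // get() blocks until the corresponding task has finished.
            return new String[] { fromCallable.get(), fromRunnable.get() };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] results = runBoth();
        System.out.println(results[0]);
        System.out.println(results[1]);
    }
}
```

In this sketch the results are read through the Futures returned by submit. The same completed tasks also remain available through take() or poll(), which is usually the more convenient route when many tasks are in flight.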
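2. Using the take method of CompletionService
take() returns the Future of whichever task completes first, blocking if no task has completed yet. The task with the shortest running time is returned first.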
    package com.linux.thread;

    import java.util.Random;
    import java.util.concurrent.*;

    public class RunMain1 {
        public static void main(String[] args) {
            try {
                ExecutorService executorService = Executors.newCachedThreadPool();
                ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
                for (int i = 0; i < 10; i++) {
                    completionService.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            int sleepValue = new Random().nextInt(5);
                            System.out.println("sleep = " + sleepValue + ", name: " + Thread.currentThread().getName());
                            TimeUnit.SECONDS.sleep(sleepValue);
                            return "huhx: " + sleepValue + ", " + Thread.currentThread().getName();
                        }
                    });
                }
                for (int i = 0; i < 10; i++) {
                    System.out.println(completionService.take().get());
                }
                executorService.shutdown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
The output of one run:

    sleep = 4, name: pool-1-thread-1
    sleep = 0, name: pool-1-thread-2
    sleep = 0, name: pool-1-thread-3
    huhx: 0, pool-1-thread-3
    huhx: 0, pool-1-thread-2
    sleep = 1, name: pool-1-thread-4
    sleep = 4, name: pool-1-thread-5
    sleep = 3, name: pool-1-thread-6
    sleep = 4, name: pool-1-thread-7
    sleep = 0, name: pool-1-thread-8
    huhx: 0, pool-1-thread-8
    sleep = 4, name: pool-1-thread-9
    sleep = 3, name: pool-1-thread-10
    huhx: 1, pool-1-thread-4
    huhx: 3, pool-1-thread-6
    huhx: 3, pool-1-thread-10
    huhx: 4, pool-1-thread-1
    huhx: 4, pool-1-thread-5
    huhx: 4, pool-1-thread-7
    huhx: 4, pool-1-thread-9

The official documentation says:
Retrieves and removes the Future representing the next completed task, waiting if none are yet present.
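To make the "completion order, not submission order" behaviour easier to see without random sleep times, here is a minimal sketch that submits the slowest task first and the fastest task last. The class name CompletionOrderDemo, the method completionOrder, the labels, and the delay values are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
public class CompletionOrderDemo {

    // Submits tasks slowest-first and returns their labels in the order take() yields them.
    public static List<String> completionOrder() throws Exception {
        long[] delaysMillis = {600, 300, 0};          // submission order: slow, medium, fast
        String[] labels = {"slow", "medium", "fast"};

        // One thread per task so that all tasks start at roughly the same time.
        ExecutorService executor = Executors.newFixedThreadPool(delaysMillis.length);
        try {
            CompletionService<String> completionService =
                    new ExecutorCompletionService<>(executor);
            for (int i = 0; i < delaysMillis.length; i++) {
                final long delay = delaysMillis[i];
                final String label = labels[i];
                completionService.submit(() -> {
                    TimeUnit.MILLISECONDS.sleep(delay);
                    return label;
                });
            }

            List<String> order = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() blocks until some task has completed, whichever it is.
                order.add(completionService.take().get());
            }
            return order;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(completionOrder());
    }
}
```

With gaps of 300 ms between the tasks, the labels should come back as fast, medium, slow, the reverse of the order in which they were submitted. Calling get() on the Futures in submission order would instead wait for the slowest task before reporting any result.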
3. Using the poll method of CompletionService
poll() retrieves and removes the Future representing the next completed task, or returns null if no such task exists. poll() does not block.
    package com.linux.thread;

    import java.util.concurrent.*;

    public class RunMain2 {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            CompletionService<String> service = new ExecutorCompletionService<String>(executorService);
            service.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println("3 seconds pass.");
                    return "3 seconds";
                }
            });
            // poll() returns immediately; the task is still sleeping, so this prints null.
            System.out.println(service.poll());
            executorService.shutdown();
        }
    }
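The output is as follows (poll() returns null because the task has not finished when it is called):

    null
    3 seconds pass.

The official documentation says: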
Retrieves and removes the Future representing the next completed task or null if none are present.
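CompletionService also declares a timed variant, poll(long timeout, TimeUnit unit), which waits up to the given time for a completed task before returning null. The following is a minimal sketch contrasting the two. The class name TimedPollDemo, the method pollWithTimeout, and the 500 ms / 5 s values are assumptions made for this example.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original article.
public class TimedPollDemo {

    // Polls once without waiting and once with a timeout, and describes both outcomes.
    public static String pollWithTimeout() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletionService<String> completionService =
                    new ExecutorCompletionService<>(executor);
            completionService.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(500);
                return "done";
            });

            // Non-blocking poll: the task is still sleeping, so nothing is queued yet.
            Future<String> immediate = completionService.poll();

            // Timed poll: waits up to 5 seconds for a completed task to become available.
            Future<String> timed = completionService.poll(5, TimeUnit.SECONDS);

            String first = (immediate == null) ? "null" : immediate.get();
            String second = (timed == null) ? "null" : timed.get();
            return first + " then " + second;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pollWithTimeout());
    }
}
```

The timed form is a middle ground between poll() and take(): the caller can wait for a result without risking an unbounded block, and still has to handle a null return when the timeout elapses.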