首先介紹兩個重要的接口,Executor和ExecutorService,定義如下:
- public interface Executor {
- void execute(Runnable command);
- }
- public interface ExecutorService extends Executor {
- //不再接受新任務,待所有任務執行完畢后關閉ExecutorService
- void shutdown();
- //不再接受新任務,直接關閉ExecutorService,返回沒有執行的任務列表
- List<Runnable> shutdownNow();
- //判斷ExecutorService是否關閉
- boolean isShutdown();
- //判斷ExecutorService是否終止
- boolean isTerminated();
- //等待ExecutorService到達終止狀態
- boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
- <T> Future<T> submit(Callable<T> task);
- //當task執行成功的時候future.get()返回result
- <T> Future<T> submit(Runnable task, T result);
- //當task執行成功的時候future.get()返回null
- Future<?> submit(Runnable task);
- //批量提交任務並獲得他們的future,Task列表與Future列表一一對應
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException;
- //批量提交任務並獲得他們的future,並限定處理所有任務的時間
- <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit) throws InterruptedException;
- //批量提交任務並獲得一個已經成功執行的任務的結果
- <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
- <T> T invokeAny(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException;
- }
為了配合使用上面的並發編程接口,有一個Executors工廠類,負責創建各類滿足ExecutorService接口的線程池,具體如下:
newFixedThreadPool:創建一個固定長度的線程池,線程池中線程的數量從1增加到最大值后保持不變。如果某個線程壞死掉,將會補充一個新的線程。
newCachedThreadPool:創建長度不固定的線程池,線程池的規模不受限制,不常用。
newSingleThreadExecutor:創建一個單線程的Executor,他其中有一個線程來處理任務,如果這個線程壞死掉,將補充一個新線程。
newScheduledThreadPool:創建固定長度的線程池,以延時或定時的方式來執行任務。
下面是Executor和ExecutorService中常用方法的示例:
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.Iterator;
- import java.util.List;
- import java.util.concurrent.Callable;
- import java.util.concurrent.Executor;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- public class Demo{
- public static void main(String [] args){
- //--------Executor示例------------//
- Executor s=Executors.newSingleThreadExecutor();
- s.execute(new MyRunnableTask("1"));
- //--------ExecutorService示例------------//
- ExecutorService es=Executors.newFixedThreadPool(2);
- //--------get()示例------------//
- Future<String> future=es.submit(new MyCallableTask("10"));
- try{
- System.out.println(future.get());
- }catch(Exception e){}
- //--------get(timeout, timeunit)示例------------//
- future=es.submit(new MyCallableTask("11"));
- try{
- System.out.println(future.get(500,TimeUnit.MILLISECONDS));
- }catch(Exception e){
- System.out.println("cancle because timeout");
- }
- //--------invokeAll(tasks)示例------------//
- List<MyCallableTask> myCallableTasks=new ArrayList<MyCallableTask>();
- for(int i=0;i<6;i++){
- myCallableTasks.add(new MyCallableTask(i+""));
- }
- try {
- List<Future<String>> results = es.invokeAll(myCallableTasks);
- Iterator<Future<String>> iterator=results.iterator();
- while(iterator.hasNext()){
- future=iterator.next();
- System.out.println(future.get());
- }
- } catch (Exception e) {}
- //--------invokeAll(tasks,timeout,timeunit))示例------------//
- try {
- //限定執行時間為2100ms,每個任務需要1000ms,線程池的長度為2,因此最多只能處理4個任務。一共6個任務,有2個任務會被取消。
- List<Future<String>> results = es.invokeAll(myCallableTasks,2100,TimeUnit.MILLISECONDS);
- Iterator<Future<String>> iterator=results.iterator();
- while(iterator.hasNext()){
- future=iterator.next();
- if(!future.isCancelled())
- System.out.println(future.get());
- else
- System.out.println("cancle because timeout");
- }
- } catch (Exception e) {}
- es.shutdown();
- }
- }
- class MyRunnableTask implements Runnable{
- private String name;
- public MyRunnableTask(String name) {
- this.name=name;
- }
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("runnable task--"+name);
- }
- }
- class MyCallableTask implements Callable<String>{
- private String name;
- public MyCallableTask(String name) {
- this.name=name;
- }
- @Override
- public String call() throws Exception {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- StringBuilder sb=new StringBuilder("callable task--");
- return sb.append(name).toString();
- }
- }
上面的ExecutorSerivce接口中的invokeAll(tasks)方法用於批量執行任務,並且將結果按照task列表中的順序返回。此外,還存在一個批量執行任務的接口CompletionTask。ExecutorCompletionService是實現CompletionService接口的一個類,該類的實現原理很簡單:
用Executor類來執行任務,同時把在執行任務的Future放到BlockingQueue<Future<V>>隊列中。該類實現的關鍵就是重寫FutureTask類的done()方法,FutureTask類的done()方法是一個鈎子函數(關於鈎子函數,請讀者自行查詢),done()方法在FutureTask任務被執行的時候被調用。
ExecutorCompletionService類的核心代碼如下:
- public Future<V> submit(Runnable task, V result) {
- if (task == null) throw new NullPointerException();
- RunnableFuture<V> f = newTaskFor(task, result);
- executor.execute(new QueueingFuture(f));
- return f;
- }
- private class QueueingFuture extends FutureTask<Void> {
- QueueingFuture(RunnableFuture<V> task) {
- super(task, null);
- this.task = task;
- }
- protected void done() { completionQueue.add(task); }
- private final Future<V> task;
- }
其中的done()方法定義如下:
- /**
- * Protected method invoked when this task transitions to state
- * <tt>isDone</tt> (whether normally or via cancellation). The
- * default implementation does nothing. Subclasses may override
- * this method to invoke completion callbacks or perform
- * bookkeeping. Note that you can query status inside the
- * implementation of this method to determine whether this task
- * has been cancelled.
- */
- protected void done() { }
ExecutorCompletionService的使用示例如下:
- import java.util.concurrent.Callable;
- import java.util.concurrent.CompletionService;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorCompletionService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Future;
- public class Demo{
- public static void main(String [] args) throws InterruptedException, ExecutionException{
- CompletionService<String> cs=new ExecutorCompletionService<String>(
- Executors.newFixedThreadPool(2));
- for(int i=0;i<6;i++){
- cs.submit(new MyCallableTask(i+""));
- }
- for(int i=0;i<6;i++){
- Future<String> future=cs.take();
- //Retrieves and removes the Future representing the next completed task,
- //waiting if none are yet present.
- System.out.println(future.get());
- }
- }
- }
- class MyCallableTask implements Callable<String>{
- private String name;
- public MyCallableTask(String name) {
- this.name=name;
- }
- @Override
- public String call() throws Exception {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- StringBuilder sb=new StringBuilder("callable task--");
- return sb.append(name).toString();
- }
- }