使用ExecutorCompletionService 管理線程池處理任務的返回結果


在我們日常使用線程池的時候,經常會有需要獲得線程處理結果的時候。此時我們通常有兩種做法。

1. 使用並發容器將callable.call() 的返回Future存儲起來。然后使用一個消費者線程去遍歷這個並發容器,調用Future.isDone()去判斷各個任務是否處理完畢。然后再處理響應的業務。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class ExecutorResultManager {

    public static void main(String[] args) {
        // 隊列
        BlockingQueue<Future<String>> futures = new LinkedBlockingQueue<>();
        
        // 生產者
        new Thread() {
            @Override
            public void run() {
                ExecutorService pool = Executors.newCachedThreadPool();
                
                for (int i=0; i< 10; i++) {
                    int index = i;
                    Future<String> submit = pool.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            return "task done" + index;
                        }
                    });
                    try {
                        futures.put(submit);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
        
        

        // 消費者
        new Thread() {
            @Override
            public void run() {
                while(true) {
                    for (Future<String> future : futures) {
                        if(future.isDone()) {
                            // 處理業務
                            // .............
                            
                            
                        };
                    }
                }
            }
        }.start();
    }

}

2. 使用jdk 自帶線程池結果管理器:ExecutorCompletionService。它將BlockingQueue 和Executor 封裝起來。然后使用ExecutorCompletionService.submit()方法提交任務。

submit 方法如下:

 public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
 
        // RunnableFuture封裝了任務,使得任務既能run 也能get()
        RunnableFuture<V> f = newTaskFor(task);
        // 使用一個繼承Runnable類的QueueingFutue 再次封裝了我們的任務
        executor.execute(new QueueingFuture(f));
        return f;
   }

我們再來看看QueueingFuture:

// 繼承自FutureTask, FutureTask 也是一個Runnable的子類
 private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        // 實現了FutureTask 的done 方法,在任務處理完畢或者拋異常后將封裝成Future的任務加入到隊列。這樣我們就能在隊列中取到處處理完的任務,並通過Future.get()方法去取得處理完后的結果。不用自己去判斷任務是否處理完畢了
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }    

簡單實現:

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorCompletionServiceManager {

    public static void main(String[] args) {

        ExecutorCompletionService<String> service = new ExecutorCompletionService<String>(
                Executors.newCachedThreadPool());

        // 生產者
        new Thread() {
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    int index = i;
                    service.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            return "task done" + index;
                        }
                    });
                }
            }
        }.start();
        

        // 消費者
        new Thread() {
            @Override
            public void run() {
                try {
                    Future<String> take = service.take();
                    // do some thing........
                    
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

    }
}

相對於原始的自造輪子處理方式,jdk 自帶的工具類處理方式顯得優雅了許多。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM