CompletionService用法踩坑解決優化


 

轉自:https://blog.csdn.net/xiao__miao/article/details/86352380 

 

1.近期工作的時候,運維通知一個系統的內存一直在增長,leader叫我去排查,我開始看了一下,沒處理,leader自己去看了一下,發現是線程池的問題,我開頭沒注意那塊,一看才發現,確實因為CompletionService里的結果隊列引起的。CompletionService里面有一個BlockingQueue維護結果,如果不去取結果就會導致一直里面一直增長

  @SuppressWarnings("unchecked")
    public void doExecute(Msg msg, List<Object> actList) {
        try {
            // 1、開啟任務處理mq消息
            service.submit(new ActMqTask(msg, actList));
        } catch (Exception e) {
            LOG.error(prefix + " doExecute is Exception", e);
            msg.setStatus(MqMsgStatus.PROCESS);
            msg.setResultDesc("消息處理異常" + e.getMessage());
        }
 
    }

 

就這段代碼,里面沒有去消費這個結果隊列,導致結果隊列一直增長。

 

已經找原因了,那現在分析下這個ExecutorCompletionService

分析前,我是會默認當前讀者是會使用線程池以及了解FutureTask了,不熟悉的源碼強烈建議看下這篇博文Java線程池源碼分析,讀完可能理解就輕松許多

接下來我們就進入分析階段

1.ExecutorCompletionService

來看下這段代碼,網上都有的

public static void main(String[] args) throws InterruptedException, ExecutionException {
 
    Random random = new Random();
    ExecutorService pool = Executors.newFixedThreadPool(3);
 
    CompletionService<String> service = new ExecutorCompletionService<String>(pool);
 
    for(int i = 0; i<4; i++) {
 
        service.submit(() -> {
            Thread.sleep(random.nextInt(1000));
            System.out.println(Thread.currentThread().getName()+"|完成任務");
            return "data"+random.nextInt(10);
        });
    }
 
    for(int j = 0; j < 4; j++) {
        Future<String> take = service.take(); //這一行沒有完成的任務就阻塞
        String result = take.get(); // 這一行在這里不會阻塞,引入放入隊列中的都是已經完成的任務
        System.out.println("獲取到結果:"+result);
    }   
} 


CompletionService里的結果集,就是take出來的結果,不是先進先出原則,先完成先出

所以你放入blockingQueue<Future<V>>都是已經完成的執行結果。所以take去拿的時候都是由結果的不會去阻塞

public class ExecutorCompletionService<V> implements CompletionService<V> {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;
 
 
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    .......
}

這里主要重寫了FutureTask<Void>里的done方法,執行完之后把結果集放入blockQueue里

再貼一段日常的結果集代碼,與之對比

public static void main(String[] args) throws InterruptedException, ExecutionException {
 
    Random random = new Random();
    ExecutorService pool = Executors.newFixedThreadPool(5);     
    List<Future<String>> resultFuture = new ArrayList<>();
 
    for(int i = 0; i<4; i++) {
        final int tmp = i;
        Future<String> future = pool.submit(() -> {
            Thread.sleep(1000+10*tmp);
            System.out.println(Thread.currentThread().getName()+"|完成任務");
            return "data"+random.nextInt(10);
        });
        resultFuture.add(future);
    }
    System.out.println("--------------");
 
    for(Future<String> future:resultFuture) {
        String result = future.get();
        System.out.println("執行結果"+result);      
    }
}

區別對比

1.上面這段代碼里沒有維護一個結果集的隊列

2.取出的結果的不同和執行效率的不同。ExecutorCompletionService里拿結果是最快的,他是根據里面的任務完成就取出。而上面那段代碼是根據任務先后順序然后取出結果集。

 

注意:

一:結果集的順序,因為ExecutorCompletionService是根據完成的先后,順序是不定的

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM