轉自:https://blog.csdn.net/xiao__miao/article/details/86352380
1.近期工作的時候,運維通知一個系統的內存一直在增長,leader叫我去排查,我開始看了一下,沒處理,leader自己去看了一下,發現是線程池的問題,我開頭沒注意那塊,一看才發現,確實因為CompletionService里的結果隊列引起的。CompletionService里面有一個BlockingQueue維護結果,如果不去取結果就會導致一直里面一直增長
@SuppressWarnings("unchecked") public void doExecute(Msg msg, List<Object> actList) { try { // 1、開啟任務處理mq消息 service.submit(new ActMqTask(msg, actList)); } catch (Exception e) { LOG.error(prefix + " doExecute is Exception", e); msg.setStatus(MqMsgStatus.PROCESS); msg.setResultDesc("消息處理異常" + e.getMessage()); } }
就這段代碼,里面沒有去消費這個結果隊列,導致結果隊列一直增長。
已經找原因了,那現在分析下這個ExecutorCompletionService
分析前,我是會默認當前讀者是會使用線程池以及了解FutureTask了,不熟悉的源碼強烈建議看下這篇博文Java線程池源碼分析,讀完可能理解就輕松許多
接下來我們就進入分析階段
1.ExecutorCompletionService
來看下這段代碼,網上都有的
public static void main(String[] args) throws InterruptedException, ExecutionException { Random random = new Random(); ExecutorService pool = Executors.newFixedThreadPool(3); CompletionService<String> service = new ExecutorCompletionService<String>(pool); for(int i = 0; i<4; i++) { service.submit(() -> { Thread.sleep(random.nextInt(1000)); System.out.println(Thread.currentThread().getName()+"|完成任務"); return "data"+random.nextInt(10); }); } for(int j = 0; j < 4; j++) { Future<String> take = service.take(); //這一行沒有完成的任務就阻塞 String result = take.get(); // 這一行在這里不會阻塞,引入放入隊列中的都是已經完成的任務 System.out.println("獲取到結果:"+result); } } CompletionService里的結果集,就是take出來的結果,不是先進先出原則,先完成先出 所以你放入blockingQueue<Future<V>>都是已經完成的執行結果。所以take去拿的時候都是由結果的不會去阻塞 public class ExecutorCompletionService<V> implements CompletionService<V> { private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } ....... } 這里主要重寫了FutureTask<Void>里的done方法,執行完之后把結果集放入blockQueue里
再貼一段日常的結果集代碼,與之對比
public static void main(String[] args) throws InterruptedException, ExecutionException { Random random = new Random(); ExecutorService pool = Executors.newFixedThreadPool(5); List<Future<String>> resultFuture = new ArrayList<>(); for(int i = 0; i<4; i++) { final int tmp = i; Future<String> future = pool.submit(() -> { Thread.sleep(1000+10*tmp); System.out.println(Thread.currentThread().getName()+"|完成任務"); return "data"+random.nextInt(10); }); resultFuture.add(future); } System.out.println("--------------"); for(Future<String> future:resultFuture) { String result = future.get(); System.out.println("執行結果"+result); } }
區別對比
1.上面這段代碼里沒有維護一個結果集的隊列
2.取出的結果的不同和執行效率的不同。ExecutorCompletionService里拿結果是最快的,他是根據里面的任務完成就取出。而上面那段代碼是根據任務先后順序然后取出結果集。
注意:
一:結果集的順序,因為ExecutorCompletionService是根據完成的先后,順序是不定的