出處:https://blog.csdn.net/kity9420/article/details/80740466
前言
經常會遇到一些性能問題,比如調用某個接口,可能要循環調用100次,並且需要拿到每一次調用的返回結果,通常我們都是放在for循環中一次次的串行調用,這種方式可想而知道有多慢,那怎么解決這個問題呢?
多線程
為了解決以上問題,我使用的方式是多線程。多線程常規的有兩種實現方式,即繼承Tread類,實現Runnable接口,但是這兩種實現方式,有一個共同的問題,就是沒有返回值,對於我們來說,獲得每個線程的返回值,是個很困難的問題,因此不能用Tread類或Runnable接口,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允許有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到線程的執行結果
案例
假設需要給100個用戶發送郵件,並需要每個用戶的返回結果,先看下代碼結構
CallableTemplate.java
package com.gdut.thread.multiThread; import java.util.concurrent.Callable; /** * 多線程模板類 * @author yang.han * * @param <V> */ public abstract class CallableTemplate<V> implements Callable<V>{ /** * 前置處理,子類可以Override該方法 */ public void beforeProcess() { System.out.println("before process"); } /** * 處理業務邏輯的方法,需要子類去Override * @param <V> * @return */ public abstract V process(); /** * 后置處理,子類可以Override該方法 */ public void afterProcess() { System.out.println("after process"); } @Override public V call() throws Exception { beforeProcess(); V result = process(); afterProcess(); return result; } }
CallableTemplate類實現了Callable接口,並實現了process方法,該類是一個抽象類,接收任意返回值的類型,beforeProcess方法為前置處理,afterProcess的后置處理,process為具體的業務邏輯抽象方法,該方法在子類中實現
IConcurrentThreadPool.java
package com.gdut.thread.multiThread; import java.util.List; import java.util.concurrent.ExecutionException; public interface IConcurrentThreadPool { /** * 初始化線程池 */ void initConcurrentThreadPool(); /** * 提交單個任務 * @param <V> * @param task * @return * @throws InterruptedException * @throws ExecutionException */ <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException; /** * 提交多個任務 * @param <V> * @param tasks * @return * @throws InterruptedException * @throws ExecutionException */ <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException; }
IConcurrentThreadPool是多線程接口類,聲名了三個方法,initConcurrentThreadPool:初始化線程池,submit:提交單個任務的線程,並有返回值,invokeAll:提交多個任務的線程,並有返回值
ConcurrentThreadPool.java
package com.gdut.thread.multiThread; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ConcurrentThreadPool implements IConcurrentThreadPool{ private ThreadPoolExecutor threadPoolExecutor; // 核心線程數 private int corePoolSize = 10; // 最大線程數 private int maximumPoolSize = 20; // 超時時間30秒 private long keepAliveTime = 30; @Override public void initConcurrentThreadPool() { threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>() ); } @Override public <V> V submit(CallableTemplate<V> task) throws InterruptedException, ExecutionException { Future<V> result = threadPoolExecutor.submit(task); return result.get(); } @Override public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException { List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks); List<V> resultList = new ArrayList<V>(); for(Future<V> future : tasksResult) { resultList.add(future.get()); } return resultList; } }
ConcurrentThreadPool是創建線程池的實現類,用到了ThreadPoolExecutor線程池類及這個類的invokeAll方法和submit方法,這兩個方法的返回值,都可以通過Future類的get方法獲得
ICallableTaskFrameWork.java
package com.gdut.thread.multiThread; import java.util.List; import java.util.concurrent.ExecutionException; public interface ICallableTaskFrameWork { <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException; }
ICallableTaskFrameWork是定義的線程任務框架接口,所有的多線程調用,都通過該接口發起
CallableTaskFrameWork.java
package com.gdut.thread.multiThread; import java.util.List; import java.util.concurrent.ExecutionException; public class CallableTaskFrameWork implements ICallableTaskFrameWork{ private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool(); @Override public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks) throws InterruptedException, ExecutionException { concurrentThreadPool.initConcurrentThreadPool(); return concurrentThreadPool.invokeAll(tasks); } }
CallableTaskFrameWork是ICallableTaskFrameWork 的實現類,在submitsAll實現方法中,通過調用線程池對象IConcurrentThreadPool接口的invokeAll方法來發起多線程的調用,這里注意一個,在submitAll實現方法中,我手動的調用了初始化線程池的方法concurrentThreadPool.initConcurrentThreadPool(),在真實的項目上,應該在應用啟動的時候就調用該方法來初始化線程池
測試類代碼
SendMessageService.java,假設這是一個發送郵件信息的服務類
package com.gdut.thread.multiThread; public class SendMessageService { public void sendMessage(String email,String content){ System.out.println("發送郵件。。。"); } }
SendMessageHander.java,多線程發送郵件的處理類
package com.gdut.thread.multiThread; import java.util.HashMap; import java.util.Map; public class SendMessageHander extends CallableTemplate<Map<String, String>>{ private String email; private String content; public SendMessageHander(String email,String content) { this.email = email; this.content = content; } @Override public Map<String, String> process() { SendMessageService sendMessageService = new SendMessageService(); sendMessageService.sendMessage(email, content); Map<String, String> map = new HashMap<String, String>(); map.put(email, content); return map; } }
這個類繼承了上面的CallableTemplate,我們要的返回值是Map,因此泛型類型是Map,在類中還重寫了process方法,在方法中調用發送郵件的業務邏輯接口SendMessageService.sendMessage,並將返回結果組裝成Map返回,這里我就簡單處理了,將郵件地址及內容放在Map中直接返回了;另外還要注意這個類有個有參構造器,通過構建器可以接收需要傳遞進來的參數
SendMessageTest.java,測試類
package com.gdut.thread.multiThread; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ExecutionException; public class SendMessageTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork(); List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>(); SendMessageHander sendMessageHander = null; // 將需要發送郵件的郵件地址及內容組裝好,放在一個集合中 for (int i = 0; i < 1000; i++) { sendMessageHander = new SendMessageHander("email" + i, "content" + i); tasks.add(sendMessageHander); } //通過多線程一次性發起郵件,並拿到返回結果集 List<Map<String, String>> results = callableTaskFrameWork.submitsAll(tasks); // 解析返回結果集 for (Map<String, String> map : results) { for (Entry<String, String> entry : map.entrySet()) { System.out.println(entry.getKey() + "\t" + entry.getValue()); } } } }
運行結果
附錄:還可以看這邊文章: java並發異步編程 原來十個接口的活現在只需要一個接口就搞定!