背景
由於項目上要和其他系統交互,而該系統采用同步接口,我們采用單線程調用,接收一條數據平均需要4~6s。而我們需要匯總近三個月的訂單信息,大約一次有幾千條數據,所以進行一次交互大概需要幾百分鍾。經過溝通后,對方系統不願意修改他們的程序,所以我們這邊就要改成多線程多次同時調用接口,來達到降低處理時間的效果。
代碼
話不多說,先上代碼
public ReturnT execute(Map<String, String> map, SchedulerTool tool) { tool.info("<==== MakeOrderOutJob is starting: " + map); //獲取最大線程數 int threadNum = Integer.parseInt(map.get("threadNum")); //獲取線程結束超時時間 int timeOut = Integer.parseInt(map.get("timeOut")); //獲取傳輸最大條數 int max = Integer.parseInt(map.get("max")); //獲取待發送的生產訂單數據 List<MakeOrder> makeOrderList = makeOrderRepository.getMakeOrder(max); tool.info("<==== MakeOrderOutJob makeOrderList size: " + makeOrderList.size()); //返回消息集合 List<MakeOrderReturnVO> makeOrderReturnVoList = new ArrayList<>(); try { //手動創建線程池 ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build(); ExecutorService executor = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), namedThreadFactory); //循環所有待傳出數據,調用接口 makeOrderList.stream().forEach(one -> { executor.execute(new ThreadPoolMakeOrderOutTask(tool, user, password, url, one, moRequirementRepository.getMoRequirementList(one.getKid()), makeOrderReturnVoList, restUtil, makeOrderRepository)); }); executor.shutdown(); while (!executor.awaitTermination(timeOut, TimeUnit.SECONDS)) { executor.shutdownNow(); } tool.info("<==== MakeOrderOutJob makeOrderReturnVoList size: " + makeOrderReturnVoList.size()); //循環返回結果更新接口表 makeOrderReturnVoList.stream().forEach((vo) -> { tool.info("<==== MakeOrderOutJob update status: " + vo.getMessageV1()); //更新接口表狀態 vo.setStatus(vo.getType()); tool.info("<==== MakeOrderOutJob update status success: " + vo.getMessageV1()); }); } catch (Exception e) { e.printStackTrace(); tool.error("<==== MakeOrderOutJob is error: " + e.getMessage()); return ReturnT.FAILURE; } return ReturnT.SUCCESS; }
總結
這里用到了線程池ExecutorService
1.線程的創建
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); }
線程池一共有五種
- newCachedThreadPool:用來創建一個可以無限擴大的線程池
- newFixedThreadPool:創建一個固定大小的線程池
- newSingleThreadExecutor:創建一個單線程的線程池
- newScheduledThreadPool:可以延時啟動,定時啟動的線程池
- newWorkStealingPool:創建一個擁有多個任務隊列的線程池
經BD后發現所有的線程池最終都是通過ThreadPoolExecutor創建的
其中各個參數說明:
- corePoolSize : 核心線程數,一旦創建將不會再釋放。如果創建的線程數還沒有達到指定的核心線程數量,將會繼續創建新的核心線程,直到達到最大核心線程數后,核心線程數將不在增加;如果沒有空閑的核心線程,同時又未達到最大線程數,則將繼續創建非核心線程;如果核心線程數等於最大線程數,則當核心線程都處於激活狀態時,任務將被掛起,等待空閑線程來執行。
- maximumPoolSize : 最大線程數,允許創建的最大線程數量。如果最大線程數等於核心線程數,則無法創建非核心線程;如果非核心線程處於空閑時,超過設置的空閑時間,則將被回收,釋放占用的資源。
- keepAliveTime : 當線程空閑時,所允許保存的最大時間,超過這個時間,線程將被釋放銷毀,但只針對於非核心線程。
- unit : 時間單位,TimeUnit.SECONDS等。
- workQueue : 任務隊列,存儲暫時無法執行的任務,等待空閑線程來執行任務。
- threadFactory : 線程工程,用於創建線程。本例用到new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();來為線程創建名稱。
- handler : 當線程邊界和隊列容量已經達到最大時,用於處理阻塞時的程序
2.創建好了線程后就要執行任務,實現多線程有三種方法
- 繼承Thread類,重寫run方法
- 實現Runnable接口,重寫run方法
- 實現Callable接口,重寫call方法(有返回值)
本例中ThreadPoolMakeOrderOutTask用到了第2種方法
3.任務執行完畢,要結束線程池任務
本例調用shutdown() 方法在終止前允許執行以前提交的任務,shutdown()方法的作用是:停止接收新任務,原來的任務繼續執行
然后調用awaitTermination(long timeOut, TimeUnit unit)方法,使當前線程阻塞,直到:
- 等所有已提交的任務(包括正在跑的和隊列中等待的)執行完
- 或者 等超時時間到了(timeout 和 TimeUnit設定的時間)
- 或者 線程被中斷,拋出InterruptedException
然后會監測 ExecutorService 是否已經關閉,返回true(shutdown請求后所有任務執行完畢)或false(已超時)
最后調用shutdownNow()方法,停止接收新任務,原來的任務停止執行
結果
經測試,啟用多線程后,發現交互時間確實有明顯提升。