public void dealGovernanceStrategyNew(List<StrategyStreamOperation> commonAll, StrategyDetail strategyDetail, List<String> instanceList) {
if (Objects.isNull(strategyDetail.getType()) && Objects.isNull(strategyDetail.getRetainNum())) {
// 500一組分批處理
List<List<String>> lists = ListSplitUtil.splitList(instanceList, 500);
// 對於集合寫操作:synchronizedList 相對於Vector 、CopyOnWriteArrayList性能更佳。 讀操作建議CopyOnWriteArrayList
// 多線程批量處理數據(500一組) 這里使用java8 CompletableFuture.supplyAsync實現多線程處理
List<StrategyStreamOperation> operationList = Collections.synchronizedList(new ArrayList<>());
// 手動創建線程池
ExecutorService executorService = new ThreadPoolExecutor(lists.size(), lists.size(), 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), new ThreadPoolExecutor.AbortPolicy());
lists.stream().forEach((List<String> item) ->{
CompletableFuture<List<StrategyStreamOperation>> cf = CompletableFuture.supplyAsync(()->{
// 持續天數的峰值判斷的
List<StrategyStreamOperation> futureList = getStrategyByCommon();
return futureList;
}, executorService);
try {
// 獲取線程執行結果
operationList.addAll(cf.get());
} catch (InterruptedException e) {
log.error("從線程中獲取執行結果失敗", e);
} catch (ExecutionException e) {
log.error("從線程中獲取執行結果失敗", e);
}
});
// 關閉線程池,不再接收新任務
executorService.shutdown();
commonAll.addAll(operationList);
}
}