ThreadPoolTaskExecutor執行任務小總結


一、基礎篇

 1.線程池的參數

 1> CorePoolSize  核心線程數

 2> MaxPoolSize 最大線程數

 3> QueueCapacity 隊列容量

 4> KeepAliveSecond  沒有任務存活時間

 5> TimeUnit  時間單位

 6> rejectedExecutionHandler 拒絕策略

 7> threadFactory一般使用默認的即可

2.拒絕策略

 1>ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。

 2>ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。

 3>ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)

 4>ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

二、應用篇

1.使用@Scheduled注解實現簡單定時任務,如:沒5秒中打印一次come on

1>項目啟動類添加@EnableScheduling注解開啟

2>定時類上加@Component注解

3>定時方法上加@Scheduled(fixedDelay = 5000) 或者@Scheduled(cron = "0/3 * * * * *")

 

三、線程池的使用


@Slf4j
@Component
public class DataMarketComponent implements CommandLineRunner {




@Override
public void run(String... args) throws Exception {

dynamicTimerComponent.register("proxy_concurrent_req", new DataMarketComponent.ProxyConcurrentReq(), "30 */1 * * * ?");
}

/***
* 根據proxyType proxyid統計每分鍾的請求量
*/
public class ProxyConcurrentReq implements Runnable {
@Override
public void run() {

//多線程池
ExecutorService exs = null;
try {

//代理統計對象集合

if (tableList.size() > 0){
List<ProxyConReqLog> proxyConReqLogList = new ArrayList<>();
ProxyConReqLog proxyConReqLog;
List<Future<Map<String,Map<String,Long>>>> futureMap = new ArrayList<>();
String startTimeStr = DateUtil.getStringFormat(startTime, DateUtil.DATETIMEFORMAT2);
String endTimeStr = DateUtil.getStringFormat(endTime, DateUtil.DATETIMEFORMAT2);
//開啟多線程
exs = Executors.newFixedThreadPool(tableList.size());
for (String otsTableName : tableList){
if (otsTableName.contains("ods_output_proxy_")){
try {
taskId = otsTableName.replaceAll("ods_output_proxy_","").replaceAll("_","");
String tableIndex = "_index_" + taskId;
if (null != dispatcherService.getOne(new Dispatcher(taskId,null))){
//1.高速提交任務,每個任務返回一個Future入list
futureMap.add(exs.submit(new CallableTaskAgg(startTimeStr, endTimeStr, otsTableName, tableIndex)));
}
} catch (Exception e) {
log.error("提交任務發生錯誤", e.getMessage());
}
}
}
//所有表聚合到一起的結果
Map<String,Long> httpMap = new HashMap<>();
Map<String,Long> tcpMap = new HashMap<>();
for (Future<Map<String,Map<String,Long>>> future : futureMap) {
while (true) {
if (future.isDone()&& !future.isCancelled()) {
Map<String, Map<String, Long>> tableResult = future.get();
Map<String, Long> singleTcpMap = tableResult.get(ProxyTypeEnum.TCP.code);
Map<String, Long> singleHttpMap = tableResult.get(ProxyTypeEnum.HTTP.code);
if (null != singleTcpMap && !CollectionUtils.isEmpty(singleTcpMap)){
singleTcpMap.forEach((key,value) -> tcpMap.merge(key,value,Long::sum));
}
if (null != singleHttpMap && !CollectionUtils.isEmpty(singleHttpMap)){
singleHttpMap.forEach((key,value) -> httpMap.merge(key,value,Long::sum));
}
break;
} else {
//每次輪詢休息1毫秒(CPU納秒級),避免CPU高速輪循耗空CPU
Thread.sleep(1);
}
}
}





}
} catch (Exception e) {
log.error("run exception, message is : {} ", e.getMessage());
} finally {
exs.shutdown();
}

}
}

private String generatorId(ProxyConReqLog proxyConReqLog) {
String dataVersion = DateUtil.getDateFormat(new Date(),"yyyyMMddHHmmss");
String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
String bn = dataVersion + uuid.substring(0,8);
return bn;

}



class CallableTaskAgg implements Callable{

String otsTableName;
String tableIndex;
String startTime;
String endTime;

public CallableTaskAgg(String startTime, String endTime, String otsTableName, String tableIndex) {
this.startTime = startTime;
this.endTime = endTime;
this.otsTableName = otsTableName;
this.tableIndex = tableIndex;
}
@Override
public Map<String,Map<String,Long>> call() {
Map map = otsQueryService.subGroupByFilter(startTime, endTime, otsTableName, tableIndex, "proxy_type", "proxy_id");
return map;
}
}
}

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM