一、基礎篇
1.線程池的參數
1> CorePoolSize 核心線程數
2> MaxPoolSize 最大線程數
3> QueueCapacity 隊列容量
4> KeepAliveSeconds 線程空閒(沒有任務)時的存活時間
5> TimeUnit 時間單位
6> rejectedExecutionHandler 拒絕策略
7> threadFactory 線程工廠,一般使用默認的即可
2.拒絕策略
1>ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
2>ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。
3>ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
4>ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
二、應用篇
1.使用@Scheduled注解實現簡單定時任務,如:每5秒鐘打印一次come on
1>項目啟動類添加@EnableScheduling注解開啟
2>定時類上加@Component注解
3>定時方法上加@Scheduled(fixedDelay = 5000) 或者@Scheduled(cron = "0/5 * * * * *")
三、線程池的使用
@Slf4j
@Component
public class DataMarketComponent implements CommandLineRunner {
@Override
public void run(String... args) throws Exception {
dynamicTimerComponent.register("proxy_concurrent_req", new DataMarketComponent.ProxyConcurrentReq(), "30 */1 * * * ?");
}
/***
* 根據proxyType proxyid統計每分鍾的請求量
*/
public class ProxyConcurrentReq implements Runnable {
@Override
public void run() {
//多線程池
ExecutorService exs = null;
try {
//代理統計對象集合
if (tableList.size() > 0){
List<ProxyConReqLog> proxyConReqLogList = new ArrayList<>();
ProxyConReqLog proxyConReqLog;
List<Future<Map<String,Map<String,Long>>>> futureMap = new ArrayList<>();
String startTimeStr = DateUtil.getStringFormat(startTime, DateUtil.DATETIMEFORMAT2);
String endTimeStr = DateUtil.getStringFormat(endTime, DateUtil.DATETIMEFORMAT2);
//開啟多線程
exs = Executors.newFixedThreadPool(tableList.size());
for (String otsTableName : tableList){
if (otsTableName.contains("ods_output_proxy_")){
try {
taskId = otsTableName.replaceAll("ods_output_proxy_","").replaceAll("_","");
String tableIndex = "_index_" + taskId;
if (null != dispatcherService.getOne(new Dispatcher(taskId,null))){
//1.高速提交任務,每個任務返回一個Future入list
futureMap.add(exs.submit(new CallableTaskAgg(startTimeStr, endTimeStr, otsTableName, tableIndex)));
}
} catch (Exception e) {
log.error("提交任務發生錯誤", e.getMessage());
}
}
}
//所有表聚合到一起的結果
Map<String,Long> httpMap = new HashMap<>();
Map<String,Long> tcpMap = new HashMap<>();
for (Future<Map<String,Map<String,Long>>> future : futureMap) {
while (true) {
if (future.isDone()&& !future.isCancelled()) {
Map<String, Map<String, Long>> tableResult = future.get();
Map<String, Long> singleTcpMap = tableResult.get(ProxyTypeEnum.TCP.code);
Map<String, Long> singleHttpMap = tableResult.get(ProxyTypeEnum.HTTP.code);
if (null != singleTcpMap && !CollectionUtils.isEmpty(singleTcpMap)){
singleTcpMap.forEach((key,value) -> tcpMap.merge(key,value,Long::sum));
}
if (null != singleHttpMap && !CollectionUtils.isEmpty(singleHttpMap)){
singleHttpMap.forEach((key,value) -> httpMap.merge(key,value,Long::sum));
}
break;
} else {
//每次輪詢休息1毫秒(CPU納秒級),避免CPU高速輪循耗空CPU
Thread.sleep(1);
}
}
}
}
} catch (Exception e) {
log.error("run exception, message is : {} ", e.getMessage());
} finally {
exs.shutdown();
}
}
}
private String generatorId(ProxyConReqLog proxyConReqLog) {
String dataVersion = DateUtil.getDateFormat(new Date(),"yyyyMMddHHmmss");
String uuid = UUID.randomUUID().toString().replaceAll("-", "").toLowerCase();
String bn = dataVersion + uuid.substring(0,8);
return bn;
}
class CallableTaskAgg implements Callable{
String otsTableName;
String tableIndex;
String startTime;
String endTime;
public CallableTaskAgg(String startTime, String endTime, String otsTableName, String tableIndex) {
this.startTime = startTime;
this.endTime = endTime;
this.otsTableName = otsTableName;
this.tableIndex = tableIndex;
}
@Override
public Map<String,Map<String,Long>> call() {
Map map = otsQueryService.subGroupByFilter(startTime, endTime, otsTableName, tableIndex, "proxy_type", "proxy_id");
return map;
}
}
}