情景: 在高並發的請求中, 立刻返回響應消息, 調用第三方接口 繼續執行, 且超時調用停止該線程,
package com.cjcx.inter.utils; import java.util.concurrent.*; /** * */ public class ThreadPoolTest { // 線程池維護線程的最少數量 private static Integer corePoolSize = 2; // 線程池維護線程的最大數量 private static Integer maxPoolSize = 10; // 線程池維護線程所允許的空閑時間 private static Integer keepAliveTime = 90; // 線程池所使用的緩沖隊列大小 private static Integer workQueueSize = 1024; public static void main(String[] args) { try { ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy()); // 每秒輸出線程池情況 new Thread(new Runnable() { @Override public void run() { try { while (true) { Executors.newFixedThreadPool(2); Thread.sleep(3000); System.out.println("============================================活躍的線程數:" + pool.getActiveCount() + ",核心線程數:" + pool.getCorePoolSize() + ",線程池大小:" + pool.getPoolSize() + ",隊列的大小" + pool.getQueue().size() + "=============================================="); } } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // 多任務執行 for (int i = 0; i < 2000; i++) { final int k = i; System.out.println("i:" + i); /* 可丟棄任務(超出隊列) 后台定時沖DB拉取再處理的場景, if(pool.getQueue().size() > 1000){ return; }*/ pool.execute(new Runnable() { @Override public void run() { // 任務 小票接口 上傳給IBC try { long s = System.currentTimeMillis(); System.out.println(Thread.currentThread().getName() + "===================== 任務" + k + " 開始"); Future<Integer> future = executorService.submit(new Callable<Integer>() { public Integer call() throws Exception { return executeTask(k); } }); try { Integer result = future.get(1, TimeUnit.SECONDS); System.out.println("設備ID: " + k + ", 結果:" + result + " 上傳接口程序 正常完成"); UploadDescribe.getInstance().record(System.currentTimeMillis() - s); } catch (InterruptedException | ExecutionException | TimeoutException e) { future.cancel(true); UploadDescribe.getInstance().cancel("DV_" + k); System.out.println("設備ID: " + k + " 上傳接口程序 1秒內未完成, 停止上傳.退出"); } finally { System.out.println(Thread.currentThread().getName() + "清理資源===================== 任務" + k + " 結束, 用時:" + (System.currentTimeMillis() - s)); } } catch (Exception e) { e.printStackTrace(); } } }); } } catch (Exception e) { e.printStackTrace(); } } // 上傳IBC任務 static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); private static Integer executeTask(Integer k) { Integer r = -1; try { int s = (int) (Math.random() * 2000); System.out.println("k:" + k + ", sleep:" + s); Thread.sleep(s); } catch (InterruptedException e) { e.printStackTrace(); } r = 0; return r; } }
// 編寫一個類記錄 執行的成功,失敗次數,記錄成功的平均耗時
package com.cjcx.inter.utils; import java.util.HashSet; import java.util.Set; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 接口上傳情況 * @author e58 * */ public class UploadDescribe { private Logger logger = LoggerFactory.getLogger(UploadDescribe.class); private static UploadDescribe describe; private UploadDescribe() {} public synchronized static UploadDescribe getInstance() { if (describe == null) { describe = new UploadDescribe(); } return describe; } private ReentrantLock put = new ReentrantLock(); // 上傳超時時間ms private Long connectTimeOut = 5000L; // 上傳總次數 private Integer uploadTimes = 0; // 上傳平均耗時ms private Long uploadAverageTime = 0L; // 超時總次數 private Integer cancelTimes = 0; // 超時的設備ID private Set<String> sets = new HashSet<String>(); /** * 請求成功,記錄次數和耗時 */ public void record(Long increTime){ try { put.lock(); // 次數+1 uploadTimes = uploadTimes + 1; // 平均時間 if(uploadTimes != 0) uploadAverageTime = (uploadAverageTime + increTime ) / uploadTimes; desc(); } catch (Exception e) { logger.error("UploadDescribe record:", e); } finally{ put.unlock(); } } /** * 設備請求超時耗時 */ public void cancel(String deviceId){ try { put.lock(); // 次數+1 cancelTimes = cancelTimes + 1; // 平均時間 sets.add(deviceId); desc(); } catch (Exception e) { logger.error("UploadDescribe cancel:", e); } finally{ put.unlock(); } } /** * 清空數據 */ public void clear(){ try { put.lock(); uploadTimes = 0; uploadAverageTime = 0L; cancelTimes = 0; sets.clear(); } catch (Exception e) { logger.error("UploadDescribe clear:", e); } finally{ put.unlock(); } } /** * 輸出描述信息 */ public void desc(){ logger.info("uploadTimes=" + uploadTimes + ", uploadAverageTime=" + uploadAverageTime + ", cancelTimes=" + cancelTimes + "]"); } }