package com.loan.modules.common.util; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @SuppressWarnings("all") public class ETLThreadPool { private static ThreadPoolExecutor etlExectutor = null; /** * 功能:得到線程池實例 * @param corePoolSize 線程池維護線程的最少數量 * @param maximumPoolSize 線程池維護線程的最大數量 * @param keepAliveTime 線程池維護線程所允許的空閑時間 * @param unit 線程池維護線程所允許的空閑時間的單位 * @param workQueue 線程池所使用的緩沖隊列 * @return */ @SuppressWarnings("unchecked") public static ThreadPoolExecutor getThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { synchronized (ETLThreadPool.class) { if (etlExectutor == null) { etlExectutor = createExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } } return etlExectutor; } /** * 功能:創建ThreadPoolExecutor實例; * @param corePoolSize:核心線程數量 * @param maximumPoolSize:最大線程數量 * @param keepAliveTime:線程空閑保持時間 * @param unit:時間單位 * @param workQueue:工作隊列 * @param handler:舊任務拋棄策略 * @return * ThreadPoolExecutor */ @SuppressWarnings("unchecked") private static ThreadPoolExecutor createExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { etlExectutor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); return etlExectutor; } }
package com.loan.modules; import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import com.loan.modules.common.util.ETLThreadPool; public class test { private static ThreadPoolExecutor cachedThreadPool = ETLThreadPool.getThreadPool(8,10, 3000, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(20000)); public static void main(String[] args) throws InterruptedException { Thred1(); Thred2(); Thred1(); } public static void Thred1() throws InterruptedException{ int total = 30; final CountDownLatch countDownLatch = new CountDownLatch(total); for (int i = 0; i < total; i++) { Thred1 t1 = new Thred1(i,countDownLatch); cachedThreadPool.execute(t1); } countDownLatch.await();// 等待所有子線程執行完 } public static void Thred2() throws InterruptedException{ int total = 5; final CountDownLatch countDownLatch = new CountDownLatch(total); for (int i = 0; i < total; i++) { cachedThreadPool.execute(new Runnable() { @Override public void run() { // 批量向instinct系統發送進件信息 // 計數器 減一 System.out.println("2"); countDownLatch.countDown(); } }); } countDownLatch.await();// 等待所有子線程執行完 } private synchronized static int getQueueSize(Queue queue) { return queue.size(); } }