業務中需要實現數據的日終同步:將同步文件中的數據封裝成 List 集合後分批,交由多線程處理;根據數據量動態設置線程數,同時控制最大並發數量(業務中有 IO 操作,避免並發過大導致堵塞),以提高處理效率。
//最大線程數控制 private static int MAX_THREADS= 5; //跑批分頁大小 private static int EXPIRED_PAGE_SIZE = 30; private void dataHandler(List<SyncFileDto> list) { //處理數據數量 int listSize = list.size(); //線程數 int runSize; if (listSize % EXPIRED_PAGE_SIZE == 0) { runSize = (listSize / EXPIRED_PAGE_SIZE); } else { runSize = (listSize / EXPIRED_PAGE_SIZE) + 1; } ThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(runSize); CountDownLatch countDownLatch = new CountDownLatch(runSize); //最大並發線程數控制 final Semaphore semaphore = new Semaphore(MAX_THREADS); List handleList = null; for (int i = 0; i < runSize; i++) { if ((i + 1) == runSize) { int startIndex = i * EXPIRED_PAGE_SIZE; int endIndex = list.size(); handleList = list.subList(startIndex, endIndex); } else { int startIndex = i * EXPIRED_PAGE_SIZE; int endIndex = (i + 1) * EXPIRED_PAGE_SIZE; handleList = list.subList(startIndex, endIndex); } SyncTask task = new SyncTask(handleList, countDownLatch, semaphore); executor.execute(task); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally{ executor.shutdown(); } } class SyncTask implements Runnable { private List<SyncFileDto> list; private CountDownLatch countDownLatch; private Semaphore semaphore; public SyncSyncTask(List<SyncFileDto> list, CountDownLatch countDownLatch, Semaphore semaphore) { this.list = list; this.countDownLatch = countDownLatch; this.semaphore = semaphore; } @Override public void run() { if (!CollectionUtils.isEmpty(list)) { try { semaphore.acquire(); list.stream().forEach(fileDto -> { //業務處理 }); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } //線程任務完成 countDownLatch.countDown(); } }
上面是通過手動數據分片、CountDownLatch 計數器閉鎖和 Semaphore 限流的方式進行並發控制。後期改造時發現邏輯較複雜,因此改變線程池的類型,直接創建可控制的線程池 ThreadPoolExecutor(它也是 ScheduledThreadPoolExecutor 的父類),通過自定義其屬性,實現跑批線程池的線程數量及並發量可控。
// Pool with identical core and maximum size and a bounded queue; when both the
// pool and the queue are full, the submitting thread runs the task itself.
ExecutorService fixedThreadPool = new ThreadPoolExecutor(
        INIT_NTHREADS,                                // core pool size
        INIT_NTHREADS,                                // maximum pool size
        10,                                           // keep-alive time for idle threads
        TimeUnit.SECONDS,                             // unit of the keep-alive time
        new ArrayBlockingQueue<>(1000),               // work queue holding at most 1000 pending tasks
        new ThreadPoolExecutor.CallerRunsPolicy());   // rejection handler
該線程池的使用我在後面的博客【地址】中有介紹,可移步閱讀。