import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import java.util.List; /** * 拆分結合工具類 * * @author shiwen * @date 2020/12/27 */ public class SplitListUtils { /** * 拆分集合 * * @param <T> 泛型對象 * @param resList 需要拆分的集合 * @param subListLength 每個子集合的元素個數 * @return 返回拆分后的各個集合組成的列表 * 代碼里面用到了guava和common的結合工具類 **/ public static <T> List<List<T>> split(List<T> resList, int subListLength) { if (CollectionUtils.isEmpty(resList) || subListLength <= 0) { return Lists.newArrayList(); } List<List<T>> ret = Lists.newArrayList(); int size = resList.size(); if (size <= subListLength) { // 數據量不足 subListLength 指定的大小 ret.add(resList); } else { int pre = size / subListLength; int last = size % subListLength; // 前面pre個集合,每個大小都是 subListLength 個元素 for (int i = 0; i < pre; i++) { List<T> itemList = Lists.newArrayList(); for (int j = 0; j < subListLength; j++) { itemList.add(resList.get(i * subListLength + j)); } ret.add(itemList); } // last的進行處理 if (last > 0) { List<T> itemList = Lists.newArrayList(); for (int i = 0; i < last; i++) { itemList.add(resList.get(pre * subListLength + i)); } ret.add(itemList); } } return ret; } // 運行代碼 public static void main(String[] args) { List<String> list = Lists.newArrayList(); int size = 1099; for (int i = 0; i < size; i++) { list.add("hello-" + i); } // 大集合里面包含多個小集合 List<List<String>> temps = split(list, 100); int j = 0; // 對大集合里面的每一個小集合進行操作 for (List<String> obj : temps) { System.out.println(String.format("row:%s -> size:%s,data:%s", ++j, obj.size(), obj)); } } }
public void threadMethod() { List<T> updateList = new ArrayList(); // 初始化線程池, 參數一定要一定要一定要調好!!!! ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 50, 4, TimeUnit.SECONDS, new ArrayBlockingQueue(10), new ThreadPoolExecutor.AbortPolicy()); // 大集合拆分成N個小集合, 這里集合的size可以稍微小一些(這里我用100剛剛好), 以保證多線程異步執行, 過大容易回到單線程 List<T> splitNList = SplitListUtils.split(totalList, 100); // 記錄單個任務的執行次數 CountDownLatch countDownLatch = new CountDownLatch(splitNList.size()); // 對拆分的集合進行批量處理, 先拆分的集合, 再多線程執行 for (List<T> singleList : splitNList) { // 線程池執行 threadPool.execute(new Thread(new Runnable(){ @Override public void run() { for (Entity yangshiwen : singleList) { // 將每一個對象進行數據封裝, 並添加到一個用於存儲更新數據的list // ...... // 任務個數 - 1, 直至為0時喚醒await() countDownLatch.countDown(); } } })); } try { // 讓當前線程處於阻塞狀態,直到鎖存器計數為零 countDownLatch.await(); } catch (InterruptedException e) { throw new BusinessLogException(ResponseEnum.FAIL); } // 通過mybatis的批量插入的方式來進行數據的插入, 這一步還是要做判空 if (GeneralUtil.listNotNull(updateList)) { batchUpdateEntity(updateList); LogUtil.info("xxxxxxxxxxxxxxx"); } }
CountDownLatch概念
CountDownLatch是一個同步工具類,用來協調多個線程之間的同步,或者說起到線程之間的通信(而不是用作互斥的作用)。
CountDownLatch能夠使一個線程在等待另外一些線程完成各自工作之后,再繼續執行。使用一個計數器進行實現。計數器初始值為線程的數量。當每一個線程完成自己任務后,計數器的值就會減一。當計數器的值為0時,表示所有的線程都已經完成一些任務,然后在CountDownLatch上等待的線程就可以恢復執行接下來的任務。
CountDownLatch的用法
CountDownLatch典型用法:1、某一線程在開始運行前等待n個線程執行完畢。將CountDownLatch的計數器初始化為new CountDownLatch(n),每當一個任務線程執行完畢,就將計數器減1 countdownLatch.countDown(),當計數器的值變為0時,在CountDownLatch上await()的線程就會被喚醒。一個典型應用場景就是啟動一個服務時,主線程需要等待多個組件加載完畢,之后再繼續執行。
CountDownLatch典型用法:2、實現多個線程開始執行任務的最大並行性。注意是並行性,不是並發,強調的是多個線程在某一時刻同時開始執行。類似於賽跑,將多個線程放到起點,等待發令槍響,然后同時開跑。做法是初始化一個共享的CountDownLatch(1),將其計算器初始化為1,多個線程在開始執行任務前首先countdownlatch.await(),當主線程調用countDown()時,計數器變為0,多個線程同時被喚醒。