import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; /** * 以下是偽代碼,要根據自己的實際邏輯進行整合 */ @Service public class PushProcessServiceImpl implements PushProcessService { private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class); /** *每個線程更新的條數 * 這表示每次執行五千條數據的推送操作 */ private static final Integer LIMIT = 5000; /** * 起的線程數 */ private static final Integer THREAD_NUM = 5; /** * 創建線程池 * * - corePoolSize:線程核心參數選擇了5 * * - maximumPoolSize:最大線程數選擇了核心線程數2倍數 * * - keepAliveTime:非核心閑置線程存活時間直接置為0 * * - unit:非核心線程保持存活的時間選擇了 TimeUnit.SECONDS 秒 * * - workQueue:線程池等待隊列,使用 容量初始為100的 LinkedBlockingQueue阻塞隊列 * * 線程池拒絕策略,采用了默認AbortPolicy:直接丟棄任務,拋出異常。 * */ ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); /** * 執行推送任務 * @throws ExecutionException * @throws InterruptedException */ public void pushData() throws ExecutionException, InterruptedException { //計數器,需要保證線程安全 int count = 0; //這里從數據庫查詢出要推送數據總數,根據自己實際的來 Integer total = pushProcessMapper.getCountByState(0); logger.info("未推送數據條數:{}", total); //計算需要循環多少輪 int num = total / (LIMIT * THREAD_NUM) + 1; logger.info("要經過的輪數:{}", num); //統計總共推送成功的數據條數 int totalSuccessCount = 0; for (int i = 0; i < num; i++) { //使用集合來接收線程的運行結果,防止阻塞,接收線程返回結果 List<Future<Integer>> futureList = new ArrayList<>(32); //起THREAD_NUM個線程並行查詢更新庫,加鎖 for (int j = 0; j < THREAD_NUM; j++) { //使用 synchronized 來保證線程安全,保證計數器的增加是有序的 synchronized (PushProcessServiceImpl.class) { int start = count * LIMIT; count++; /** * 提交線程,用數據起始位置標識線程 * 這里前兩個參數start和limit參數相當於執行sql * limit start,limit * */ Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start)); //先不取值,防止阻塞,放進集合 futureList.add(future); } } //統計本輪推送成功數據 for (Future f : futureList) { totalSuccessCount = totalSuccessCount + (int) f.get(); } } //把數據庫的推送標識更新為已推送(已推送!=推送成功),可以根據自己實際的來 pushProcessMapper.updateAllState(1); logger.info("推送數據完成,需推送數據:{},推送成功:{}", total, totalSuccessCount); } /** * 推送數據線程類 */ class PushDataTask implements Callable<Integer> { int start; int limit; int threadNo; //線程編號 PushDataTask(int start, int limit, int threadNo) { this.start = start; this.limit = limit; this.threadNo = threadNo; } @Override public Integer call() throws Exception { int count = 0; //分頁查詢每次執行的推送的數據,查詢數據 List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit); if (CollectionUtils.isEmpty(pushProcessList)) { return count; } logger.info("線程{}開始推送數據", threadNo); /** * 遍歷需要更新的數據實體類 */ for (PushProcess process : pushProcessList) { //這里是執行推送請求,根據自己實際的來,也可以要處理的任務 boolean isSuccess = pushUtil.sendRecord(process); //根據主鍵更新推送是否成功狀態標識 if (isSuccess) { //推送成功 pushProcessMapper.updateFlagById(process.getId(), 1); count++; } else { //推送失敗 pushProcessMapper.updateFlagById(process.getId(), 2); } } logger.info("線程{}推送成功{}條", threadNo, count); return count; } } }