JAVA使用多線程進行數據處理


 

 

 

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/**
 * 以下是偽代碼,要根據自己的實際邏輯進行整合
 */
@Service
public class PushProcessServiceImpl implements PushProcessService {


    private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);

    /**
     *每個線程更新的條數
     * 這表示每次執行五千條數據的推送操作
     */
    private static final Integer LIMIT = 5000;

    /**
     * 起的線程數
     */
    private static final Integer THREAD_NUM = 5;

    /**
     * 創建線程池
     *
     * -  corePoolSize:線程核心參數選擇了5
     *
     * - maximumPoolSize:最大線程數選擇了核心線程數2倍數
     *
     * - keepAliveTime:非核心閑置線程存活時間直接置為0
     *
     * - unit:非核心線程保持存活的時間選擇了 TimeUnit.SECONDS 秒
     *
     * - workQueue:線程池等待隊列,使用 容量初始為100的 LinkedBlockingQueue阻塞隊列
     *
     * 線程池拒絕策略,采用了默認AbortPolicy:直接丟棄任務,拋出異常。
     *
     */
    ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));


    /**
     * 執行推送任務
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void pushData() throws ExecutionException, InterruptedException {
        //計數器,需要保證線程安全
        int count = 0;

        //這里從數據庫查詢出要推送數據總數,根據自己實際的來
        Integer total = pushProcessMapper.getCountByState(0);


        logger.info("未推送數據條數:{}", total);
        //計算需要循環多少輪
        int num = total / (LIMIT * THREAD_NUM) + 1;
        logger.info("要經過的輪數:{}", num);

        //統計總共推送成功的數據條數
        int totalSuccessCount = 0;
        for (int i = 0; i < num; i++) {
            //使用集合來接收線程的運行結果,防止阻塞,接收線程返回結果
            List<Future<Integer>> futureList = new ArrayList<>(32);

            //起THREAD_NUM個線程並行查詢更新庫,加鎖
            for (int j = 0; j < THREAD_NUM; j++) {
                //使用 synchronized 來保證線程安全,保證計數器的增加是有序的
                synchronized (PushProcessServiceImpl.class) {
                    int start = count * LIMIT;
                    count++;
                    /**
                     * 提交線程,用數據起始位置標識線程
                     * 這里前兩個參數start和limit參數相當於執行sql
                     *  limit start,limit
                     *
                     */

                    Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
                    //先不取值,防止阻塞,放進集合
                    futureList.add(future);
                }
            }
            //統計本輪推送成功數據
            for (Future f : futureList) {
                totalSuccessCount = totalSuccessCount + (int) f.get();
            }
        }
        //把數據庫的推送標識更新為已推送(已推送!=推送成功),可以根據自己實際的來
        pushProcessMapper.updateAllState(1);

        logger.info("推送數據完成,需推送數據:{},推送成功:{}", total, totalSuccessCount);
    }

    /**
     * 推送數據線程類
     */
    class PushDataTask implements Callable<Integer> {
        int start;
        int limit;
        int threadNo;   //線程編號

        PushDataTask(int start, int limit, int threadNo) {
            this.start = start;
            this.limit = limit;
            this.threadNo = threadNo;
        }

        @Override
        public Integer call() throws Exception {
            int count = 0;
            //分頁查詢每次執行的推送的數據,查詢數據
            List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit);
            if (CollectionUtils.isEmpty(pushProcessList)) {
                return count;
            }
            logger.info("線程{}開始推送數據", threadNo);

            /**
             * 遍歷需要更新的數據實體類
             */
            for (PushProcess process : pushProcessList) {
                //這里是執行推送請求,根據自己實際的來,也可以要處理的任務
                boolean isSuccess = pushUtil.sendRecord(process);

                //根據主鍵更新推送是否成功狀態標識
                if (isSuccess) {
                    //推送成功
                    pushProcessMapper.updateFlagById(process.getId(), 1);
                    count++;
                } else {
                    //推送失敗
                    pushProcessMapper.updateFlagById(process.getId(), 2);
                }
            }
            logger.info("線程{}推送成功{}條", threadNo, count);
            return count;
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM