Java數據庫分表與多線程查詢結果匯總


今天接到一個需求:要對一個物理分表的邏輯表進行查詢統計。而數據庫用的是公司自己研發的產品,考慮的到公司產品的特點以及業務的需求,該邏輯表是按年月進行分表的,而非分區。我們來看一下,在按時間段進行查詢統計的時候,會有哪些問題:

  1. 需要對多個表查詢,且表個數不確定
  2. 時間跨度越大,查詢等等表個數越多,對應查詢時間也會越長

如何解決?一起來看一下


分表與分區


目的

既然談到數據的分表與分區,那我們來簡單了解一下。先說一下分表與分區的目的。我們日常開發中都會經常遇到百萬或千萬級的數據大表,這些表數據量大,數據增速快,不用太久就會造成在查詢或修改數據庫數據的時候造成性能低下的問題,聯合查詢的時候,情況可能更糟。一次有必要對原來的表進行改造設計。這時候數據庫分區和分表技術就應運而生了

區別
  • 分表

    分表是將一個大表按照一定的規則分解成多張子表,而各個子表存儲空間彼此獨立。

  • 分區

    分區也是按照一定的規則進行數據划分,對各部分數據各自存儲,但在處理邏輯上,散列存放的數據還是屬於同一張大表。

    依賴於數據庫實現,對程序屏蔽,減輕程序員編程壓力


分表邏輯下的多線程查詢與數據匯總


回到文首提到的情況,當前的情況是分表,分表的划分依據是根據年月划分,一個月一張表。意味着當我們要統計跨多個隔離單位的數據進行統計時,要自己去實現的對分散在多個表中數據的查詢匯總處理。

通常表名會帶有划分依據的信息,比如按年月划分,表名格式一般為TABLE_NAME_YYYYMM


確定數據表

當前的需求是對一段時間內的數據進行統計,時間單位精確到月份。一次當我們根據服務入參拿到開始月份和結束月份后,要先得到所有涉及的月份。我們可以計算出將所有月份並保存在一個List中,方便我們查詢各個表時進行表名的拼接。代碼實現如下

/**
 * 獲取時間段內所有月份集合
 * @param beginMonth 開始年月
 * @param endMonth 結束年月
 **/
private List<String> getMonths(String beginMonth,String endMonth){

    List<String> result = new ArrayList<>();

    Date beginDate = DateUtils.getDate("yyyyMM",beginMonth);
    Date endDate = DateUtils.getDate("yyyyMM",endMonth);

    if (beginDate.after(endDate)) {
        throw new BusiException("時間入參非法");
    }

    result.add(beginMonth);

    Calendar cal = Calendar.getInstance();
    Date originalDate = beginDate;

    while (endDate.after(originalDate)) {

        cal.setTime(originalDate);
        cal.add(Calendar.MONTH, 1);
        originalDate = cal.getTime();
        result.add(DateUtils.getFormatDate(originalDate).substring(0,6));
    }

    return result;
}

確認線程個數

拿到所有月份后,進行分多線程處理的操作,增加單位時間內查詢表的個數,以此縮短查詢時間,通常我們都利用線程池來進行多線程操作。這里會涉及線程池大小的考慮問題,可以參考以下博文:計算線程池最佳線程數。我們姑且用CPU復雜型公式進行計算

int cpuNums = Runtime.getRuntime().availableProcessors() + 1;

均勻分配數據

確定好線程的大小,我們還要考慮一個問題,那就是我們如何為一個線程均勻地分配數據的處理量,在當前的需求下,就是如何均勻地為每個線程分配對應處理的月份,可以參考以下代碼:

/**
 * 平衡分組算法 - 已知分配份數
 * @param sourceList 數據源
 * @param groupNum 被非配份數
 **/
public static <T> List<T>[] spiltDataList(List<T> sourceList,int groupNum){

    List<T> [] group = new List[groupNum];
    /* 初始化數組 */
    for (int i = 0 ; i < groupNum ; i++) {
        group[i] = new ArrayList<>();
    }

    int sourceSize = sourceList.size();

    int batchNum = sourceSize % groupNum == 0 ? sourceSize / groupNum : sourceSize / groupNum + 1;

    for (int i = 1; i <= batchNum ; i++){

        if (i == batchNum){
            int finalBatchNum = sourceSize - (i - 1) * groupNum;
            for (int j = 0 ; j < finalBatchNum ; j++){
                group[j].add(sourceList.get((i - 1) * groupNum + j));
            }
        }else {
            for (int j = 0 ; j < groupNum ; j++){
                group[j].add(sourceList.get((i - 1) * groupNum + j));
            }
        }

    }
    return group;

}

多線程實現

要對所有子線程進行匯總,就必須使用Callable和Future的方式來實現多線程,我們就可以拿到每個子線程的查詢返回,進而匯總分析處理。關於多線程實現方式,可以參考Java多線程事務管理中對多線程實現方式的介紹

以下為核心代碼實現

/**
 * @param qryType 查詢類型
 * @param qryValue 查詢值
 * @param payType 扣費類型
 * @param beginMonth 開始年月
 * @param endMonth 結束年月
 **/
public List<CollInfoQueryBo> collInfoQueryByThread(
        String qryType,Long qryValue,String payType,String beginMonth,String endMonth)
throws InterruptedException,ExecutionException{

    List<CollInfoQueryBo> collInfoQueryBos = new ArrayList<>();

    List<String> months = getMonths(beginMonth,endMonth);
    int cpuNums = Runtime.getRuntime().availableProcessors() + 1;
    int totalNum = months.size();

    int threadNum;

    if (totalNum < cpuNums){
        threadNum = totalNum;
    }else {
        threadNum = cpuNums;
    }

    /* 分線程處理 */
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadNum);
    CountDownLatch endLock = new CountDownLatch(threadNum);
    BlockingQueue<Future<List<CollInfoQueryBo>>> queue = new LinkedBlockingQueue<>();

    List<String>[] stringList = spiltDataList(months,threadNum);

    for (List<String> monthList : stringList) {

        Future<List<CollInfoQueryBo>> future = fixedThreadPool.submit(new Callable<List<CollInfoQueryBo>>() {
            @Override
            public List<CollInfoQueryBo> call() throws Exception {

                List<CollInfoQueryBo> collInfoQueryBoList = getAllMonthResult(monthList,qryType,qryValue);

                endLock.countDown();

                return collInfoQueryBoList;
            }
        });

        queue.add(future);
    }

    endLock.await();

    /* 匯總結果 */
    for(Future<List<CollInfoQueryBo>> future : queue) {
        List<CollInfoQueryBo> currentThreadList = future.get();
        collInfoQueryBos.addAll(currentThreadList);
    }
    fixedThreadPool.shutdown(); //關閉線程池

    return collInfoQueryBos;

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM