如何將幾千萬級數據刷新到redis的系統優化到秒級


項目背景:由於數據庫數據量的日益增加,查詢效率越來越慢,為增加數據查詢效率,准備將數據轉移至NOSQL,NOSQL根據公司實際情況選用了redis;

我是接手了這個項目,項目刷一次全量數據到redis用時1天半,而且系統還及其不穩定,各種bug,代碼結構,業務邏輯比較混亂,代碼質量不高。

經過對於業務的理解,鑒於項目問題太多,決定重構。

以下是原項目中存在的明顯問題:

1.框架層:dao層框架選用的是jpa 

2.單線程進行redis刷新

3.業務層: 根據客戶信息循環查詢數據庫進行數據封裝,然后組裝推送redis

4.業務層:全量查詢數據,再判斷是否需要刷新redis,在去刷新

5.單條命令推送到redis

6.定時任務用的是@Scheduled(fixedDelay={}),可以保證單線程執行,但是多個定時任務會相互等待,效率低下

 

為了解決以上對於效率的影響的問題:

1.框架層:經過測試使用jpa一次查詢200W的數據需要的時間是12分鍾,而mybatis使用的時候是19s,完全不是一個數量級。原因在於hibernates需要將查詢結果轉換成對象然后維護到hibernate的session緩存中,

這個非常耗時的一個過程。當大數據量查詢的時候,mybatis明顯是更優的選擇。

       1> 選用的tk_mybatis,為了加速開發效率,加入了generator進行逆向工程

  2>mybatis的fetchSize默認值是100,當查詢的數據達到百萬級的時候,defaultFetchSize增大這個數字可以減少客戶端與oracle的往返,減少響應時間; 本人設置的是10000

2. 業務邏輯:

     對於百萬級的數據,我們需要加快內存計算的速度,必然要引入多線程。因為每次需要處理批次的數量不相同,需要動態的去做任務分配,所以我們選用的是 forkjoin,分而治之的概念。

 //forkjoin 示例代碼

package com.msl.cedis.service.impl;

import com.common.base.utils.SpringContext;
import com.msl.cedis.eo.TclientItemsUw;
import com.msl.cedis.eo.TclientPolicyItemsUw;
import com.msl.cedis.service.ClientService;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Configurable;

import java.util.List;
import java.util.Map;
import java.util.concurrent.RecursiveTask;

/**
 * @Author tony_t_peng
 * @Date  2020-09-22 15:31
 */
@Configurable
public class SyncClientAndPolicyTask extends RecursiveTask<Integer> { //RecursiveTask RecursiveAction

    private final static Logger log = LoggerFactory.getLogger(ClientServiceImpl.class);


    private List<String> cliUwUids;
    private int odsProcessId;
    private Map<String, List<TclientItemsUw>> clientsItemsMap;
    private Map<String, List<TclientPolicyItemsUw>> clientsPolicyItemsMap ;
    private Map<String,Map<String,List<String>>> newPolicyInfoMap ;


    public SyncClientAndPolicyTask() {
    }

    public SyncClientAndPolicyTask(List<String> cliUwUids, Integer odsProcessId, Map<String, List<TclientItemsUw>> clientsItemsMap, Map<String, List<TclientPolicyItemsUw>> clientsPolicyItemsMap,Map<String,Map<String,List<String>>> newPolicyInfoMap) {
        this.cliUwUids = cliUwUids;
        this.odsProcessId= odsProcessId;
        this.clientsItemsMap=clientsItemsMap;
        this.clientsPolicyItemsMap=clientsPolicyItemsMap;
        this.newPolicyInfoMap=newPolicyInfoMap;
    }

    private ClientService clientService = SpringContext.getBean(ClientService.class);

    @Override
    protected Integer compute() {
        int count = 0;
        if (CollectionUtils.isEmpty(cliUwUids)) {
            return count;
        }
        if (cliUwUids.size() <= 2000) {
            clientService.refrushClientInfo2Redis(cliUwUids,clientsItemsMap,clientsPolicyItemsMap, newPolicyInfoMap);
            return cliUwUids.size();
        } else {
            List<String> left = cliUwUids.subList(0, cliUwUids.size() / 2);
            List<String> right = cliUwUids.subList(cliUwUids.size() / 2, cliUwUids.size());
            SyncClientAndPolicyTask leftJob = new SyncClientAndPolicyTask(left,odsProcessId,clientsItemsMap,clientsPolicyItemsMap, newPolicyInfoMap);
            SyncClientAndPolicyTask rightJob = new SyncClientAndPolicyTask(right,odsProcessId,clientsItemsMap,clientsPolicyItemsMap, newPolicyInfoMap);
            leftJob.fork();
            rightJob.fork();
            return leftJob.join() + rightJob.join();
        }
    }
}

  

  //調用forkjoin,其中count 就是處理的數據量

Integer count = forkJoinPool.invoke(new SyncUweCliTask(btchNo,cliUids,cliDtlMap,casCliClmMap,casCliCvgMap,casCliPolMap,glhCliClmMap,glhCliCvgMap,glhCliPolMap));

3和4:1.用批次號控制一次循環處理的數據量,我這邊一次循環的處理量大概在200W數據以內。

  2.對於一次循環需要刷新的數據用sql條件先進行過濾。減少加載到內存的數據量。

  3.一次全量查詢出一個批次內可能用到的各種數據並轉換成map,然后直接交給內存運算

    目的:1.減少數據加載到內存的數據量,做到查詢的結果就是需要刷新的數據 2. 批量查出一個批次所有需要的數據,業務邏輯全內存處理無sql等待。

 

5. 數據刷新到redis,使用管道批量刷新,減少連接獲取,資源關閉的開銷。 同時因為redis服務是單線程的,需要控制管道的命令量不要過分多,因為管道命令過多執行可能會導致redis線程阻塞,導致其他線程操作redis超時。所以需要控制管道的命令量,並且適當擴大redis的超時時間.    可以改為60s或者100秒應該足夠了

 

6.定時任務改為quaze,同時對於同一個任務做到單線程啟動。加上注解@DisallowConcurrentExecution

 

實現上面所有的功能點,項目有之前刷新redis的1天半已經可以跑到24分鍾以內,一個批次200W以內的數據,都是秒級刷新redis,基本實現了項目的效率期往

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM