項目背景:由於數據庫數據量的日益增加,查詢效率越來越慢,為增加數據查詢效率,准備將數據轉移至NOSQL,NOSQL根據公司實際情況選用了redis;
我是接手了這個項目,項目刷一次全量數據到redis用時1天半,而且系統還及其不穩定,各種bug,代碼結構,業務邏輯比較混亂,代碼質量不高。
經過對於業務的理解,鑒於項目問題太多,決定重構。
以下是原項目中存在的明顯問題:
1.框架層:dao層框架選用的是jpa
2.單線程進行redis刷新
3.業務層: 根據客戶信息循環查詢數據庫進行數據封裝,然后組裝推送redis
4.業務層:全量查詢數據,再判斷是否需要刷新redis,在去刷新
5.單條命令推送到redis
6.定時任務用的是@Scheduled(fixedDelay={}),可以保證單線程執行,但是多個定時任務會相互等待,效率低下
為了解決以上對於效率的影響的問題:
1.框架層:經過測試使用jpa一次查詢200W的數據需要的時間是12分鍾,而mybatis使用的時候是19s,完全不是一個數量級。原因在於hibernates需要將查詢結果轉換成對象然后維護到hibernate的session緩存中,
這個非常耗時的一個過程。當大數據量查詢的時候,mybatis明顯是更優的選擇。
1> 選用的tk_mybatis,為了加速開發效率,加入了generator進行逆向工程
2>mybatis的fetchSize默認值是100,當查詢的數據達到百萬級的時候,defaultFetchSize增大這個數字可以減少客戶端與oracle的往返,減少響應時間; 本人設置的是10000
2. 業務邏輯:
對於百萬級的數據,我們需要加快內存計算的速度,必然要引入多線程。因為每次需要處理批次的數量不相同,需要動態的去做任務分配,所以我們選用的是 forkjoin,分而治之的概念。
//forkjoin 示例代碼
package com.msl.cedis.service.impl; import com.common.base.utils.SpringContext; import com.msl.cedis.eo.TclientItemsUw; import com.msl.cedis.eo.TclientPolicyItemsUw; import com.msl.cedis.service.ClientService; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Configurable; import java.util.List; import java.util.Map; import java.util.concurrent.RecursiveTask; /** * @Author tony_t_peng * @Date 2020-09-22 15:31 */ @Configurable public class SyncClientAndPolicyTask extends RecursiveTask<Integer> { //RecursiveTask RecursiveAction private final static Logger log = LoggerFactory.getLogger(ClientServiceImpl.class); private List<String> cliUwUids; private int odsProcessId; private Map<String, List<TclientItemsUw>> clientsItemsMap; private Map<String, List<TclientPolicyItemsUw>> clientsPolicyItemsMap ; private Map<String,Map<String,List<String>>> newPolicyInfoMap ; public SyncClientAndPolicyTask() { } public SyncClientAndPolicyTask(List<String> cliUwUids, Integer odsProcessId, Map<String, List<TclientItemsUw>> clientsItemsMap, Map<String, List<TclientPolicyItemsUw>> clientsPolicyItemsMap,Map<String,Map<String,List<String>>> newPolicyInfoMap) { this.cliUwUids = cliUwUids; this.odsProcessId= odsProcessId; this.clientsItemsMap=clientsItemsMap; this.clientsPolicyItemsMap=clientsPolicyItemsMap; this.newPolicyInfoMap=newPolicyInfoMap; } private ClientService clientService = SpringContext.getBean(ClientService.class); @Override protected Integer compute() { int count = 0; if (CollectionUtils.isEmpty(cliUwUids)) { return count; } if (cliUwUids.size() <= 2000) { clientService.refrushClientInfo2Redis(cliUwUids,clientsItemsMap,clientsPolicyItemsMap, newPolicyInfoMap); return cliUwUids.size(); } else { List<String> left = cliUwUids.subList(0, cliUwUids.size() / 2); List<String> right = cliUwUids.subList(cliUwUids.size() / 2, cliUwUids.size()); SyncClientAndPolicyTask leftJob = new SyncClientAndPolicyTask(left,odsProcessId,clientsItemsMap,clientsPolicyItemsMap, newPolicyInfoMap); SyncClientAndPolicyTask rightJob = new SyncClientAndPolicyTask(right,odsProcessId,clientsItemsMap,clientsPolicyItemsMap, newPolicyInfoMap); leftJob.fork(); rightJob.fork(); return leftJob.join() + rightJob.join(); } } }
//調用forkjoin,其中count 就是處理的數據量
Integer count = forkJoinPool.invoke(new SyncUweCliTask(btchNo,cliUids,cliDtlMap,casCliClmMap,casCliCvgMap,casCliPolMap,glhCliClmMap,glhCliCvgMap,glhCliPolMap));
3和4:1.用批次號控制一次循環處理的數據量,我這邊一次循環的處理量大概在200W數據以內。
2.對於一次循環需要刷新的數據用sql條件先進行過濾。減少加載到內存的數據量。
3.一次全量查詢出一個批次內可能用到的各種數據並轉換成map,然后直接交給內存運算
目的:1.減少數據加載到內存的數據量,做到查詢的結果就是需要刷新的數據 2. 批量查出一個批次所有需要的數據,業務邏輯全內存處理無sql等待。
5. 數據刷新到redis,使用管道批量刷新,減少連接獲取,資源關閉的開銷。 同時因為redis服務是單線程的,需要控制管道的命令量不要過分多,因為管道命令過多執行可能會導致redis線程阻塞,導致其他線程操作redis超時。所以需要控制管道的命令量,並且適當擴大redis的超時時間. 可以改為60s或者100秒應該足夠了
6.定時任務改為quaze,同時對於同一個任務做到單線程啟動。加上注解@DisallowConcurrentExecution
實現上面所有的功能點,項目有之前刷新redis的1天半已經可以跑到24分鍾以內,一個批次200W以內的數據,都是秒級刷新redis,基本實現了項目的效率期往