1. 跑批是什么
顧名思義,就是應用程序對數據的批量處理。
跑批有以下特性:
- 大數據量:批量任務一般伴隨着大量的數據處理;
- 自動化:要求制定時間或頻率自動運行;
- 性能:要求在指定時間內完成批處理任務。
2. 跑批應用場景
在開發中常見的跑批應用場景如下(以目前做的系統舉例):
- 定時的數據狀態更新:到期失效
- 數據的計算:計算罰息、計提
- 文件處理:與其他應用系統同步,如還款計划同步
- 生成文件:對賬、提供同步文件
在跑批開發中有一些存有潛在隱患的處理方式需要指出來:
- 對文件內或數據庫數據一次性讀取、查詢加載到內存
- 基於要處理的數據進行循環,循環內部操作數據庫
- 對數據逐條處理
- 事務范圍基於整個跑批管理
- 有關聯的跑批任務人為設定定時執行時間
3. 優化思考
當前的資源現狀:
- 服務器資源利用率低,生產環境按照集群部署,跑批觸發只會調度到其中一台
- CPU利用率低,被調度到的機器單線程運行整個任務
- 任務執行間隔時間內服務器資源空閑
批處理的核心思想:
分片:
- 對交易數據進行拆分成多個小片,且每一片都能夠快速定位
- 任何數據只要有順序,就能夠分片
並發:
- 調度服務器層面使用廣播策略,充分利用服務器資源
- 單個服務器內部通過線程池進行並發處理多片數據,充分利用CPU資源
- 純運算型可通過 lambda並行流運算(前提是基礎數據量足夠多)
任務流:
- 前后關系緊密任務組成任務流,避免間隔時間資源空閑
- 對任務拆分為任務組件,不同任務支持不同的路由策略
4. 數據分片分析
4.1 數據准備
如下圖,准備了200萬的數據:
4.2 利用limit
測試最簡單的分片邏輯——limit的執行效率:
上面四種limit的耗時統計如下:
可以看出,隨着起始記錄的增加,查詢時間也隨着增大, 這說明分頁語句limit跟起始頁碼是有很大關系的,所以limit 對記錄很多的表並不適合直接使用。
4.3 分片優化——id>=形式
利用索引的排序特性與葉子節點鏈表鏈接特性以及快速定位數據所在位置的二分查找法,對分片進行優化。
下面是加條件查詢:
對全表查詢,速度也有很大提升:
將兩次查詢進行比對:
很明顯,當我們使用id進行范圍查詢的時候,查詢效率趨於平穩且快速,那么只需要確定每一片需要使用的查詢條件的id值與 business_date值,業務場景上一般按照業務日期進行跑批,日期是已知確定的,id可以通過lmit1000,1獲取。
以此類推,10萬數據按照1000條一片需要進行100次分片條件查詢,預計平均耗時在3秒以內可以得到一組分片條件數據。
4.4 分片再優化——覆蓋索引
針對覆蓋索引的知識,對分片SQL進一步優化,對於 business date與id本身已經是聯合索引,而分片的條件只需要返回id。
如上,為了驗證覆蓋索引效果,對查詢執行了N次,提取出來各自執行能達到的最短時間,最小值提升了2毫秒。
通過explain,僅查詢id字段是用到了覆蓋索引。
5. 分片結果的高效使用
系統中的批量任務是基於開源項目xxl-job進行調度執行的,項目詳見作者博客 許雪里 。
在該項目落地過程中,有些問題浮現:
- 前面已經將數據完成分片,這個時候分片結果我們需要有個地方進行存儲
- 需要注意的是分片是單機調度執行的,那么分片后,我們需要調度所有能夠執行任務的機器,基於xxl-job調度框架,我們需要配置廣播策略
- 每個機器節點基於數據存儲的地方獲取分配給自己的任務:xxl-job會通知到任務機器當前是第幾個節點,基於這個值對分片數取模可以得到任務
- 每個節點可以獨立起線程池將任務列表用多線程並發執行
- 系統中有好多模塊要執行任務,是否要重復寫相同的處理邏輯
所以基於上面的思路,為避免各個子系統對於分片存儲和廣播通知后多線程執行邏輯重復造輪子,對xxl-job進行了適配改造,具體改造點:
- 在分片任務執行完成后返回分片結果有調度器進行保存
- 保存的分片結果自動分配給各節點
- 在任務執行器中,會自動進行多線程調度
- 基於任務的作業圖:可以將分片任務組件與任務執行組件進行繪圖配置前后執行關系,並且不同仼務組件配置不同的路由策略,比如分片任務隨機路由,任務執行配廣播策略
- 針對不同的任務間執行順序管理,通過作圖方式繪制出任務流
6. MySQL數據庫批量操作
當業務設計到數據庫操作時,相比較於單條的新增或更新,批量的執行效率更高,所以涉及到批量的業務,能夠使用批量就盡量使用批量。
批量插入:
insert into table_name(column1,column2,column3) values ('column11','column12','column13'),('column21','column22','column23')···
批量更新:
update table_name set column_name = 'column1' where column_name2 = 'column2';update table_name set column_name = 'column3' where column_name2 = 'column4';···
需要在數據庫連接上開啟批量操作:rewriteBatchedStatements=true&allowMultiQueries=true
master.jdbc.url=jdbc:mysql://127.0.0.1:3306/batch_test?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&allowMultiQueries=true
7. 讀取文件優化
對文件進行分片,文件中有一個元素,天然是有序的:行號。且按照行進行分片能夠保證一片內數據完整性。
可以使用Java IO提供的RandomAccessFile類來進行文件的解析,主要基於以下三個方法:
分片思路:
- 開始對文件進行分片,初始指針位置為0,假設每1000行為一片,文件頭為第一行,先讀取一行
- 第一片記為第2行頭部指針位置,從此位置開始讀1000行
- 第二片記為第1002行頭部指針位置,從此位置開始讀1000行
- 依次類推,整個文件全部分片完成
- 各個分片被執行的時候只需要通過指針位置可以直接定位到數據點開始讀取1000行
此外,Java NIO類SeekableByteChannel同樣支持隨機訪問,只是沒有整行讀取功能,需要識別字節中是否有換行符,性能上強於RandomAccessFile。
8. 生成文件優化
優化思路:
- 每個服務器並發多線程生成單獨的子文件
- 每個服務器本地文件進行合並為該服務器處理部分數據子文件
- 每個服務器對合並后的文件上傳文件服務器匯聚,文件名可以帶上服務器的獨特標示
- 調度某個服務器獨立進行文件服務器上文件合並動作,補充需要的文件內容,如頭信息、數據量,壓縮、加密處理等