現場填坑系列:使用bulk操作提高性能,解決mongoshake 向ES同步延遲。


接到現場報告,MongoDB向ES同步數據延遲越來越大,有的已經超過10個小時,造成客戶新加入的用戶無法被搜索出來。由於在系統中ES類似於數倉,很多統計和第三方接系統都需要從ES獲取數據,所以也影響了一些其他依賴ES數據的功能和業務。

架構簡圖

tomcat------日志數據----->logstash-------日志數據--->|      E  S

mongodb---業務數據--->mongoshake---業務數據 -->|      集群

日志通過logstash同步到ES,業務數據通過mongoshake自己實現的ES推送組件推送到ES。ES 5台構成一個集群。

問題現象

  1. 延遲只發生在第二條通路,第一條通路雖然也有較大並發,但不發生延遲;
  2. 且延遲在業務峰期才會出現,非高峰期延遲不嚴重或沒有延遲;
  3. 通過上面的描述可以確定此延遲於業務相關,且與mongoshake推送的實現方式相關;

初步分析

這個結構在兩個異地機房各有一套,中間通過同步機制同步。由於業務數據是從本地mongodb同步到本地ES發生延遲,首先排除是雙機房間傳輸帶寬問題。

會不會是ES在業務高峰期負載過高,造成推送延遲:可能性有,但是較低,因為如果是ES查詢並不慢,且如果是由此造成,logstash 推送也應該受到影響。對ES的性能監控也可以印證這個問題。

會不會是兩種數據業務的不同造成的性能差異:logstash的數據主要是日志,插入為主,mongoshake的數據是業務數據,涉及到對以前的數據進行修改:確實有可能,但比較起來延遲不應該有如此之大,一個秒級延遲,一個天級延遲,還是首先考慮實現機制問題。

同步機制

mongoshake向ES同步機制,是將需要在ES存放的幾張表的oplog在ES回放,此程序由我們的開發人員擴展的mongoshake ES組件完成。

oplog ----- mongoshake----- oplog replay----->ES

聯合高峰期延遲增加的現象,可以猜測高峰期業務數據操作造成的oplog有大量增加,由於mongoshake本身(除非修改源碼)只能篩選表,不能篩選哪些表的具體日志,只要是這幾張包的oplog都會同步,所以造成延遲。到底是oplog過多需要篩選,還是同步能力太低需要改進,我們需要進一步查證。

推送能力統計

現場人員查看了一段時間的同步量,對現有機制的oplog處理及回放能力進行統計。

第一次統計:130秒同步 2186

第二次統計:180秒同步 2714

可見平均每秒能力不足20條日志,肯定是過低。那么客戶現場實際業務每秒到底要產生多少條數據?這個問題要查清楚,作為推送性能優化的底線。

實際業務oplog情況分析

對客戶業務能力進行統計,需要將一整天的oplog導出,oplog由於沒有索引,雖然可以直接通過find,並給出ts 查詢,但由於有40G數據,查詢及其緩慢 。所以我們選擇將數據導出到文本文件,進行分析

start=$(date +%s -d "2020-03-24 08:00:00")
end=$(date +%s -d "2020-03-24 10:00:00")
mongodump -h localhost -d local -c oplog.rs -q '{"ts":{$gt:Timestamp('$start',1),$lt:Timestamp('$end',1)}}' -o /home/backup
cd /home/backup/local
bsondump oplog.rs.bson > oplog.rs.txt

進一步對oplog進行分析,在vim中分別統計每個小時的日志數量,可以得到下表:橫軸是北京時間24個小時,縱軸是oplog數量,其中灰色是oplog中需要同步到ES的

可以看出高峰期oplog大量增長,需要同步的日志超過150000,平均每秒42條oplog需要同步。而處理能力不到20,所以高峰期一個小時的數據往往需要2-3個小時才能同步完成,且從8點開始,一直都下午18點,實際oplog產生都超過處理能力。

往下的優化方向,一個是減少日志,一個是增加處理能力,高峰期每秒42條日志已經不高,雖然可以優化,但可優化范圍有限。增加處理能力才是關鍵。

開發查看ES同步代碼,原有代碼使用逐條同步模式,同步一條,獲取一條,同時采用性能較低的腳本同步方式,現在使用批量處理(實現參照mongoshake 向mongodb同步的 direct writer實現。批量調用elastic Client 提供的bulk api進行操作)

bulkRequest := bw.client.Bulk()
	
for _, log := range oplogs { ...... bulkRequest.Add(elastic.NewBulkDeleteRequest().Index(index).Type(doc_type).Id(id)) } bulkResponse, err := bulkRequest.Do(context.Background()) 

對update 中的unset(刪除字段) 進行處理,因為unset執行有速度有瓶頸,所以根據實際情況直接改為將字段置空;

for _, v := range unsets { doc[v] = nil } 

效果

在進行上述操作后,單線程情況下數據處理的速度超過每秒1000條(未嚴格測試),新同步代碼幾分鍾就能同步一個小時的oplog,完全達到性能要求。

討論

oplog優化

由於ES同步性能大幅提高,所以可以不用繼續優化oplog,但是oplog可以反映關鍵業務對數據庫的訪問情況,特別是寫入,在mongodb replica set中只能在primary 節點完成,即使增加節點也無法分擔流量,所以對oplog的進一步分析依然必要。同時oplog中包含數量的大小,也對replicaset 的同步帶寬有影響,特別是出現跨機房同步的情況時。

所以更進一步,我們還對業務oplog進行了分析,此分析我們會在別的文章中討論。

多線程執行

注意這里ES同步沒有使用多線程處理,主要是考慮業務數據多線程操作的事務性。要實現此種事務,需要對mongodb本身進行一定改造。對mongodb源碼改造實現雙向同步和多線程寫入會在其他文章中討論。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM