使用MongoDB的開發人員應該都聽說過孤兒文檔(orphaned document)這回事兒,可謂聞着沉默,遇者流淚。本文基於MongoDB3.0來看看怎么產生一個orphaned document,要求MongoDB的運行方式需要是sharded cluster,如果對這一部分還不是很了解,可以參考一下這篇文章。
在MongoDB的官方文檔中,對orphaned document的描述非常簡單:
In a sharded cluster, orphaned documents are those documents on a shard that also exist in chunks on other shards as a result of failed migrations or incomplete migration cleanup due to abnormal shutdown. Delete orphaned documents using
cleanupOrphaned
to reclaim disk space and reduce confusion
可以看到,orphaned document是指在sharded cluster環境下,一些同時存在於不同shard上的document。我們知道,在mongodb sharded cluster中,分布在不同shard的數據子集是正交的,即理論上一個document只能出現在一個shard上,document與shard的映射關系維護在config server中。官方文檔指出了可能產生orphaned document的情況:在chunk遷移的過程中,mongod實例異常宕機,導致遷移過程失敗或者部分完成。文檔中還指出,可以使用 cleanupOrphaned
來刪除orphaned document。
新聞報道災難、事故的時候,一般都有這么一個潛規則:內容越短,事情也嚴重。不知道MongoDB對於orphaned document是不是也采用了這個套路,一來對orphaned document發生的可能原因描述不夠詳盡,二來也沒有提供檢測是否存在orphaned document的方法。對於cleanupOrphaned,要在生產環境使用也需要一定的勇氣。
orphaned document產生原因
作為一個沒有看過MongoDB源碼的普通應用開發人員,拍腦袋想想,chuck的遷移應該有以下三個步驟:將數據從源shard拷貝到目標shard,更新config server中的metadata,從源shard刪除數據。當然,這三個步驟的順序不一定是上面的順序。這三個步驟,如果能保證原子性,那么理論上是不會出問題的。但是,orphaned document具體怎么出現的一直不是很清楚。
前些天在瀏覽官方文檔的時候,發現有對遷移過程描述(chunk-migration-procedure),大致過程翻譯如下:
- balancer向源shard發送moveChunk命令
- 源shard內部執行moveChunk命令,並保證在遷移的過程中,新插入的document還是寫入源shard
- 如果需要的話,目標shard創建需要的索引
- 目標shard從源shard請求數據;注意,這里是一個copy操作,而不是move操作
- 在接收完chunk的最后一個文檔后,目標shard啟動一個同步拷貝進程,保證拷貝到在遷移過程中又寫入源shard上的相關文檔
- 完全同步之后,目標shard向config server報告新的metadata(chunk的新位置信息)
- 在上一步完成之后,源shard開始刪除舊的document
如果能保證以上操作的原子性,在任何步驟出問題應該都沒問題;如果不能保證,那么在第4,5,6,7步出現機器宕機,都有可能出問題。對於出問題的原因,官網(chunk-migration-queuing )是這么解釋的:
the balancer does not wait for the current migration’s delete phase to complete before starting the next chunk migration
This queuing behavior allows shards to unload chunks more quickly in cases of heavily imbalanced cluster, such as when performing initial data loads without pre-splitting and when adding new shards.If multiple delete phases are queued but not yet complete, a crash of the replica set’s primary can orphan data from multiple migrations.
簡而言之,為了加速chunk 遷移的速度(比如在新的shard加入的時候,有大量的chunk遷移),因此delete phase(第7步)不會立刻執行,而是放入一個隊列,異步執行,此時如果crash,就可能產生孤兒文檔
產生一個orphaned document
基於官方文檔,如何產生一個orphaned document呢? 我的想法很簡單:監控MongoDB日志,在出現標志遷移過程的日志出現的時候,kill掉shard中的primary!
預備知識
在《通過一步步創建sharded cluster來認識mongodb》一文中,我詳細介紹了如何搭建一個sharded cluster,在我的實例中,使用了兩個shard,其中每個shard包括一個primary、一個secondary,一個arbiter。另外,創建了一個允許sharding的db -- test_db, 然后sharded_col這個集合使用_id分片,本文基於這個sharded cluster進行實驗。但需要注意的是:在前文中,為了節省磁盤空間,我禁用了mongod實例的journal機制(啟動選項中的 --nojourbal),但在本文中,為了盡量符合真實情況,在啟動mongod的時候使用了--journal來啟用journal機制。
另外,再補充兩點,第一個是chunk遷移的條件,只有當shards之間chunk的數目差異達到一定程度才會發生遷移:
Number of Chunks | Migration Threshold |
Fewer than 20 | 2 |
20-79 | 4 |
80 and greater | 8 |
第二個是,如果沒有在document中包含_id,那么mongodb會自動添加這個字段,其value是一個ObjectId,ObjectId由一下部分組成:
- a 4-byte value representing the seconds since the Unix epoch,
- a 3-byte machine identifier,
- a 2-byte process id, and
- a 3-byte counter, starting with a random value.
准備
首先,得知道chunk遷移的時候日志是什么樣子的,因此我用python腳本插入了一些記錄,通過sh.status()發現有chunk分裂、遷移的時候去查看mongodb日志,在rs1(sharded_col這個集合的primary shard)的primary(rs1_1.log)里面發現了如下的輸出:
34 2017-07-06T21:43:21.629+0800 I NETWORK [conn6] starting new replica set monitor for replica set rs2 with seeds 127.0.0.1:27021,127.0.0.1:27022 48 2017-07-06T21:43:23.685+0800 I SHARDING [conn6] moveChunk data transfer progress: { active: true, ns: "test_db.sharded_col", from: "rs1/127.0.0.1:27018,127.0.0.1:27019" , min: { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') }, max: { _id: MaxKey }, shardKeyPattern: { _id: 1.0 }, state: "steady", counts: { cloned: 1, clonedBytes: 83944, cat chup: 0, steady: 0 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 0|0, electionId: ObjectId('595e3b0ff70a0e5c3d75d684') } } my mem used: 0 52 -017-07-06T21:43:23.977+0800 I SHARDING [conn6] moveChunk migrate commit accepted by TO-shard: { active: false, ns: "test_db.sharded_col", from: "rs1/127.0.0.1:27018,12 7.0.0.1:27019", min: { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') }, max: { _id: MaxKey }, shardKeyPattern: { _id: 1.0 }, state: "done", counts: { cloned: 1, clonedBytes : 83944, catchup: 0, steady: 0 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 0|0, electionId: ObjectId('595e3b0ff70a0e5c3d75d684') } } 53 w017-07-06T21:43:23.977+0800 I SHARDING [conn6] moveChunk updating self version to: 3|1||590a8d4cd2575f23f5d0c9f3 through { _id: ObjectId('5937e11f48e2c04f793b1242') } -> { _id: ObjectId('595b829fd71ffd546f9e5b05') } for collection 'test_db.sharded_col' 54 2017-07-06T21:43:23.977+0800 I NETWORK [conn6] SyncClusterConnection connecting to [127.0.0.1:40000] 55 2017-07-06T21:43:23.978+0800 I NETWORK [conn6] SyncClusterConnection connecting to [127.0.0.1:40001] 56 2017-07-06T21:43:23.978+0800 I NETWORK [conn6] SyncClusterConnection connecting to [127.0.0.1:40002] 57 2017-07-06T21:43:24.413+0800 I SHARDING [conn6] about to log metadata event: { _id: "xxx-2017-07-06T13:43:24-595e3e7c0db0d72b7244e620", server: "xxx", clientAddr: "127.0.0.1:52312", time: new Date(1499348604413), what: "moveChunk.commit", ns: "test_db.sharded_col", details: { min: { _id: ObjectId( '595e3e74d71ffd5c7be8c8b7') }, max: { _id: MaxKey }, from: "rs1", to: "rs2", cloned: 1, clonedBytes: 83944, catchup: 0, steady: 0 } } 58 2017-07-06T21:43:24.417+0800 I SHARDING [conn6] MigrateFromStatus::done About to acquire global lock to exit critical section 59 2017-07-06T21:43:24.417+0800 I SHARDING [conn6] forking for cleanup of chunk data 60 2017-07-06T21:43:24.417+0800 I SHARDING [conn6] MigrateFromStatus::done About to acquire global lock to exit critical section 61 2017-07-06T21:43:24.417+0800 I SHARDING [RangeDeleter] Deleter starting delete for: test_db.sharded_col from { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') } -> { _id: MaxKey }, with opId: 6 62 2017-07-06T21:43:24.417+0800 I SHARDING [RangeDeleter] rangeDeleter deleted 1 documents for test_db.sharded_col from { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') } -> { _id: MaxKey }
上面第59行,“forking for cleanup of chunk data”,看起來是准備刪除舊的數據了
check_loop() { echo 'checking' ret=`grep -c 'forking for cleanup of chunk data' /home/mongo_db/log/rs1_1.log` if [ $ret -gt 0 ]; then echo "will kill rs1 primary" kill -s 9 `ps aux | grep rs1_1 | awk '{print $2}'` exit 0 fi ret=`grep -c 'forking for cleanup of chunk data' /home/mongo_db/log/rs2_1.log` if [ $ret -gt 0 ]; then echo "will kill rs2 primary" kill -s 9 `ps aux | grep rs2_1 | awk '{print $2}'` exit 0 fi sleep 0.1 check_loop } check_loop
第一次嘗試
第一次嘗試就是使用的上面的腳本。
首先運行上面的shell腳本,然后另起一個終端開始插入數據,在shell腳本kill掉進程之后,立即登上rs1和rs2查看統計數據,發現並沒有產生orphaned document(怎么檢測看第二次嘗試)
再回看前面的日志,幾乎是出現“forking for cleanup of chunk data”的同一時刻就出現了“rangeDeleter deleted 1 documents for test_db.sharded_col from”,后者表明數據已經被刪除。而shell腳本0.1s才檢查一次,很可能在遷移過程已經完成之后才發出kill信號。於是將kill的時機提前,在shell腳本中檢查“moveChunk migrate commit accepted”(上述文檔中的第52行)
對shell腳本的修改也很簡單,替換一下grep的內容:

check_loop() { echo 'checking' ret=`grep -c 'moveChunk migrate commit accepted' /home/mongo_db/log/rs1_1.log` if [ $ret -gt 0 ]; then echo "will kill rs1 primary" kill -s 9 `ps aux | grep rs1_1 | awk '{print $2}'` exit 0 fi ret=`grep -c 'moveChunk migrate commit accepted' /home/mongo_db/log/rs2_1.log` if [ $ret -gt 0 ]; then echo "will kill rs2 primary" kill -s 9 `ps aux | grep rs2_1 | awk '{print $2}'` exit 0 fi sleep 0.1 check_loop } check_loop
第二次嘗試
在進行第二次嘗試之前,清空了sharded_col中的記錄,一遍更快產生chunk遷移。
重復之間的步驟:啟動shell腳本,然后插入數據,等待shell腳本kill掉進程后終止
很快,shell腳本就終止了,通過ps aux | grep mongo 也證實了rs1_1被kill掉了,登錄到mongos(mongo --port 27017)
此時,重新啟動rs1_1, 通過在rs.status()查看rs1這個shard正常之后,重新查看sh.status(),發現結果還是一樣的。據此推斷,並沒有journal信息恢復被終止的遷移過程。
使用cleanupOrphaned
orphaned document的影響在於某些查詢會多出一些記錄:多出這些孤兒文檔,比如前面的count操作,事實上只有3條記錄,但返回的是4條記錄。如果查詢的時候沒用使用sharding key(這里的_id)精確匹配,也會返回多余的記錄,另外,即使使用了sharding key,但是如果使用了$in,或者范圍查詢,都可能出錯。比如:
上面第二條查詢語句,使用了$in,理論上應該返回兩條記錄,單因為孤兒文檔,返回了三條。
本質上,如果一個操作要路由到多個shard,存在orphaned document的情況下都可能出錯。這讓應用開發人員防不勝防,也不可能在邏輯里面兼容孤兒文檔這種異常情況。
cleanupOrphaned
,那我們來試試看,按照官方的文檔(
remove-all-orphaned-documents-from-a-shard),刪除所有的orphaned document。注意,cleanupOrphaned要在shard的primary上執行;
但是在哪一個shard上執行呢,是“正確的shard“,還是“錯誤的shard”呢,文檔里面並沒有寫清楚,我想在兩個shard上都執行一下應該沒問題吧。
不管在rs1, 還是rs2上執行,結果都是一樣的:
errmsg:server is not part of a sharded cluster or the sharding metadata is not yet initialized.
通過sh.status()查看:
顯然,rs1、rs2都是sharded cluster的一部分,那么可能的情況就是“sharding metadata is not yet initialized”
關於這個錯誤,在https://groups.google.com/forum/#!msg/mongodb-user/cRr7SbE1xlU/VytVnX-ffp8J 這個討論中指出了同樣的問題,但似乎沒有完美的解決辦法。至少,對於討論中提到的方法,我都嘗試過,但都沒作用。
總結
本文只是嘗試復現孤兒文檔,當然我相信也更多的方式可以發現孤兒文檔。另外,按照上面的日志,選擇在不同的時機kill掉shard(replica set)的primary應該有不同的效果,也就是在遷移過程中的不同階段終止遷移過程。另外,通過實驗,發現cleanupOrphaned指令並沒有想象中好使,對於某些情況下產生的孤兒文檔,並不一定能清理掉。當然,也有可能是我的姿勢不對,歡迎讀者親自實驗。