孤兒文檔是怎樣產生的(MongoDB orphaned document)


  使用MongoDB的開發人員應該都聽說過孤兒文檔(orphaned document)這回事兒,可謂聞着沉默,遇者流淚。本文基於MongoDB3.0來看看怎么產生一個orphaned document,要求MongoDB的運行方式需要是sharded cluster,如果對這一部分還不是很了解,可以參考一下這篇文章

  在MongoDB的官方文檔中,對orphaned document的描述非常簡單:

  In a sharded cluster, orphaned documents are those documents on a shard that also exist in chunks on other shards as a result of failed migrations or incomplete migration cleanup due to abnormal shutdown. Delete orphaned documents using cleanupOrphaned to reclaim disk space and reduce confusion

  可以看到,orphaned document是指在sharded cluster環境下,一些同時存在於不同shard上的document。我們知道,在mongodb sharded cluster中,分布在不同shard的數據子集是正交的,即理論上一個document只能出現在一個shard上,document與shard的映射關系維護在config server中。官方文檔指出了可能產生orphaned document的情況:在chunk遷移的過程中,mongod實例異常宕機,導致遷移過程失敗或者部分完成。文檔中還指出,可以使用 cleanupOrphaned 來刪除orphaned document。

  新聞報道災難、事故的時候,一般都有這么一個潛規則:內容越短,事情也嚴重。不知道MongoDB對於orphaned document是不是也采用了這個套路,一來對orphaned document發生的可能原因描述不夠詳盡,二來也沒有提供檢測是否存在orphaned document的方法。對於cleanupOrphaned,要在生產環境使用也需要一定的勇氣。

orphaned document產生原因

   作為一個沒有看過MongoDB源碼的普通應用開發人員,拍腦袋想想,chuck的遷移應該有以下三個步驟:將數據從源shard拷貝到目標shard,更新config server中的metadata,從源shard刪除數據。當然,這三個步驟的順序不一定是上面的順序。這三個步驟,如果能保證原子性,那么理論上是不會出問題的。但是,orphaned document具體怎么出現的一直不是很清楚。

  前些天在瀏覽官方文檔的時候,發現有對遷移過程描述(chunk-migration-procedure),大致過程翻譯如下:

  1.   balancer向源shard發送moveChunk命令
  2.   源shard內部執行moveChunk命令,並保證在遷移的過程中,新插入的document還是寫入源shard
  3.   如果需要的話,目標shard創建需要的索引
  4.   目標shard從源shard請求數據;注意,這里是一個copy操作,而不是move操作
  5.   在接收完chunk的最后一個文檔后,目標shard啟動一個同步拷貝進程,保證拷貝到在遷移過程中又寫入源shard上的相關文檔
  6.   完全同步之后,目標shard向config server報告新的metadata(chunk的新位置信息)
  7.   在上一步完成之后,源shard開始刪除舊的document

  如果能保證以上操作的原子性,在任何步驟出問題應該都沒問題;如果不能保證,那么在第4,5,6,7步出現機器宕機,都有可能出問題。對於出問題的原因,官網(chunk-migration-queuing )是這么解釋的:

  the balancer does not wait for the current migration’s delete phase to complete before starting the next chunk migration

  This queuing behavior allows shards to unload chunks more quickly in cases of heavily imbalanced cluster, such as when performing initial data loads without pre-splitting and when adding new shards.
  If multiple delete phases are queued but not yet complete, a crash of the replica set’s primary can orphan data from multiple migrations.

  簡而言之,為了加速chunk 遷移的速度(比如在新的shard加入的時候,有大量的chunk遷移),因此delete phase(第7步)不會立刻執行,而是放入一個隊列,異步執行,此時如果crash,就可能產生孤兒文檔

產生一個orphaned document

  基於官方文檔,如何產生一個orphaned document呢? 我的想法很簡單:監控MongoDB日志,在出現標志遷移過程的日志出現的時候,kill掉shard中的primary!

預備知識

  在《通過一步步創建sharded cluster來認識mongodb》一文中,我詳細介紹了如何搭建一個sharded cluster,在我的實例中,使用了兩個shard,其中每個shard包括一個primary、一個secondary,一個arbiter。另外,創建了一個允許sharding的db -- test_db, 然后sharded_col這個集合使用_id分片,本文基於這個sharded cluster進行實驗。但需要注意的是在前文中,為了節省磁盤空間,我禁用了mongod實例的journal機制(啟動選項中的 --nojourbal),但在本文中,為了盡量符合真實情況,在啟動mongod的時候使用了--journal來啟用journal機制。

 

   另外,再補充兩點,第一個是chunk遷移的條件,只有當shards之間chunk的數目差異達到一定程度才會發生遷移:

Number of Chunks Migration Threshold
Fewer than 20 2
20-79 4
80 and greater 8

  第二個是,如果沒有在document中包含_id,那么mongodb會自動添加這個字段,其value是一個ObjectId,ObjectId由一下部分組成:

  • a 4-byte value representing the seconds since the Unix epoch,
  • a 3-byte machine identifier,
  • a 2-byte process id, and
  • a 3-byte counter, starting with a random value.
  因此,在沒有使用hash sharding key(默認是ranged sharding key)的情況下,在短時間內插入大量沒有攜帶_id字段的ducoment,會插入到同一個shard,這也有利於出現chunk分裂和遷移的情況。

准備

   首先,得知道chunk遷移的時候日志是什么樣子的,因此我用python腳本插入了一些記錄,通過sh.status()發現有chunk分裂、遷移的時候去查看mongodb日志,在rs1(sharded_col這個集合的primary shard)的primary(rs1_1.log)里面發現了如下的輸出:

   
 34 2017-07-06T21:43:21.629+0800 I NETWORK  [conn6] starting new replica set monitor for replica set rs2 with seeds 127.0.0.1:27021,127.0.0.1:27022
 48 2017-07-06T21:43:23.685+0800 I SHARDING [conn6] moveChunk data transfer progress: { active: true, ns: "test_db.sharded_col", from: "rs1/127.0.0.1:27018,127.0.0.1:27019"    , min: { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') }, max: { _id: MaxKey }, shardKeyPattern: { _id: 1.0 }, state: "steady", counts: { cloned: 1, clonedBytes: 83944, cat    chup: 0, steady: 0 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 0|0, electionId: ObjectId('595e3b0ff70a0e5c3d75d684') } } my mem used: 0
 52 -017-07-06T21:43:23.977+0800 I SHARDING [conn6] moveChunk migrate commit accepted by TO-shard: { active: false, ns: "test_db.sharded_col", from: "rs1/127.0.0.1:27018,12    7.0.0.1:27019", min: { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') }, max: { _id: MaxKey }, shardKeyPattern: { _id: 1.0 }, state: "done", counts: { cloned: 1, clonedBytes    : 83944, catchup: 0, steady: 0 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 0|0, electionId: ObjectId('595e3b0ff70a0e5c3d75d684') } }
 53 w017-07-06T21:43:23.977+0800 I SHARDING [conn6] moveChunk updating self version to: 3|1||590a8d4cd2575f23f5d0c9f3 through { _id: ObjectId('5937e11f48e2c04f793b1242') }     -> { _id: ObjectId('595b829fd71ffd546f9e5b05') } for collection 'test_db.sharded_col'
 54 2017-07-06T21:43:23.977+0800 I NETWORK  [conn6] SyncClusterConnection connecting to [127.0.0.1:40000]
 55 2017-07-06T21:43:23.978+0800 I NETWORK  [conn6] SyncClusterConnection connecting to [127.0.0.1:40001]
 56 2017-07-06T21:43:23.978+0800 I NETWORK  [conn6] SyncClusterConnection connecting to [127.0.0.1:40002]
 57 2017-07-06T21:43:24.413+0800 I SHARDING [conn6] about to log metadata event: { _id: "xxx-2017-07-06T13:43:24-595e3e7c0db0d72b7244e620", server: "xxx", clientAddr: "127.0.0.1:52312", time: new Date(1499348604413), what: "moveChunk.commit", ns: "test_db.sharded_col", details: { min: { _id: ObjectId(    '595e3e74d71ffd5c7be8c8b7') }, max: { _id: MaxKey }, from: "rs1", to: "rs2", cloned: 1, clonedBytes: 83944, catchup: 0, steady: 0 } }
 58 2017-07-06T21:43:24.417+0800 I SHARDING [conn6] MigrateFromStatus::done About to acquire global lock to exit critical section
 59 2017-07-06T21:43:24.417+0800 I SHARDING [conn6] forking for cleanup of chunk data
 60 2017-07-06T21:43:24.417+0800 I SHARDING [conn6] MigrateFromStatus::done About to acquire global lock to exit critical section
 61 2017-07-06T21:43:24.417+0800 I SHARDING [RangeDeleter] Deleter starting delete for: test_db.sharded_col from { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') } -> { _id: MaxKey }, with opId: 6
 62 2017-07-06T21:43:24.417+0800 I SHARDING [RangeDeleter] rangeDeleter deleted 1 documents for test_db.sharded_col from { _id: ObjectId('595e3e74d71ffd5c7be8c8b7') } -> { _id: MaxKey }

  上面第59行,“forking for cleanup of chunk data”,看起來是准備刪除舊的數據了

  於是我寫了一個shell腳本: 在rs1_1.log日志中出現“forking for cleanup of chunk data”時kill掉rs1_1這個進程,腳本如下:
  
check_loop()
{
    echo 'checking'
    ret=`grep -c 'forking for cleanup of chunk data' /home/mongo_db/log/rs1_1.log`
    if [ $ret -gt 0 ]; then
         echo "will kill rs1 primary"
         kill -s 9 `ps aux | grep rs1_1 | awk '{print $2}'`
         exit 0
    fi

    ret=`grep -c 'forking for cleanup of chunk data' /home/mongo_db/log/rs2_1.log`
    if [ $ret -gt 0 ]; then
         echo "will kill rs2 primary"
         kill -s 9 `ps aux | grep rs2_1 | awk '{print $2}'`
         exit 0
    fi

    sleep 0.1
    check_loop
}
check_loop

 

第一次嘗試

     第一次嘗試就是使用的上面的腳本。

  首先運行上面的shell腳本,然后另起一個終端開始插入數據,在shell腳本kill掉進程之后,立即登上rs1和rs2查看統計數據,發現並沒有產生orphaned document(怎么檢測看第二次嘗試)

  再回看前面的日志,幾乎是出現“forking for cleanup of chunk data”的同一時刻就出現了“rangeDeleter deleted 1 documents for test_db.sharded_col from”,后者表明數據已經被刪除。而shell腳本0.1s才檢查一次,很可能在遷移過程已經完成之后才發出kill信號。於是將kill的時機提前,在shell腳本中檢查“moveChunk migrate commit accepted”(上述文檔中的第52行)

  對shell腳本的修改也很簡單,替換一下grep的內容:

check_loop()
{
    echo 'checking'
    ret=`grep -c 'moveChunk migrate commit accepted' /home/mongo_db/log/rs1_1.log`
    if [ $ret -gt 0 ]; then
         echo "will kill rs1 primary"
         kill -s 9 `ps aux | grep rs1_1 | awk '{print $2}'`
         exit 0
    fi

    ret=`grep -c 'moveChunk migrate commit accepted' /home/mongo_db/log/rs2_1.log`
    if [ $ret -gt 0 ]; then
         echo "will kill rs2 primary"
         kill -s 9 `ps aux | grep rs2_1 | awk '{print $2}'`
         exit 0
    fi

    sleep 0.1
    check_loop
}
check_loop
View Code

 

第二次嘗試

    在進行第二次嘗試之前,清空了sharded_col中的記錄,一遍更快產生chunk遷移。

    重復之間的步驟:啟動shell腳本,然后插入數據,等待shell腳本kill掉進程后終止

    很快,shell腳本就終止了,通過ps aux | grep mongo 也證實了rs1_1被kill掉了,登錄到mongos(mongo --port 27017)

  mongos> db.sharded_col.find().count()
  4
 
  再登錄到rs1 primary(此時由於rs1原來的primary被kill掉,新的primary是rs1-2,端口號27019)
  rs1:PRIMARY> db.sharded_col.find({}, {'_id': 1})
  { "_id" : ObjectId("595ef413d71ffd4a82dea30d") }
  { "_id" : ObjectId("595ef413d71ffd4a82dea30e") }
  { "_id" : ObjectId("595ef413d71ffd4a82dea30f") }
 
  再登錄到rs2 primary
  rs2:PRIMARY> db.sharded_col.find({}, {'_id': 1})
  { "_id" : ObjectId("595ef413d71ffd4a82dea30f") }
 
  很明顯,產生了orphaned docuemnt,ObjectId("595ef413d71ffd4a82dea30f") 這條記錄存在於兩個shard上,那么mongodb sharded cluter認為這條記錄應該存在於哪個shard上呢,簡單的辦法直接用 sh.status()查看,不過這里忘了截圖。另外一種方式,給這條記錄新加一個字段,然后分別在兩個shard上查詢
  mongos> db.sharded_col.update({'_id': ObjectId("595ef413d71ffd4a82dea30f") }, {$set:{'newattr': 10}})
  WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
 
  rs1:PRIMARY> db.sharded_col.find({'_id': ObjectId("595ef413d71ffd4a82dea30f")}, {'newattr': 1})
  { "_id" : ObjectId("595ef413d71ffd4a82dea30f"), "newattr" : 10 }
 
  rs2:PRIMARY> db.sharded_col.find({'_id': ObjectId("595ef413d71ffd4a82dea30f")}, {'newattr': 1})
  { "_id" : ObjectId("595ef413d71ffd4a82dea30f") }
  
  這證實了這條記錄在rs1這個shard上
 

  此時,重新啟動rs1_1, 通過在rs.status()查看rs1這個shard正常之后,重新查看sh.status(),發現結果還是一樣的。據此推斷,並沒有journal信息恢復被終止的遷移過程。

     因此在本次實驗中,ObjectId("595ef413d71ffd4a82dea30f")這條記錄本來是要從rs遷移到rs2,由於人為kill掉了rs1的primary,導致遷移只進行了一部分,產生了orphaned document。回顧前面提到的遷移過程,本次實驗中對rs1 primary的kill發生在第6步之前(目標shard在config server上更新metadata之前)

使用cleanupOrphaned

   orphaned document的影響在於某些查詢會多出一些記錄:多出這些孤兒文檔,比如前面的count操作,事實上只有3條記錄,但返回的是4條記錄。如果查詢的時候沒用使用sharding key(這里的_id)精確匹配,也會返回多余的記錄,另外,即使使用了sharding key,但是如果使用了$in,或者范圍查詢,都可能出錯。比如: 

  mongos> db.sharded_col.find({'_id': {$in:[ ObjectId("595ef413d71ffd4a82dea30f")]}}).count()
  1
  mongos> db.sharded_col.find({'_id': {$in:[ ObjectId("595ef413d71ffd4a82dea30f"), ObjectId("595ef413d71ffd4a82dea30d")]}}).count()
  3

  上面第二條查詢語句,使用了$in,理論上應該返回兩條記錄,單因為孤兒文檔,返回了三條。

  本質上,如果一個操作要路由到多個shard,存在orphaned document的情況下都可能出錯。這讓應用開發人員防不勝防,也不可能在邏輯里面兼容孤兒文檔這種異常情況。

  MongoDB提供了解決這個問題的終極武器, cleanupOrphaned,那我們來試試看,按照官方的文檔( remove-all-orphaned-documents-from-a-shard),刪除所有的orphaned document。注意,cleanupOrphaned要在shard的primary上執行;
 
 
  Run  cleanupOrphaned in the  admin database directly on the  mongod instance that is the primary replica set member of the shard. Do not run  cleanupOrphaned on a  mongos instance.

  但是在哪一個shard上執行呢,是“正確的shard“,還是“錯誤的shard”呢,文檔里面並沒有寫清楚,我想在兩個shard上都執行一下應該沒問題吧。

  不管在rs1, 還是rs2上執行,結果都是一樣的: 

  {
          "ok" : 0,
          "errmsg" : "server is not part of a sharded cluster or the sharding metadata is not yet initialized."
  }

  errmsg:server is not part of a sharded cluster or the sharding metadata is not yet initialized.

  通過sh.status()查看:

   shards:
          {  "_id" : "rs1",  "host" : "rs1/127.0.0.1:27018,127.0.0.1:27019" }
          {  "_id" : "rs2",  "host" : "rs2/127.0.0.1:27021,127.0.0.1:27022" }

  顯然,rs1、rs2都是sharded cluster的一部分,那么可能的情況就是“sharding metadata is not yet initialized”

  關於這個錯誤,在https://groups.google.com/forum/#!msg/mongodb-user/cRr7SbE1xlU/VytVnX-ffp8J 這個討論中指出了同樣的問題,但似乎沒有完美的解決辦法。至少,對於討論中提到的方法,我都嘗試過,但都沒作用。

總結

  本文只是嘗試復現孤兒文檔,當然我相信也更多的方式可以發現孤兒文檔。另外,按照上面的日志,選擇在不同的時機kill掉shard(replica set)的primary應該有不同的效果,也就是在遷移過程中的不同階段終止遷移過程。另外,通過實驗,發現cleanupOrphaned指令並沒有想象中好使,對於某些情況下產生的孤兒文檔,並不一定能清理掉。當然,也有可能是我的姿勢不對,歡迎讀者親自實驗。

references

 orphaned document

cleanupOrphaned

通過一步步創建sharded cluster來認識mongodb

chunk-migration-procedure

chunk-migration-queuing 

remove-all-orphaned-documents-from-a-shard


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM