實時同步MongoDB Oplog開發指南


轉載請注明joymufeng,歡迎訪問PlayScala社區(http://www.playscala.cn/)

Capped Collections

MongoDB有一種特殊的Collection叫Capped collections,它的插入速度非常快,基本和磁盤的寫入速度差不多,並且支持按照插入順序高效的查詢操作。Capped collections的大小是固定的,它的工作方式很像環形緩沖器(circular buffers), 當剩余空間不足時,會覆蓋最先插入的數據。

Capped collections的特點是高效插入和檢索,所以最好不要在Capped collections上添加額外的索引,否則會影響插入速度。Capped collections可以用於以下場景:

  • 存儲日志: Capped collections的first-in-first-out特性剛好滿足日志事件的存儲順序;
  • 緩存小量數據:因為緩存的特點是讀多寫少,所以可以適當使用索引提高讀取速度。

Capped collections的使用限制:

  • 如果更新數據,你需要為之創建索引以防止collection scan;
  • 更新數據時,文檔的大小不能改變。比如說name屬性為'abc',則只能修改成3個字符的字符串,否則操作將會失敗;
  • 數據不允許刪除,如果非刪除不可,只能drop collection
  • 不支持sharding
  • 默認只支持按自然順序(即插入順序)返回結果

Capped collections可以使用$natural操作符按插入順序的正序或反序返回結果:

db['oplog.rs'].find({}).sort({$natural: -1}) 

Oplog

Oplog是一種特殊的Capped collections,特殊之處在於它是系統級Collection,記錄了數據庫的所有操作,集群之間依靠Oplog進行數據同步。Oplog的全名是local.oplog.rs,位於local數據下。由於local數據不允許創建用戶,如果要訪問Oplog需要借助其它數據庫的用戶,並且賦予該用戶訪問local數據庫的權限,例如:

db.createUser({
   user: "play-community",
   pwd: "******",    "roles" : [     {       "role" : "readWrite",        "db" : "play-community"     },      {       "role" : "read",        "db" : "local"     }   ] }) 

Oplog記錄的操作記錄是冪等的(idempotent),這意味着你可以多次執行這些操作而不會導致數據丟失或不一致。例如對於$inc操作,Oplog會自動將其轉換為$set操作,例如原始數據如下:

{ 
  "_id" : "0",    "count" : 1.0 } 

執行如下$inc操作:

db.test.update({_id: "0"}, {$inc: {count: 1}}) 

Oplog記錄的日志為:

{ 
  "ts" : Timestamp(1503110518, 1),    "t" : NumberLong(8),    "h" : NumberLong(-3967772133090765679),    "v" : NumberInt(2),    "op" : "u",    "ns" : "play-community.test",    "o2" : {     "_id" : "0"   },    "o" : {     "$set" : {       "count" : 2.0     }   } } 

這種轉換可以保證Oplog的冪等性。另外Oplog為了保證插入性能,不允許額外創建索引。

Timestamps格式

MongoDB有一種特殊的時間格式Timestamps,僅用於內部使用,例如上面Oplog記錄:

Timestamp(1503110518, 1) 

Timestamps長度為64位:

  • 前32位是time_t值,表示從epoch時間至今的秒數
  • 后32位是ordinal值,該值是一個順序增長的序數,表示某一秒內的第幾次操作

開始同步Oplog

在開始同步Oplog之前,我們需要注意以下幾點:

  • 由於Oplog不使用索引,所以初始查詢代價可能很大
  • 當Oplog數據量很大時,可以保存ts,系統重啟時利用該ts可以減少首次查詢開銷
  • oplogReplay標志可以顯著加快包含ts條件過濾的查詢,但是只對oplog查詢有效
val tailingCursor =
 oplogCol
  .find(Json.obj("ns" -> Json.obj("$in" -> Set(s"${db}.common-doc", s"${db}.common-article")), "ts" -> Json.obj("$gte" -> lastTS)))   .options(QueryOpts().tailable.oplogReplay.awaitData.noCursorTimeout)   .cursor[BSONDocument]() tailingCursor.fold(()){ (_, doc) =>  try {   val jsObj = doc.as[JsObject]   jsObj("op").as[String] match {    case "i" => // 插入    case "u" => // 更新    case "d" => // 刪除   }   // 保存ts值,以備后用   if (tailCount.get() % 10 == 0) { }  } catch {   case t: Throwable =>    Logger.error("Tail oplog Error: " + t.getMessage, t)  } } 

另外提醒大家注意,ReactiveMongo-Streaming的Akka Stream實現有bug,如果首次查詢沒有數據返回,則會持續發送查詢請求,大約每秒中發送幾十次至幾百次請求,因為Oplog的查詢開銷很大,最終會導致MongoDB內存溢出。詳情參考Keep sending queries while the initial query result of a tailable cursor is empty.

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM