mongoDB oplog 說明
ts:8字節的時間戳,由4字節unix timestamp + 4字節自增計數表示。這個值很重要,在選舉(如master宕機時)新primary時,會選擇ts最大的那個secondary作為新primary。
op:1字節的操作類型,例如i表示insert,d表示delete。
ns:操作所在的namespace。
o:操作所對應的document,即當前操作的內容(比如更新操作時要更新的的字段和值)
o2: 在執行更新操作時的where條件,僅限於update時才有該屬性
"i": insert
"u": update
"d": delete
"c": db cmd
"db":聲明當前數據庫 (其中ns 被設置成為=>數據庫名稱+ '.')
"n": no op,即空操作,其會定期執行以確保時效性
節選自:http://www.cnblogs.com/daizhj/archive/2011/06/27/mongodb_sourcecode_oplog.html
在實踐的時候還得注意一個點:mongoDB的副本集(replSet)
當使用復制集(Replica sets)模式時,其會使用下面的local數據庫:
local.system.replset 用於復制集配置對象存儲 (通過shell下的rs.conf()或直接查詢)
local.oplog.rs 一個capped collection集合.可在命令行下使用--oplogSize 選項設置該集合大小尺寸.
local.replset.minvalid 通常在復制集內使用,用於跟蹤同步狀態(sync status)
mongoDB oplog 的應用
如果需要及時獲取mongoDB的增量信息,就可以應用oplog了!
常用的場景模式:索引更新,主動更新緩存等。
通常一個服務監控oplog,將“增量信息”經過一定的處理后塞到ActiveMQ中,相關的應用程序再從ActiveMQ中獲取消息進行消費。
概要的實例代碼:
public static void main(String[] args) throws UnknownHostException { Mongo mongo = new Mongo("199.155.122.226", 11112); DB db = mongo.getDB("local"); DBCollection collection = db.getCollection("oplog.rs"); if (collection != null) { BSONTimestamp ts = new BSONTimestamp(1, 1); final BasicDBObject query = new BasicDBObject(); query.append("ts", new BasicDBObject(QueryOperators.GT, ts)); DBCursor cursor = collection.find(query).sort( new BasicDBObject("$natural", 1)); while (cursor != null && cursor.hasNext()) { DBObject obj = cursor.next(); System.out.println(obj.toString()); } } }
注意點:
1)如果當前服務斷掉了,要有個斷點機制,下次可以接着來。這個主要是對ts字段進行控制。
因此需要實時記錄讀到了什么位置。
2)如果應用端是異構的,采用一種跨語言的協議,例如ProtoBuf。
3)消息中間件(ActiveMQ)起到解耦、緩沖的作用。