MongoDB 變更流(Change Stream)介紹


1. 什么是Change Stream
Change Stream 是MongoDB用於實現變更追蹤的解決方案,類似於關系數據庫的觸發器,但原理不完全相同:

| | Change Stream | 觸發器 |
|--------------|-----------------|---------------|
| 觸發方式 | 異步 | 同步(事務保證) |
| 觸發位置 | 應用回調事件 | 數據庫觸發器 |
| 觸發次數 | 每個訂閱事件的客戶端 | 1次(觸發器) |
| 故障恢復 | 從上次斷點重新觸發 | 事務回滾 |

2. Change Stream 實現原理
Change Stream 是基於oplog實現的。它在oplog上開啟一個tailable cursor 來追蹤復制集上的變更操作,
最終調用應用中定義的回調函數。被追蹤的變更事件主要包括:

· insert/update/delete:插入、更新、刪除;

. drop:集合被刪除;

. rename:集合被重命名;

. dropDatabase:數據庫被刪除;

. invalidate: drop/rename/dropDatabase 將導致invalidate被觸發,並關閉 change stream;

3. Change Stream 與可重復讀
Change Stream 只推送已經在大多數節點上提交的變更操作。即“可重復讀”的變更。
這個驗證是通過{readConcern:"majority"}實現的。因此:

· 未開啟majority readConcern的集群無法使用Change Stream;

· 當集群無法滿足{w:"majority"}時,不會觸發Change Stream(例如PSA架構中的S因故宕機)。

4. Change Stream 變更過濾
如果只對某些類型的變更事件感興趣,可以使用聚合管道的過濾步驟過濾事件。
例如:
```
var cs = db.collection.watch([{
$match: {
operationType: {
$in: ['insert', 'delete']
}
}
}])
```
5. Change Stream 故障恢復
假設在一系列寫入操作的過程中,訂閱Change Stream的應用在接收到“寫3”之后於t0時刻崩潰,重啟后后續的變更怎么辦?
[!qr](./images/026_t_1.png)
想要從上次中斷的地方繼續獲取變更流,只需要保留上次變更通知中的_id即可。
右側所示是一次Change Stream回調所返回的數據。每條這樣的數據都帶有一個_id,這個_id可以用於斷點恢復。例如:

var cs = db.collection.watch([],{resumeAfter:<_id>})
即可從上一條通知中斷處繼續獲取后續的變更通知。
6. Change Stream 使用場景
· 跨集群的變更復制——在源集群中訂閱Change Stream, 一旦得到任何變更立即寫入目標集群。

· 微服務聯動——當一個微服務變更數據庫時,其他微服務得到通知並做出響應的變更。

· 其他任何需要聯動的場景。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM