完美數據遷移-MongoDB Stream的應用


一、背景介紹

最近微服務架構火的不行,但本質上也只是風口上的一個熱點詞匯。
作為筆者的經驗來說,想要應用一個新的架構需要帶來的變革成本是非常高的。

盡管如此,目前還是有許多企業踏上了服務化改造的道路,這其中則免不了"舊改"的各種繁雜事。
所謂的"舊改",就是把現有的系統架構來一次重構,拆分成多個細粒度的服務后,然后找時間
升級割接一把,讓新系統上線。這其中,數據的遷移往往會成為一個非常重要且繁雜的活兒。

拆分服務時數據遷移的挑戰在哪?

  1. 首先是難度大,做一個遷移方案需要了解項目的前身今世,評估遷移方案、技術工具等等;

  2. 其次是成本高。由於新舊系統數據結構是不一樣的,需要定制開發遷移轉化功能。很難有一個通用的工具能一鍵遷移;

  3. 再者,對於一些容量大、可靠性要求高的系統,要能夠不影響業務,出了問題還能追溯,因此方案上還得往復雜了想。

二、常見方案

按照遷移的方案及流程,可將數據遷移分為三類:

1. 停機遷移

最簡單的方案,停機遷移的順序如下:

采用停機遷移的好處是流程操作簡單,工具成本低;然而缺點也很明顯,
遷移過程中業務是無法訪問的,因此只適合於規格小、允許停服的場景。

2. 業務雙寫

業務雙寫是指對現有系統先進行改造升級,支持同時對新庫和舊庫進行寫入。
之后再通過數據遷移工具對舊數據做全量遷移,待所有數據遷移轉換完成后切換到新系統。

示意圖:

業務雙寫的方案是平滑的,對線上業務影響極小;在出現問題的情況下可重新來過,操作壓力也會比較小。

筆者在早些年前嘗試過這樣的方案,整個遷移過程確實非常順利,但實現該方案比較復雜,
需要對現有的代碼進行改造並完成新數據的轉換及寫入,對於開發人員的要求較高。
在業務邏輯清晰、團隊對系統有足夠的把控能力的場景下適用。

3. 增量遷移

增量遷移的基本思路是先進行全量的遷移轉換,待完成后持續進行增量數據的處理,直到數據追平后切換系統。

示意圖:

關鍵點

  • 要求系統支持增量數據的記錄。
    對於MongoDB可以利用oplog實現這點,為避免全量遷移過程中oplog被沖掉,
    在開始遷移前就必須開始監聽oplog,並將變更全部記錄下來。
    如果沒有辦法,需要從應用層上考慮,比如為所有的表(集合)記錄下updateTime這樣的時間戳,
    或者升級應用並支持將修改操作單獨記錄下來。

  • 增量數據的回放是持續的。
    在所有的增量數據回放轉換過程中,系統仍然會產生新的增量數據,這要求遷移工具
    能做到將增量數據持續回放並將之追平,之后才能做系統切換。

MongoDB 3.6版本開始便提供了Change Stream功能,支持對數據變更記錄做監聽。
這為實現數據同步及轉換處理提供了更大的便利,下面將探討如何利用Change Stream實現數據的增量遷移。

三、Change Stream 介紹

Chang Stream(變更記錄流) 是指collection(數據庫集合)的變更事件流,應用程序通過db.collection.watch()這樣的命令可以獲得被監聽對象的實時變更。
在該特性出現之前,你可以通過拉取 oplog達到同樣的目的;但 oplog 的處理及解析相對復雜且存在被回滾的風險,如果使用不當的話還會帶來性能問題。
Change Stream 可以與aggregate framework結合使用,對變更集進行進一步的過濾或轉換。

由於Change Stream 利用了存儲在 oplog 中的信息,因此對於單進程部署的MongoDB無法支持Change Stream功能,
其只能用於啟用了副本集的獨立集群或分片集群

監聽的目標

名稱 說明
單個集合 除系統庫(admin/local/config)之外的集合,3.6版本支持
單個數據庫 除系統庫(admin/local/config)之外的數據庫集合,4.0版本支持
整個集群 整個集群內除去系統庫( (admin/local/config)之外的集合 ,4.0版本支持

變更事件

一個Change Stream Event的基本結構如下所示:

{
   _id : { <BSON Object> },
   "operationType" : "<operation>",
   "fullDocument" : { <document> },
   "ns" : {
      "db" : "<database>",
      "coll" : "<collection"
   },
   "documentKey" : { "_id" : <ObjectId> },
   "updateDescription" : {
      "updatedFields" : { <document> },
      "removedFields" : [ "<field>", ... ]
   }
   "clusterTime" : <Timestamp>,
   "txnNumber" : <NumberLong>,
   "lsid" : {
      "id" : <UUID>,
      "uid" : <BinData>
   }
}

字段說明

名稱 說明
_id 變更事件的Token對象
operationType 變更類型(見下面介紹)
fullDocument 文檔內容
ns 監聽的目標
ns.db 變更的數據庫
ns.coll 變更的集合
documentKey 變更文檔的鍵值,含_id字段
updateDescription 變更描述
updateDescription.updatedFields 變更中更新字段
updateDescription.removedFields 變更中刪除字段
clusterTime 對應oplog的時間戳
txnNumber 事務編號,僅在多文檔事務中出現,4.0版本支持
lsid 事務關聯的會話編號,僅在多文檔事務中出現,4.0版本支持

Change Steram支持的變更類型有以下幾個:

類型 說明
insert 插入文檔
delete 刪除文檔
replace 替換文檔,當執行replace操作指定upsert時,可能是insert事件
update 更新文檔,當執行update操作指定upsert時,可能是insert事件
invalidate 失效事件,比如執行了collection.drop或collection.rename

利用以下的shell腳本,可以打印出集合 T_USER上的變更事件:

watchCursor=db.T_USER.watch()
while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

下面提供一些樣例,感受一下

insert 事件

{
    "_id": {
        "_data": "825B5826D10000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B58272321C4761D1338F4860004"
    },
    "operationType": "insert",
    "clusterTime": Timestamp(1532503761, 1),
    "fullDocument": {
        "_id": ObjectId("5b58272321c4761d1338f486"),
        "name": "LiLei",
        "createTime": ISODate("2018-07-25T07:30:43.398Z")
    },
    "ns": {
        "db": "appdb",
        "coll": "T_USER"
    },
    "documentKey": {
        "_id": ObjectId("5b58272321c4761d1338f486")
    }
}

update事件

{
 "_id" : {
  "_data" : "825B5829DF0000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B582980ACEC5F345DB998EE0004"
 },
 "operationType" : "update",
 "clusterTime" : Timestamp(1532504543, 1),
 "ns" : {
  "db" : "appdb",
  "coll" : "T_USER"
 },
 "documentKey" : {
  "_id" : ObjectId("5b582980acec5f345db998ee")
 },
 "updateDescription" : {
  "updatedFields" : {
   "age" : 15
  },
  "removedFields" : [ ]
 }
}

replace事件

{
    "_id" : {
        "_data" : "825B58299D0000000129295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B582980ACEC5F345DB998EE0004"
    },
    "operationType" : "replace",
    "clusterTime" : Timestamp(1532504477, 1),
    "fullDocument" : {
        "_id" : ObjectId("5b582980acec5f345db998ee"),
        "name" : "HanMeimei",
        "age" : 12
    },
    "ns" : {
        "db" : "appdb",
        "coll" : "T_USER"
    },
    "documentKey" : {
        "_id" : ObjectId("5b582980acec5f345db998ee")
    }
}

delete事件

{
    "_id" : {
        "_data" : "825B5827A90000000229295A10046A31C593902B4A9C9907FC0AB1E3C0DA46645F696400645B58272321C4761D1338F4860004"
    },
    "operationType" : "delete",
    "clusterTime" : Timestamp(1532503977, 2),
    "ns" : {
        "db" : "appdb",
        "coll" : "T_USER"
    },
    "documentKey" : {
        "_id" : ObjectId("5b58272321c4761d1338f486")
    }
}

invalidate 事件
執行db.T_USER.drop() 可輸出

{
    "_id" : {
        "_data" : "825B582D620000000329295A10046A31C593902B4A9C9907FC0AB1E3C0DA04"
    },
    "operationType" : "invalidate",
    "clusterTime" : Timestamp(1532505442, 3)
}

更多的Change Event 信息可以參考這里

四、實現增量遷移

本次設計了一個簡單的論壇帖子遷移樣例,用於演示如何利用Change Stream實現完美的增量遷移方案。
背景如下:
現有的系統中有一批帖子,每個帖子都屬於一個頻道(channel),如下表

頻道名 英文簡稱
美食 Food
情感 Emotion
寵物 Pet
家居 House
征婚 Marriage
教育 Education
旅游 Travel

新系統中頻道字段將采用英文簡稱,同時要求能支持平滑升級。
根據前面篇幅的敘述,我們將使用Change Stream 功能實現一個增量遷移的方案。

相關表的轉換如下圖:

原理
topic 是帖子原表,在遷移開始前將開啟watch任務持續獲得增量數據,並記錄到 topic_incr表中;
接着執行全量的遷移轉換,之后再持續對增量表數據進行遷移,直到無新的增量為止。

接下來我們使用Java程序來完成相關代碼,mongodb-java--driver 在 3.6 版本后才支持 watch 功能
需要確保升級到對應版本:

<dependency>
     <groupId>org.mongodb</groupId>
     <artifactId>mongo-java-driver</artifactId>
     <version>3.6.4</version>
</dependency>
  1. 定義Channel頻道的轉換表
public static enum Channel {
    Food("美食"),
    Emotion("情感"),
    Pet("寵物"),
    House("家居"),
    Marriage("征婚"),
    Education("教育"),
    Travel("旅游")
    ;
    private final String oldName;

    public String getOldName() {
        return oldName;
    }

    private Channel(String oldName) {
        this.oldName = oldName;
    }

    /**
     * 轉換為新的名稱
     * 
     * @param oldName
     * @return
     */
    public static String toNewName(String oldName) {
        for (Channel channel : values()) {
            if (channel.oldName.equalsIgnoreCase(oldName)) {
                return channel.name();
            }
        }
        return "";
    }

    /**
     * 返回一個隨機頻道
     * 
     * @return
     */
    public static Channel random() {
        Channel[] channels = values();
        int idx = (int) (Math.random() * channels.length);
        return channels[idx];
    }
}
  1. 為 topic 表預寫入1w條記錄
private static void preInsertData() {
    MongoCollection<Document> topicCollection = getCollection(coll_topic);

    // 分批寫入,共寫入1w條數據
    int current = 0;
    int batchSize = 100;

    while (current < 10000) {
        List<Document> topicDocs = new ArrayList<Document>();

        for (int j = 0; j < batchSize; j++) {
            Document topicDoc = new Document();

            Channel channel = Channel.random();
            topicDoc.append(field_channel, channel.getOldName());
            topicDoc.append(field_nonce, (int) (Math.random() * nonce_max));

            topicDoc.append("title", "This is the tilte -- " + UUID.randomUUID().toString());
            topicDoc.append("author", "LiLei");
            topicDoc.append("createTime", new Date());
            topicDocs.add(topicDoc);
        }

        topicCollection.insertMany(topicDocs);
        current += batchSize;
        logger.info("now has insert {} records", current);
    }
}

上述實現中,每個帖子都分配了隨機的頻道(channel)

  1. 開啟監聽任務,將topic上的所有變更寫入到增量表
MongoCollection<Document> topicCollection = getCollection(coll_topic);
MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);

// 啟用 FullDocument.update_lookup 選項
cursor = topicCollection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).iterator();
while (cursor.hasNext()) {

    ChangeStreamDocument<Document> changeEvent = cursor.next();
    OperationType type = changeEvent.getOperationType();
    logger.info("{} operation detected", type);

    if (type == OperationType.INSERT || type == OperationType.UPDATE || type == OperationType.REPLACE
            || type == OperationType.DELETE) {

        Document incrDoc = new Document(field_op, type.getValue());
        incrDoc.append(field_key, changeEvent.getDocumentKey().get("_id"));
        incrDoc.append(field_data, changeEvent.getFullDocument());
        topicIncrCollection.insertOne(incrDoc);
    }
}

代碼中通過watch 命令獲得一個MongoCursor對象,用於遍歷所有的變更。
FullDocument.UPDATE_LOOKUP選項啟用后,在update變更事件中將攜帶完整的文檔數據(FullDocument)。

watch()命令提交后,mongos會與分片上的mongod(主節點)建立訂閱通道,這可能需要花費一點時間。

  1. 為了模擬線上業務的真實情況,啟用幾個線程對topic表進行持續寫操作;
private static void startMockChanges() {

    threadPool.submit(new ChangeTask(OpType.insert));
    threadPool.submit(new ChangeTask(OpType.update));
    threadPool.submit(new ChangeTask(OpType.replace));
    threadPool.submit(new ChangeTask(OpType.delete));
}

ChangeTask 實現邏輯如下:

while (true) {
    logger.info("ChangeTask {}", opType);
    if (opType == OpType.insert) {
        doInsert();
    } else if (opType == OpType.update) {
        doUpdate();
    } else if (opType == OpType.replace) {
        doReplace();
    } else if (opType == OpType.delete) {
        doDelete();
    }
    sleep(200);
    long currentAt = System.currentTimeMillis();
    if (currentAt - startAt > change_during) {
        break;
    }
}

每一個變更任務會不斷對topic產生寫操作,觸發一系列ChangeEvent產生。

  • doInsert:生成隨機頻道的topic后,執行insert
  • doUpdate:隨機取得一個topic,將其channel字段改為隨機值,執行update
  • doReplace:隨機取得一個topic,將其channel字段改為隨機值,執行replace
  • doDelete:隨機取得一個topic,執行delete

doUpdate為例,實現代碼如下:

private void doUpdate() {
    MongoCollection<Document> topicCollection = getCollection(coll_topic);

    Document random = getRandom();
    if (random == null) {
        logger.info("update skip");
        return;
    }

    String oldChannel = random.getString(field_channel);
    Channel channel = Channel.random();

    random.put(field_channel, channel.getOldName());
    random.put("createTime", new Date());
    topicCollection.updateOne(new Document("_id", random.get("_id")), new Document("$set", random));

    counter.onChange(oldChannel, channel.getOldName());
}
  1. 啟動一個全量遷移任務,將 topic 表中數據遷移到 topic_new 新表
final MongoCollection<Document> topicCollection = getCollection(coll_topic);
final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);

Document maxDoc = topicCollection.find().sort(new Document("_id", -1)).first();
if (maxDoc == null) {
    logger.info("FullTransferTask detect no data, quit.");
    return;
}

ObjectId maxID = maxDoc.getObjectId("_id");
logger.info("FullTransferTask maxId is {}..", maxID.toHexString());

AtomicInteger count = new AtomicInteger(0);

topicCollection.find(new Document("_id", new Document("$lte", maxID)))
        .forEach(new Consumer<Document>() {

            @Override
            public void accept(Document topic) {
                Document topicNew = new Document(topic);
                // channel轉換
                String oldChannel = topic.getString(field_channel);
                topicNew.put(field_channel, Channel.toNewName(oldChannel));

                topicNewCollection.insertOne(topicNew);
                if (count.incrementAndGet() % 100 == 0) {
                    logger.info("FullTransferTask progress: {}", count.get());
                }
            }

        });
logger.info("FullTransferTask finished, count: {}", count.get());

在全量遷移開始前,先獲得當前時刻的的最大 _id 值(可以將此值記錄下來)作為終點。
隨后逐個完成遷移轉換。

  1. 在全量遷移完成后,便開始最后一步:增量遷移

注:增量遷移過程中,變更操作仍然在進行

final MongoCollection<Document> topicIncrCollection = getCollection(coll_topic_incr);
final MongoCollection<Document> topicNewCollection = getCollection(coll_topic_new);

ObjectId currentId = null;
Document sort = new Document("_id", 1);
MongoCursor<Document> cursor = null;

// 批量大小
int batchSize = 100;
AtomicInteger count = new AtomicInteger(0);

try {
    while (true) {

        boolean isWatchTaskStillRunning = watchFlag.getCount() > 0;

        // 按ID增量分段拉取
        if (currentId == null) {
            cursor = topicIncrCollection.find().sort(sort).limit(batchSize).iterator();
        } else {
            cursor = topicIncrCollection.find(new Document("_id", new Document("$gt", currentId)))
                    .sort(sort).limit(batchSize).iterator();
        }

        boolean hasIncrRecord = false;

        while (cursor.hasNext()) {
            hasIncrRecord = true;

            Document incrDoc = cursor.next();

            OperationType opType = OperationType.fromString(incrDoc.getString(field_op));
            ObjectId docId = incrDoc.getObjectId(field_key);

            // 記錄當前ID
            currentId = incrDoc.getObjectId("_id");

            if (opType == OperationType.DELETE) {

                topicNewCollection.deleteOne(new Document("_id", docId));
            } else {

                Document doc = incrDoc.get(field_data, Document.class);

                // channel轉換
                String oldChannel = doc.getString(field_channel);
                doc.put(field_channel, Channel.toNewName(oldChannel));

                // 啟用upsert
                UpdateOptions options = new UpdateOptions().upsert(true);

                topicNewCollection.replaceOne(new Document("_id", docId),
                        incrDoc.get(field_data, Document.class), options);
            }

            if (count.incrementAndGet() % 10 == 0) {
                logger.info("IncrTransferTask progress, count: {}", count.get());
            }
        }

        // 當watch停止工作(沒有更多變更),同時也沒有需要處理的記錄時,跳出
        if (!isWatchTaskStillRunning && !hasIncrRecord) {
            break;
        }

        sleep(200);
    }
} catch (Exception e) {
    logger.error("IncrTransferTask ERROR", e);
}

增量遷移的實現是一個**不斷 tail **的過程,利用 **_id 字段的有序特性 ** 進行分段遷移;
即記錄下當前處理的 _id 值,循環拉取在 該 _id 值之后的記錄進行處理。

增量表(topic_incr)中除了DELETE變更之外,其余的類型都保留了整個文檔,
因此可直接利用 replace + upsert 追加到新表。

  1. 最后,運行整個程序
[2018-07-26 19:44:16] INFO ~ IncrTransferTask progress, count: 2160
[2018-07-26 19:44:16] INFO ~ IncrTransferTask progress, count: 2170
[2018-07-26 19:44:27] INFO ~ all change task has stop, watch task quit.
[2018-07-26 19:44:27] INFO ~ IncrTransferTask finished, count: 2175
[2018-07-26 19:44:27] INFO ~ TYPE 美食:1405
[2018-07-26 19:44:27] INFO ~ TYPE 寵物:1410
[2018-07-26 19:44:27] INFO ~ TYPE 征婚:1428
[2018-07-26 19:44:27] INFO ~ TYPE 家居:1452
[2018-07-26 19:44:27] INFO ~ TYPE 教育:1441
[2018-07-26 19:44:27] INFO ~ TYPE 情感:1434
[2018-07-26 19:44:27] INFO ~ TYPE 旅游:1457
[2018-07-26 19:44:27] INFO ~ ALLCHANGE 12175
[2018-07-26 19:44:27] INFO ~ ALLWATCH 2175

查看 topic 表和 topic_new 表,發現兩者數量是相同的。
為了進一步確認一致性,我們對兩個表的分別做一次聚合統計:

topic表

db.topic.aggregate([{
    "$group":{
        "_id":"$channel",
        "total": {"$sum": 1}
        }
    },
    {
        "$sort": {"total":-1}
        }
    ])

topic_new表

db.topic_new.aggregate([{
    "$group":{
        "_id":"$channel",
        "total": {"$sum": 1}
        }
    },
    {
        "$sort": {"total":-1}
        }
    ])

前者輸出結果:

后者輸出結果:

前后對比的結果是一致的!

五、后續優化

前面的章節演示了一個增量遷移的樣例,在投入到線上運行之前,這些代碼還得繼續優化:

  • 寫入性能,線上的數據量可能會達到億級,在全量、增量遷移時應采用合理的批量化處理;
    另外可以通過增加並發線程,添置更多的Worker,分別對不同業務庫、不同表進行處理以提升效率。
    增量表存在冪等性,即回放多次其最終結果還是一致的,但需要保證表級有序,即一個表同時只有一個線程在進行增量回放。

  • 容錯能力,一旦 watch 監聽任務出現異常,要能夠從更早的時間點開始(使用startAtOperationTime參數),
    而如果寫入時發生失敗,要支持重試。

  • 回溯能力,做好必要的跟蹤記錄,比如將轉換失敗的ID號記錄下來,舊系統的數據需要保留,
    以免在事后追究某個數據問題時找不着北。

  • 數據轉換,新舊業務的差異不會很簡單,通常需要借助大量的轉換表來完成。

  • 一致性檢查,需要根據業務特點開發自己的一致性檢查工具,用來證明遷移后數據達到想要的一致性級別。

BTW,數據遷移一定要結合業務特性、架構差異來做考慮,否則還是在耍流氓。

示例代碼

小結

服務化系統中擴容、升級往往會進行數據遷移,對於業務量大,中斷敏感的系統通常會采用平滑遷移的方式。
MongoDB 3.6 版本后提供了 Change Stream 功能以支持應用訂閱數據的變更事件流,
本文使用 Stream 功能實現了增量平滑遷移的例子,這是一次嘗試,相信后續這樣的應用場景會越來越多。
歡迎關注"美碼師的公眾號" -- 唯美食與技術不可辜負" ,期待更多精彩內容-

附參考文檔

百億級數據遷移-58沈劍
MongoDB-ChangeStream
Use-ChangeStream To Handle Temperature


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM