MongoShake is an open-source tool for migrating and synchronizing MongoDB data; it supports writing the data out to a variety of targets.
Details: https://github.com/alibaba/MongoShake
One of our business scenarios needed the oplog written into Kafka. If you simply point collector.conf at Kafka, the data that ends up in Kafka is garbled.
The official explanation is that the oplog coming straight out of the collector still carries control information, so anything written directly to Kafka has to have that control information stripped before it can be used.
The MongoShake release package includes a receiver component that does this stripping of control information.
mongo --> collector --> kafka --> receiver --> application
mongo --> collector --> receiver --> kafka --> application
I prefer the second approach here.
For the collector --> receiver hop I use the tcp tunnel.
Configuration:
collector.conf
tunnel = tcp
tunnel.address = 127.0.0.1:9300
receiver.conf
tunnel = tcp
tunnel.address = 127.0.0.1:9300
This looks odd at first: there is nowhere to configure Kafka, and with this setup all of the stripped oplog entries simply end up in the receiver's log.
The official answer is that we are expected to modify and rebuild the source ourselves. The source is written in Go, so it is fairly comfortable to change.
Download the official source code:
src/mongoshake/receiver/replayer.go
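The handler below fills a KafkaOpLog struct that is not part of upstream MongoShake; it is my own trimmed-down view of an oplog entry. A minimal sketch (field types follow oplog.PartialLog in the MongoShake source, the json tag names are just my choice; adjust both to your MongoShake version):

// KafkaOpLog is a custom, trimmed-down oplog entry that gets serialized to JSON for Kafka.
// "bson" is the same bson package replayer.go already imports.
type KafkaOpLog struct {
    Namespace string                 `json:"ns"`
    Operation string                 `json:"op"`
    Timestamp bson.MongoTimestamp    `json:"ts"`
    Object    map[string]interface{} `json:"o"`
    Query     bson.M                 `json:"o2"`
}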
In handler():
/*
 * Users should modify this function according to different demands.
 */
func (er *ExampleReplayer) handler() {
    // Kafka producer configuration (sarama); this file additionally needs
    // "encoding/json" and "github.com/Shopify/sarama" in its imports.
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Return.Successes = true
    kafkaClient, err := sarama.NewSyncProducer([]string{conf.Options.KafkaHost}, config)
    if err != nil {
        _ = LOG.Error("create kafka producer failed: %v", err)
        return
    }
    defer kafkaClient.Close()

    for msg := range er.pendingQueue {
        count := uint64(len(msg.message.RawLogs))
        if count == 0 {
            // probe request
            continue
        }

        // parse batched message
        oplogs := make([]*oplog.PartialLog, len(msg.message.RawLogs))
        for i, raw := range msg.message.RawLogs {
            oplogs[i] = new(oplog.PartialLog)
            if err := bson.Unmarshal(raw, oplogs[i]); err != nil {
                // impossible switch, need panic and exit
                LOG.Crashf("unmarshal oplog[%v] failed[%v]", raw, err)
                return
            }
            oplogs[i].RawSize = len(raw)

            // build a customized payload from the oplog entry
            kafkaOpLog := KafkaOpLog{}
            kafkaOpLog.Namespace = oplogs[i].Namespace
            kafkaOpLog.Query = oplogs[i].Query
            kafkaOpLog.Object = oplogs[i].Object.Map()
            kafkaOpLog.Operation = oplogs[i].Operation
            kafkaOpLog.Timestamp = oplogs[i].Timestamp

            encode, err := json.Marshal(kafkaOpLog)
            if err != nil {
                _ = LOG.Error("marshal oplog to json failed: %v", err)
                continue
            }

            // use a dedicated name instead of reusing "msg" to avoid shadowing
            // the outer loop variable
            producerMsg := &sarama.ProducerMessage{}
            producerMsg.Topic = conf.Options.KafkaTopic
            producerMsg.Value = sarama.StringEncoder(encode)
            producerMsg.Key = sarama.StringEncoder(kafkaOpLog.Namespace)
            if _, _, err = kafkaClient.SendMessage(producerMsg); err != nil {
                _ = LOG.Error("send message failed: %v", err)
                return
            }

            // the original source only printed the log here:
            // LOG.Info(oplogs[i]) // just print for test, users can modify to fulfill different needs
        }

        if callback := msg.completion; callback != nil {
            callback() // exec callback
        }

        // get the newest timestamp
        n := len(oplogs)
        lastTs := utils.TimestampToInt64(oplogs[n-1].Timestamp)
        er.Ack = lastTs
        LOG.Debug("handle ack[%v]", er.Ack)

        // add logical code below
    }
}
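Note that the handler reads conf.Options.KafkaHost and conf.Options.KafkaTopic, which the stock receiver configuration does not have. A rough sketch of the additions, assuming your MongoShake version parses receiver.conf into a Configuration struct with config:"..." struct tags (the exact location, e.g. src/mongoshake/receiver/configure/configure.go, may differ between versions; the field and key names here are my own):

// hypothetical fields added to the receiver's Configuration struct
KafkaHost  string `config:"kafka.host"`
KafkaTopic string `config:"kafka.topic"`

and the matching entries in receiver.conf (broker address and topic name are examples):

kafka.host = 127.0.0.1:9092
kafka.topic = mongoshake_oplog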
Then run go build and use the resulting binary.
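To verify that the stripped oplog entries really land in Kafka as JSON, a throwaway sarama consumer can be used. This is only a sketch; the broker address, topic name, and the assumption that everything sits in partition 0 match the example values above:

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    // connect to the same broker the receiver produces to (example address)
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    // read the example topic from the beginning, partition 0 only
    pc, err := consumer.ConsumePartition("mongoshake_oplog", 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }
    defer pc.Close()

    // each message value should be one JSON-encoded KafkaOpLog
    for msg := range pc.Messages() {
        fmt.Printf("key=%s value=%s\n", msg.Key, msg.Value)
    }
}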