Reading and syncing the oplog with MongoShake: fixing garbled data in Kafka


MongoShake is an open-source tool for migrating and synchronizing MongoDB data, and it can write to a variety of targets.

Details: https://github.com/alibaba/MongoShake

 

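One of our use cases needs the oplog written to Kafka. If you simply put the Kafka settings into collector.conf, the data that lands in Kafka is garbled.

The official explanation is that the oplog entries emitted by the collector carry control information, so anything written to Kafka directly must have that information stripped before it can be used.

The MongoShake download package ships a receiver that does this stripping. That leaves two possible pipelines: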

mongo --> collector --> kafka --> receiver --> application

mongo --> collector --> receiver --> kafka --> application

I prefer the second one.

For the collector --> receiver link I use the tcp tunnel.

Configuration:

collector.conf

tunnel = tcp
tunnel.address = 127.0.0.1:9300

receiver.conf

tunnel = tcp
tunnel.address = 127.0.0.1:9300

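This looks odd at first: there is nowhere to configure Kafka. With this setup, every stripped oplog entry simply ends up in the receiver's log.

The official answer is that you are expected to modify the source and recompile it. The source is written in Go, which I am fairly familiar with, so the change is not hard.

Download the official source and open: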

src/mongoshake/receiver/replayer.go

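In handler(), replace the log-only body with a Kafka producer. The file also has to import encoding/json and the sarama Kafka client. conf.Options.KafkaHost, conf.Options.KafkaTopic and the KafkaOpLog type used below are not defined in this post and appear to be my own additions to the receiver, so they have to be added alongside this change.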

/*
 * Users should modify this function according to different demands.
 */
func (er *ExampleReplayer) handler() {
	config := sarama.NewConfig() // Kafka producer configuration
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Successes = true
	kafkaClient, err := sarama.NewSyncProducer([]string{conf.Options.KafkaHost}, config)
	if err != nil {
		_ = LOG.Error("create kafka producer failed: %v", err)
		return
	}

	defer kafkaClient.Close()


	for msg := range er.pendingQueue {
		count := uint64(len(msg.message.RawLogs))
		if count == 0 {
			// probe request
			continue
		}

		// parse batched message
		oplogs := make([]*oplog.PartialLog, len(msg.message.RawLogs))
		for i, raw := range msg.message.RawLogs {
			oplogs[i] = new(oplog.PartialLog)
			if err := bson.Unmarshal(raw, oplogs[i]); err != nil {
				// impossible switch, need panic and exit
				LOG.Crashf("unmarshal oplog[%v] failed[%v]", raw, err)
				return
			}
			oplogs[i].RawSize = len(raw)
			// customize the oplog entry before sending it to Kafka
			kafkaOpLog := KafkaOpLog{}
			kafkaOpLog.Namespace = oplogs[i].Namespace
			kafkaOpLog.Query = oplogs[i].Query
			kafkaOpLog.Object = oplogs[i].Object.Map()
			kafkaOpLog.Operation = oplogs[i].Operation
			kafkaOpLog.Timestamp = oplogs[i].Timestamp

			// named kafkaMsg so it does not shadow the outer batch variable msg
			kafkaMsg := &sarama.ProducerMessage{}
			kafkaMsg.Topic = conf.Options.KafkaTopic
			encoded, err := json.Marshal(kafkaOpLog)
			if err != nil {
				_ = LOG.Error("marshal oplog to JSON failed: %v", err)
				continue
			}
			kafkaMsg.Value = sarama.ByteEncoder(encoded)
			kafkaMsg.Key = sarama.StringEncoder(kafkaOpLog.Namespace)
			_, _, err = kafkaClient.SendMessage(kafkaMsg)
			if err != nil {
				// note: returning here stops the handler, so later batches are not processed
				_ = LOG.Error("send message failed: %v", err)
				return
			}
			// the original source only printed the entry to the log:
			//LOG.Info(oplogs[i]) // just print for test, users can modify to fulfill different needs
		}

		if callback := msg.completion; callback != nil {
			callback() // exec callback
		}

		// get the newest timestamp
		n := len(oplogs)
		lastTs := utils.TimestampToInt64(oplogs[n-1].Timestamp)
		er.Ack = lastTs

		LOG.Debug("handle ack[%v]", er.Ack)

		// add logical code below
	}
}
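
The handler above fills a KafkaOpLog value and marshals it to JSON, but the type itself is not shown. The following is only a minimal sketch of what such a type could look like and of the payload a downstream application would receive. The field types and the JSON tag names (ts, op, ns, o, o2) are my assumptions for illustration and are not taken from the MongoShake source; it uses only the standard library so it can be run on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KafkaOpLog is a hypothetical trimmed-down oplog entry.
// Field types and JSON tags are assumptions, not MongoShake definitions.
type KafkaOpLog struct {
	Timestamp int64                  `json:"ts"`
	Operation string                 `json:"op"`
	Namespace string                 `json:"ns"`
	Object    map[string]interface{} `json:"o"`
	Query     map[string]interface{} `json:"o2,omitempty"`
}

// encodeOplog renders one entry as the JSON bytes used as the Kafka message value.
func encodeOplog(entry KafkaOpLog) ([]byte, error) {
	return json.Marshal(entry)
}

// decodeOplog is what a consuming application could do with a message value.
func decodeOplog(payload []byte) (KafkaOpLog, error) {
	var entry KafkaOpLog
	err := json.Unmarshal(payload, &entry)
	return entry, err
}

func main() {
	entry := KafkaOpLog{
		Timestamp: 1,
		Operation: "i",
		Namespace: "shop.orders",
		Object:    map[string]interface{}{"_id": 1, "status": "new"},
	}

	payload, err := encodeOplog(entry)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(payload))

	decoded, err := decodeOplog(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(decoded.Namespace, decoded.Operation)
}
```

Because the value is plain JSON with no tunnel control information, the consuming side needs nothing from MongoShake to read it. One thing to keep in mind is that numbers inside the generic Object map come back as float64 after decoding.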
   

Then run go build and use the resulting receiver binary.

 

