用canal同步binlog到kafka,spark streaming消費kafka topic亂碼問題


canal 1.1.1版本之后, 默認支持將canal server接收到的binlog數據直接投遞到MQ, 目前默認支持的MQ系統有kafka和RocketMQ。

在投遞的時候我們使用的是非壓平的消息模式(canal.mq.flatMessage =false //是否為flat json格式對象),然后消費topic的時候就一直無法正常顯示和序列化,通過kafka-console-consumer.sh命令收到的消息如下圖

 

在github上也能找到相關問題

canal-kafka 數據同步到kafka之后,kafka topic亂碼:https://github.com/alibaba/canal/issues/898

canal.kafka 用bin/kafka-console-consumer.sh命令收到亂碼:https://github.com/alibaba/canal/issues/1013

 在非flatmessage模式下向kafka數據投遞傳輸的是數據包,收到數據后還要解包成對應的message,可參考canal client中的kafka實現, github地址為 https://github.com/alibaba/canal/tree/master/client/src/main/java/com/alibaba/otter/canal/client/kafka

打開連接后 kafkaConsumer = new KafkaConsumer<String, Message>(properties);

參考這種操作只是簡單的kafka能夠收消息,結合spark streaming收消息也差不多。

在kafkaparam中設置key和value的反序列化方式

"key.deserializer" -> classOf[StringDeserializer].getName
"value.deserializer" -> classOf[MessageDeserializer].getName

 在拉取消息的時候設置接受格式為Array[Byte]

val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

在處理每個RDD的時候再對內容進行反序列化:

val parData = rdd.mapPartitions(t => {
val mesDesc = new MessageDeserializer
var list = List[consumerUser]()
while (t.hasNext) {
try {
val value = t.next()._2
val message = mesDesc.deserialize("", value)
//val listMaps = CanalParse.parseData(message)
//邏輯
} catch {
case e: Exception => log.error(e)
}
}
list.iterator
})

這樣就拿到了message對象。

依賴jar包

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.kafka.client</artifactId>
<version>1.1.0</version>
</dependency>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM