使用Flink時從Kafka中讀取Array[Byte]類型的Schema


使用Flink時,如果從Kafka中讀取輸入流,默認提供的是String類型的Schema:

val myConsumer = new FlinkKafkaConsumer08[String]("Topic名稱", new SimpleStringSchema(), properties);

如果存入Kafka中的數據不是JSON,而是Protobuf類型的數據,需要用二進制的Schema進行接收,可以自己實現一個類,很簡單,只有一行代碼

 class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{
  @throws[IOException]
  override def deserialize(message: Array[Byte]): Array[Byte] = message
}

然后使用時,如下所示:

val myConsumer = new FlinkKafkaConsumer08[String]("Topic名稱", new ByteArrayDeserializationSchema[Array[Byte]](), properties);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM