使用Flink時,如果從Kafka中讀取輸入流,默認提供的是String類型的Schema:
val myConsumer = new FlinkKafkaConsumer08[String]("Topic名稱", new SimpleStringSchema(), properties);
如果存入Kafka中的數據不是JSON,而是Protobuf類型的數據,需要用二進制的Schema進行接收,可以自己實現一個類,很簡單,只有一行代碼:
class ByteArrayDeserializationSchema[T] extends AbstractDeserializationSchema[Array[Byte]]{ @throws[IOException] override def deserialize(message: Array[Byte]): Array[Byte] = message }
然后使用時,如下所示:
val myConsumer = new FlinkKafkaConsumer08[String]("Topic名稱", new ByteArrayDeserializationSchema[Array[Byte]](), properties);