flink從kafka獲取avro數據反序列化問題


使用 flume進行日志收集,然后通過kafkaSink並開啟useFlumeEventFormat參數傳輸到kafka時,flume默認使用 avro序列化方式進行傳輸,在flink中從kafka獲取數據時,使用

AvroDeserializationSchema 進行反序列化即可得到正確的數據對象:
Properties props= new Properties();
props.put("bootstrap.servers", "192.168.0.84:9092");
props.put("group.id", "jskj-project-group");
props.put("auto.offset.reset", "latest");
props.put("enable.auto.commit", "true");
AvroDeserializationSchema<AvroFlumeEvent> schema = AvroDeserializationSchema.forSpecific(AvroFlumeEvent.class);
FlinkKafkaConsumer<AvroFlumeEvent> consumer = new FlinkKafkaConsumer<AvroFlumeEvent>("topic", schema, props);


其中 AvroFlumeEvent 是flume 默認的avro序列化格式,我們在這引入就行,可以通過maven引入 flume-ng-core 依賴即可獲取到這個類。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM