flink从kafka获取avro数据反序列化问题


使用 flume进行日志收集,然后通过kafkaSink并开启useFlumeEventFormat参数传输到kafka时,flume默认使用 avro序列化方式进行传输,在flink中从kafka获取数据时,使用

AvroDeserializationSchema 进行反序列化即可得到正确的数据对象:
Properties props= new Properties();
props.put("bootstrap.servers", "192.168.0.84:9092");
props.put("group.id", "jskj-project-group");
props.put("auto.offset.reset", "latest");
props.put("enable.auto.commit", "true");
AvroDeserializationSchema<AvroFlumeEvent> schema = AvroDeserializationSchema.forSpecific(AvroFlumeEvent.class);
FlinkKafkaConsumer<AvroFlumeEvent> consumer = new FlinkKafkaConsumer<AvroFlumeEvent>("topic", schema, props);


其中 AvroFlumeEvent 是flume 默认的avro序列化格式,我们在这引入就行,可以通过maven引入 flume-ng-core 依赖即可获取到这个类。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM