使用 flume进行日志收集,然后通过kafkaSink并开启useFlumeEventFormat参数传输到kafka时,flume默认使用 avro序列化方式进行传输,在flink中从kafka获取数据时,使用
AvroDeserializationSchema 进行反序列化即可得到正确的数据对象:
Properties props= new Properties();
props.put("bootstrap.servers", "192.168.0.84:9092");
props.put("group.id", "jskj-project-group");
props.put("auto.offset.reset", "latest");
props.put("enable.auto.commit", "true");
AvroDeserializationSchema<AvroFlumeEvent> schema = AvroDeserializationSchema.forSpecific(AvroFlumeEvent.class);
FlinkKafkaConsumer<AvroFlumeEvent> consumer = new FlinkKafkaConsumer<AvroFlumeEvent>("topic", schema, props);
其中 AvroFlumeEvent 是flume 默认的avro序列化格式,我们在这引入就行,可以通过maven引入 flume-ng-core 依赖即可获取到这个类。