使用 flume進行日志收集,然后通過kafkaSink並開啟useFlumeEventFormat參數傳輸到kafka時,flume默認使用 avro序列化方式進行傳輸,在flink中從kafka獲取數據時,使用
AvroDeserializationSchema 進行反序列化即可得到正確的數據對象:
Properties props= new Properties();
props.put("bootstrap.servers", "192.168.0.84:9092");
props.put("group.id", "jskj-project-group");
props.put("auto.offset.reset", "latest");
props.put("enable.auto.commit", "true");
AvroDeserializationSchema<AvroFlumeEvent> schema = AvroDeserializationSchema.forSpecific(AvroFlumeEvent.class);
FlinkKafkaConsumer<AvroFlumeEvent> consumer = new FlinkKafkaConsumer<AvroFlumeEvent>("topic", schema, props);
其中 AvroFlumeEvent 是flume 默認的avro序列化格式,我們在這引入就行,可以通過maven引入 flume-ng-core 依賴即可獲取到這個類。