基於flink和drools的實時日志處理


1、背景

日志系統接入的日志種類多、格式復雜多樣,主流的有以下幾種日志:

  • filebeat采集到的文本日志,格式多樣
  • winbeat采集到的操作系統日志
  • 設備上報到logstash的syslog日志
  • 接入到kafka的業務日志

以上通過各種渠道接入的日志,存在2個主要的問題:

  • 格式不統一、不規范、標准化不夠
  • 如何從各類日志中提取出用戶關心的指標,挖掘更多的業務價值

為了解決上面2個問題,我們基於flink和drools規則引擎做了實時的日志處理服務。

2、系統架構

架構比較簡單,架構圖如下:

 

各類日志都是通過kafka匯總,做日志中轉。

flink消費kafka的數據,同時通過API調用拉取drools規則引擎,對日志做解析處理后,將解析后的數據存儲到Elasticsearch中,用於日志的搜索和分析等業務。

為了監控日志解析的實時狀態,flink會將日志處理的統計數據,如每分鍾處理的日志量,每種日志從各個機器IP來的日志量寫到Redis中,用於監控統計。

3、模塊介紹

系統項目命名為eagle。

eagle-api:基於springboot,作為drools規則引擎的寫入和讀取API服務。

eagle-common:通用類模塊。

eagle-log:基於flink的日志處理服務。

重點講一下eagle-log:

對接kafka、ES和Redis

對接kafka和ES都比較簡單,用的官方的connector(flink-connector-kafka-0.10和flink-connector-elasticsearch6),詳見代碼。

特定日志數據側輸出(side output)到kafka

private static final OutputTag<LogEntry> kafkaOutputTag = new OutputTag<LogEntry>("log-kafka-output",
            TypeInformation.of(LogEntry.class)) {
    };
DataStream<LogEntry> kafkaOutputStream = processedStream.getSideOutput(kafkaOutputTag);
sinkLogToKafka(parameter, kafkaOutputStream);

日志統計數據寫入到redis

對接Redis,最開始用的是org.apache.bahir提供的redis connector,后來發現靈活度不夠,就使用了Jedis。

在將統計數據寫入redis的時候,最開始用的keyby分組后緩存了分組數據,在sink中做統計處理后寫入,參考代碼如下:

        String name = "redis-agg-log";
        DataStream<Tuple2<String, List<LogEntry>>> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime)).trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .process(new ProcessWindowFunction<LogEntry, Tuple2<String, List<LogEntry>>, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<LogEntry> iterable, Collector<Tuple2<String, List<LogEntry>>> collector) {
                        ArrayList<LogEntry> logs = Lists.newArrayList(iterable);
                        if (logs.size() > 0) {
                            collector.collect(new Tuple2(s, logs));
                        }
                    }
                }).setParallelism(redisSinkParallelism).name(name).uid(name);

后來發現這樣做對內存消耗比較大,其實不需要緩存整個分組的原始數據,只需要一個統計數據就OK了,優化后:

        String name = "redis-agg-log";
        DataStream<LogStatWindowResult> keyedStream = dataSource.keyBy((KeySelector<LogEntry, String>) log -> log.getIndex())
                .timeWindow(Time.seconds(windowTime))
                .trigger(new CountTriggerWithTimeout<>(windowCount, TimeCharacteristic.ProcessingTime))
                .aggregate(new LogStatAggregateFunction(), new LogStatWindowFunction())
                .setParallelism(redisSinkParallelism).name(name).uid(name);

這里使用了flink的聚合函數和Accumulator,通過flink的agg操作做統計,減輕了內存消耗的壓力。

使用broadcast廣播drools規則引擎

1、drools規則流通過broadcast map state廣播出去。

2、kafka的數據流connect規則流處理日志。

//廣播規則流
env.addSource(new RuleSourceFunction(ruleUrl)).name(ruleName).uid(ruleName).setParallelism(1)
                .broadcast(ruleStateDescriptor);

//kafka數據流
FlinkKafkaConsumer010<LogEntry> source = new FlinkKafkaConsumer010<>(kafkaTopic, new LogSchema(), properties);
env.addSource(source).name(kafkaTopic).uid(kafkaTopic).setParallelism(kafkaParallelism);
//數據流connect規則流處理日志 BroadcastConnectedStream<LogEntry, RuleBase> connectedStreams = dataSource.connect(ruleSource); connectedStreams.process(new LogProcessFunction(ruleStateDescriptor, ruleBase)).setParallelism(processParallelism).name(name).uid(name);

具體細節參考開源代碼。

4、小結

本系統提供了一個基於flink的實時數據處理參考,對接了kafka、redis和elasticsearch,通過可配置的drools規則引擎,將數據處理邏輯配置化和動態化。

對於處理后的數據,也可以對接到其他sink,為其他各類業務平台提供數據的解析、清洗和標准化服務。

 

項目地址:

https://github.com/luxiaoxun/eagle

 

參考:

http://www.drools.org.cn/

https://flink.apache.org/

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/kafka.html

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/elasticsearch.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM