Flink使用SideOutPut替換Split實現分流


以前的數據分析項目(版本1.4.2),對從Kafka讀取的原始數據流,調用split接口實現分流.
新項目決定使用Flink 1.7.2,使用split接口進行分流的時候,發現接口被標記為depracted(后續可能會被移除).
搜索相關文檔,發現新版本Flink中推薦使用帶外數據進行分流.

預先建立OutputTag實例(LogEntity是從kafka讀取的日志實例類).

private static final OutputTag<LogEntity> APP_LOG_TAG = new OutputTag<>("appLog", TypeInformation.of(LogEntity.class));
private static final OutputTag<LogEntity> ANALYZE_METRIC_TAG = new OutputTag<>("analyzeMetricLog", TypeInformation.of(LogEntity.class));

kafka讀取的原始數據,通過process接口,打上相應標記.

    private static SingleOutputStreamOperator<LogEntity> sideOutStream(DataStream<LogEntity> rawLogStream) {
        return rawLogStream
                .process(new ProcessFunction<LogEntity, LogEntity>() {
                    @Override
                    public void processElement(LogEntity entity, Context ctx, Collector<LogEntity> out) throws Exception {
                        // 根據日志等級,給對象打上不同的標記
                        if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) {
                            ctx.output(ANALYZE_METRIC_TAG, entity);
                        } else {
                            ctx.output(APP_LOG_TAG, entity);
                        }
                    }
                })
                .name("RawLogEntitySplitStream");
    }

    // 調用函數,對原始數據流中的對象進行標記
    SingleOutputStreamOperator<LogEntity> sideOutLogStream = sideOutStream(rawLogStream);
    // 根據標記,獲取不同的數據流,以便后續進行進一步分析
    DataStream<LogEntity> appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG);
    DataStream<LogEntity> rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);

通過以上步驟,就實現了數據流的切分.

PS:
如果您覺得我的文章對您有幫助,請關注我的微信公眾號,謝謝!
程序員打怪之路


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM