Flink-1.10中的StreamingFileSink相關特性


一切新知識的學習,都離不開官網得相關閱讀,那么StreamingFileSink的官網介紹呢?

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/connectors/streamfile_sink.html

flink在被阿里收購之后,官網也有了相當多的中文文檔,英文不好的同學可以直接看中文版的,不過還是建議時間充足的同學直接閱讀英文文檔,畢竟現在的chrome中的划詞翻譯很是方便了,哪里不會點哪里,慢慢的開發中常見詞匯也就能看個大概了。

1. 寫出文件的狀態

img

看這個圖片應該能明白,文件會分在不同的桶中,bucket中存在不同狀態的文件:

  1. In-progress :當前文件正在寫入中
  2. Pending :當處於 In-progress 狀態的文件關閉(closed)了,就變為 Pending 狀態
  3. Finished :在成功的 Checkpoint 后,Pending 狀態將變為 Finished 狀態

2. 簡單的字符串寫出示例

DataStreamSource<String> lines = FlinkUtil.createSocketStream("localhost", 8888);

        StreamExecutionEnvironment env = FlinkUtil.getEnv();
        // 設置checkpoint
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));

        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();


        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                /**
                 * 設置桶分配政策
                 * DateTimeBucketAssigner--默認的桶分配政策,默認基於時間的分配器,每小時產生一個桶,格式如下yyyy-MM-dd--HH
                 * BasePathBucketAssigner :將所有部分文件(part file)存儲在基本路徑中的分配器(單個全局桶)
                 */
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                /**
                 * 有三種滾動政策
                 *  CheckpointRollingPolicy
                 *  DefaultRollingPolicy
                 *  OnCheckpointRollingPolicy
                 */
                .withRollingPolicy(
                        /**
                         * 滾動策略決定了寫出文件的狀態變化過程
                         * 1. In-progress :當前文件正在寫入中
                         * 2. Pending :當處於 In-progress 狀態的文件關閉(closed)了,就變為 Pending 狀態
                         * 3. Finished :在成功的 Checkpoint 后,Pending 狀態將變為 Finished 狀態
                         *
                         * 觀察到的現象
                         * 1.會根據本地時間和時區,先創建桶目錄
                         * 2.文件名稱規則:part-<subtaskIndex>-<partFileIndex>
                         * 3.在macos中默認不顯示隱藏文件,需要顯示隱藏文件才能看到處於In-progress和Pending狀態的文件,因為文件是按照.開頭命名的
                         *
                         */
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(2)) //設置滾動間隔
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) //設置不活動時間間隔
                                .withMaxPartSize(1024 * 1024 * 1024) // 最大零件尺寸
                                .build())
                .withOutputFileConfig(config)
                .build();

        lines.addSink(sink).setParallelism(1);

3. 寫出文件的滾動策略

數據寫入文件時,查看源碼可以知道
滾動策略是這么判斷的:
沒有處於inProgressPart狀態的文件 或者 DefaultRollingPolicy.shouldRollOnEvent成立,即打開的文件大小超過了滾動器中設置的大小
滾動文件時,首先關閉當前處於progress的part文件,然后創建一個新的 assembleNewPartPath,並且partCounter++(計數器)

StreamingFileSink繼承自RichSinkFunction,顯然之后執行一次,
該方法中注冊了一個定時器,定時器的執行時間為currentProcessingTime + bucketCheckInterval
其中bucketCheckInterval為調用StreamingFileSink.forRowFormat()時,默認創建的,其默認值為60000,也就是一分鍾

onProcessingTime方法繼承自ProcessingTimeCallback,此方法使用調度觸發器的時間戳調用。
該方法中設定了60秒的定時器,定時每60秒執行一次該方法
該方法中會調用buckets.onProcessingTime(currentTime)
里面判斷是否需要關閉part文件,注意是關閉而不是滾動
判斷條件為:part文件不為空 並且 DefaultRollingPolicy.shouldRollOnProcessingTime條件成立
即part文件存在,並且 (當前時間-part的創建時間 >= 滾動時間 或者 當前時間-part的最后修改時間 >= 不活躍時間)

snapshotState和initializeState方法繼承自CheckpointedFunction,用來構建快照或者恢復歷史狀態
其中snapshotState方法會調用buckets.snapshotState()方法,對桶的狀態進行快照處理
將所有處理活躍狀態的桶全部進行快照處理,做快照時會檢查是否需要滾動,滾動條件為:
part文件不為空 並且 DefaultRollingPolicy.shouldRollOnCheckpoint成立,即文件大小超過設定
滿足該條件時,就會關閉partFile

notifyCheckpointComplete方法繼承自CheckpointListener,用來通知檢查點完成
該方法中會調用onSuccessfulCompletionOfCheckpoint方法
會將已經關閉的(其實是處於Pending狀態的文件)part文件重命名


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM