一切新知識的學習,都離不開官網得相關閱讀,那么StreamingFileSink的官網介紹呢?
https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/connectors/streamfile_sink.html
flink在被阿里收購之后,官網也有了相當多的中文文檔,英文不好的同學可以直接看中文版的,不過還是建議時間充足的同學直接閱讀英文文檔,畢竟現在的chrome中的划詞翻譯很是方便了,哪里不會點哪里,慢慢的開發中常見詞匯也就能看個大概了。
1. 寫出文件的狀態
看這個圖片應該能明白,文件會分在不同的桶中,bucket中存在不同狀態的文件:
- In-progress :當前文件正在寫入中
- Pending :當處於 In-progress 狀態的文件關閉(closed)了,就變為 Pending 狀態
- Finished :在成功的 Checkpoint 后,Pending 狀態將變為 Finished 狀態
2. 簡單的字符串寫出示例
DataStreamSource<String> lines = FlinkUtil.createSocketStream("localhost", 8888);
StreamExecutionEnvironment env = FlinkUtil.getEnv();
// 設置checkpoint
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));
OutputFileConfig config = OutputFileConfig
.builder()
.withPartPrefix("prefix")
.withPartSuffix(".txt")
.build();
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
/**
* 設置桶分配政策
* DateTimeBucketAssigner--默認的桶分配政策,默認基於時間的分配器,每小時產生一個桶,格式如下yyyy-MM-dd--HH
* BasePathBucketAssigner :將所有部分文件(part file)存儲在基本路徑中的分配器(單個全局桶)
*/
.withBucketAssigner(new DateTimeBucketAssigner<>())
/**
* 有三種滾動政策
* CheckpointRollingPolicy
* DefaultRollingPolicy
* OnCheckpointRollingPolicy
*/
.withRollingPolicy(
/**
* 滾動策略決定了寫出文件的狀態變化過程
* 1. In-progress :當前文件正在寫入中
* 2. Pending :當處於 In-progress 狀態的文件關閉(closed)了,就變為 Pending 狀態
* 3. Finished :在成功的 Checkpoint 后,Pending 狀態將變為 Finished 狀態
*
* 觀察到的現象
* 1.會根據本地時間和時區,先創建桶目錄
* 2.文件名稱規則:part-<subtaskIndex>-<partFileIndex>
* 3.在macos中默認不顯示隱藏文件,需要顯示隱藏文件才能看到處於In-progress和Pending狀態的文件,因為文件是按照.開頭命名的
*
*/
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(2)) //設置滾動間隔
.withInactivityInterval(TimeUnit.SECONDS.toMillis(1)) //設置不活動時間間隔
.withMaxPartSize(1024 * 1024 * 1024) // 最大零件尺寸
.build())
.withOutputFileConfig(config)
.build();
lines.addSink(sink).setParallelism(1);
3. 寫出文件的滾動策略
數據寫入文件時,查看源碼可以知道
滾動策略是這么判斷的:
沒有處於inProgressPart狀態的文件 或者 DefaultRollingPolicy.shouldRollOnEvent成立,即打開的文件大小超過了滾動器中設置的大小
滾動文件時,首先關閉當前處於progress的part文件,然后創建一個新的 assembleNewPartPath,並且partCounter++(計數器)
StreamingFileSink繼承自RichSinkFunction,顯然之后執行一次,
該方法中注冊了一個定時器,定時器的執行時間為currentProcessingTime + bucketCheckInterval
其中bucketCheckInterval為調用StreamingFileSink.forRowFormat()時,默認創建的,其默認值為60000,也就是一分鍾
onProcessingTime方法繼承自ProcessingTimeCallback,此方法使用調度觸發器的時間戳調用。
該方法中設定了60秒的定時器,定時每60秒執行一次該方法
該方法中會調用buckets.onProcessingTime(currentTime)
里面判斷是否需要關閉part文件,注意是關閉而不是滾動
判斷條件為:part文件不為空 並且 DefaultRollingPolicy.shouldRollOnProcessingTime條件成立
即part文件存在,並且 (當前時間-part的創建時間 >= 滾動時間 或者 當前時間-part的最后修改時間 >= 不活躍時間)
snapshotState和initializeState方法繼承自CheckpointedFunction,用來構建快照或者恢復歷史狀態
其中snapshotState方法會調用buckets.snapshotState()方法,對桶的狀態進行快照處理
將所有處理活躍狀態的桶全部進行快照處理,做快照時會檢查是否需要滾動,滾動條件為:
part文件不為空 並且 DefaultRollingPolicy.shouldRollOnCheckpoint成立,即文件大小超過設定
滿足該條件時,就會關閉partFile
notifyCheckpointComplete方法繼承自CheckpointListener,用來通知檢查點完成
該方法中會調用onSuccessfulCompletionOfCheckpoint方法
會將已經關閉的(其實是處於Pending狀態的文件)part文件重命名