A few days ago I tried writing to Hive with Flink SQL and was puzzled by how the sink writes data to HDFS, in particular the checkpoint-based file commit, so I read through the StreamingFileSink source code (the Flink SQL Hive sink reuses this code).
StreamingFileSink was introduced in Flink 1.6 as a community improvement intended to replace BucketingSink, which has been marked as deprecated since Flink 1.9.
The StreamingFileSink processing flow has a four-level structure, each level calling down into the next:
* StreamingFileSink is the sink function and defines the operator
* StreamingFileSinkHelper is the helper class the sink uses to process data
* Buckets manages all buckets
* Bucket represents a single bucket
A simple demo
```scala
object StreamingFileDemo {
  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10))
    env.setParallelism(2)

    val source = env.addSource(new SimpleStringSource)

    val sink: StreamingFileSink[String] = StreamingFileSink
      // row-encoding
      .forRowFormat(new Path("/out/tmp"), new SimpleStringEncoder[String]("UTF-8"))
      // bucket assigner
      .withBucketAssigner(new DateTimeBucketAssigner[String]("yyyyMMdd", ZoneId.of("Asia/Shanghai")))
      // so that a Hive table can load the directory directly as a Hive partition
      //.withBucketAssigner(new DateTimeBucketAssigner[String]("'dt='yyyyMMdd", ZoneId.of("Asia/Shanghai")))
      // rolling policy
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          // roll every 2 minutes
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
          // roll after 2 minutes of inactivity
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(2))
          // roll at 10 MB
          .withMaxPartSize(10 * 1024 * 1024)
          .build())
      .build()

    source.addSink(sink)
    env.execute("StreamingFileDemo")
  }
}
```
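The demo references a `SimpleStringSource` that is not shown here. As a rough idea of what it does, here is a minimal sketch of such a source (the class name comes from the demo, but the record content and the one-second emit interval are assumptions for illustration):

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Hypothetical source for the demo: emits a simple string roughly once per second until cancelled.
class SimpleStringSource extends SourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    var i = 0L
    while (running) {
      // the record content is arbitrary; any string works for the row-format sink
      ctx.collect(s"message-$i,${System.currentTimeMillis()}")
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}
```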
Writing data
Skipping the generic operator logic for handling input records, we go straight into StreamingFileSink.
When StreamingFileSink is initialized, StreamingFileSink.initializeState creates a Buckets object and passes it into the StreamingFileSinkHelper constructor to build the StreamingFileSinkHelper instance.
```java
public void initializeState(FunctionInitializationContext context) throws Exception {
    this.helper = new StreamingFileSinkHelper<>(
            bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask()),
            context.isRestored(),
            context.getOperatorStateStore(),
            ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(),
            bucketCheckInterval);
}
```
StreamingFileSink processes input records in invoke.
StreamingFileSink.invoke
```java
@Override
public void invoke(IN value, SinkFunction.Context context) throws Exception {
    this.helper.onElement(
            value,
            context.currentProcessingTime(),
            context.timestamp(),
            context.currentWatermark());
}
```
As the code shows, invoke simply delegates to StreamingFileSinkHelper.onElement, passing in the input record, the current processing time, the timestamp (the record's event time, or null if the record has none), and the current watermark.
StreamingFileSinkHelper
```java
// triggered by processing time
@Override
public void onProcessingTime(long timestamp) throws Exception {
    final long currentTime = procTimeService.getCurrentProcessingTime();
    buckets.onProcessingTime(currentTime);
    procTimeService.registerTimer(currentTime + bucketCheckInterval, this);
}

// triggered by input records
public void onElement(
        IN value,
        long currentProcessingTime,
        @Nullable Long elementTimestamp,
        long currentWatermark) throws Exception {
    buckets.onElement(value, currentProcessingTime, elementTimestamp, currentWatermark);
}
```
As the code shows, StreamingFileSinkHelper.onElement also simply delegates to buckets.onElement. The helper additionally implements the ProcessingTimeCallback interface and uses a timer to call buckets.onProcessingTime every bucketCheckInterval, which rolls the in-progress part files based on processing time (note that rolling by event time is not supported).
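The check interval itself can be set on the sink builder. A minimal sketch, assuming the same row format as the demo (the 30-second value is an arbitrary example, not a recommendation):

```scala
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("/out/tmp"), new SimpleStringEncoder[String]("UTF-8"))
  // how often the processing-time timer fires to evaluate the rolling policy
  .withBucketCheckInterval(TimeUnit.SECONDS.toMillis(30))
  .build()
```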
Buckets
```java
@VisibleForTesting
public Bucket<IN, BucketID> onElement(final IN value, final SinkFunction.Context context) throws Exception {
    return onElement(
            value,
            context.currentProcessingTime(),
            context.timestamp(),
            context.currentWatermark());
}

public Bucket<IN, BucketID> onElement(
        final IN value,
        final long currentProcessingTime,
        @Nullable final Long elementTimestamp,
        final long currentWatermark) throws Exception {

    // setting the values in the bucketer context
    bucketerContext.update(
            elementTimestamp,
            currentWatermark,
            currentProcessingTime);

    // determine the bucket id
    final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
    // get or create the bucket for this bucketId
    final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
    // write the record to the bucket
    bucket.write(value, currentProcessingTime);

    // we update the global max counter here because as buckets become inactive and
    // get removed from the list of active buckets, at the time when we want to create
    // another part file for the bucket, if we start from 0 we may overwrite previous parts.
    // part file counter
    this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
    return bucket;
}

private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
    // look the bucket up among the active buckets; create a new one if absent
    Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
    if (bucket == null) {
        final Path bucketPath = assembleBucketPath(bucketId);
        bucket = bucketFactory.getNewBucket(
                subtaskIndex,
                bucketId,
                bucketPath,
                maxPartCounter,
                bucketWriter,
                rollingPolicy,
                outputFileConfig);
        activeBuckets.put(bucketId, bucket);
        notifyBucketCreate(bucket);
    }
    return bucket;
}

// called by the processing-time timer
public void onProcessingTime(long timestamp) throws Exception {
    // call onProcessingTime on every active bucket
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
        bucket.onProcessingTime(timestamp);
    }
}
```
Here we can see that when a record comes in, the bucketId is determined, the corresponding Bucket object is fetched (or created), and bucket.write is called to write the record (we finally reach the actual write).
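The bucketId comes from whatever BucketAssigner was configured on the sink (the demo uses DateTimeBucketAssigner). A minimal sketch of a custom assigner, assuming records are comma-separated strings whose first field should become the bucket directory (the class name and record layout are made up for illustration):

```scala
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer

// Hypothetical assigner: the bucket id (i.e. the bucket sub-directory) is the record's first field.
class KeyPrefixBucketAssigner extends BucketAssigner[String, String] {

  override def getBucketId(element: String, context: BucketAssigner.Context): String =
    element.split(",")(0)

  // the bucket id is part of the checkpointed bucket state, so it needs a versioned serializer
  override def getSerializer(): SimpleVersionedSerializer[String] =
    SimpleVersionedStringSerializer.INSTANCE
}
```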
```java
// write a record
void write(IN element, long currentTime) throws IOException {
    // check whether the in-progress part file should roll based on its size,
    // corresponding to the 10 MB configured in the demo
    if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                    subtaskIndex, bucketId, element);
        }
        // roll the part file
        rollPartFile(currentTime);
    }
    // write the record to the in-progress part file
    inProgressPart.write(element, currentTime);
}

// roll the part file
private void rollPartFile(final long currentTime) throws IOException {
    // close the current part file
    closePartFile();
    // build the path of the new part file
    final Path partFilePath = assembleNewPartPath();
    // open a new in-progress part writer
    inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);

    if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
                subtaskIndex, partFilePath.getName(), bucketId);
    }
    // part counter ++
    partCounter++;
}

private Path assembleNewPartPath() {
    return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
}

// close the part file
private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
    InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
    if (inProgressPart != null) {
        // close the file for commit
        pendingFileRecoverable = inProgressPart.closeForCommit();
        // add it to the pending-commit list of the current checkpoint
        pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
        inProgressPart = null;
    }
    return pendingFileRecoverable;
}

// called by the processing-time timer: rolls the part file based on processing time
void onProcessingTime(long timestamp) throws IOException {
    if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
                    "(in-progress file created @ {}, last updated @ {} and current time is {}).",
                    subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
        }
        closePartFile();
    }
}
```
This is the implementation of rolling part files and writing records. The actual write to the file system depends on the runtime environment: RowWiseBucketWriter is constructed with a RecoverableWriter, and which RecoverableWriter is created depends on the base path passed in.
```java
@Internal
@Override
public Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
    return new Buckets<>(
            basePath,
            bucketAssigner,
            bucketFactory,
            new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
            rollingPolicy,
            subtaskIndex,
            outputFileConfig);
}
```
Checkpoint: snapshotState
Quoting the official documentation:
```
IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints.
If checkpointing is disabled part files will forever stay in `in-progress` or `pending` state and cannot be safely read by downstream systems.
```
In short, checkpointing must be enabled; otherwise part files never transition to the finished state (the data itself is complete, but the file never reaches the finished state), so checkpointing is mandatory.
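For reference, enabling checkpointing on the demo environment would look roughly like this (the one-minute interval and EXACTLY_ONCE mode are example choices, not requirements from the article):

```scala
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// part files can only be finalized on successful checkpoints
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE)
```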
The checkpoint process starts when the snapshot message from the CheckpointCoordinator is received. Skipping StreamingFileSink/StreamingFileSinkHelper, we look directly at Buckets.snapshotState.
```java
public void snapshotState(
        final long checkpointId,
        final ListState<byte[]> bucketStatesContainer,
        final ListState<Long> partCounterStateContainer) throws Exception {

    Preconditions.checkState(
            bucketWriter != null && bucketStateSerializer != null,
            "sink has not been initialized");

    LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
            subtaskIndex, checkpointId, maxPartCounter);

    bucketStatesContainer.clear();
    partCounterStateContainer.clear();

    snapshotActiveBuckets(checkpointId, bucketStatesContainer);
    partCounterStateContainer.add(maxPartCounter);
}

private void snapshotActiveBuckets(
        final long checkpointId,
        final ListState<byte[]> bucketStatesContainer) throws Exception {

    // take the state of every active bucket and put it into bucketStatesContainer
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
        final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);

        final byte[] serializedBucketState = SimpleVersionedSerialization
                .writeVersionAndSerialize(bucketStateSerializer, bucketState);

        bucketStatesContainer.add(serializedBucketState);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
        }
    }
}
```
Bucket
```java
BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
    prepareBucketForCheckpointing(checkpointId);

    InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
    long inProgressFileCreationTime = Long.MAX_VALUE;

    if (inProgressPart != null) {
        // persist the in-progress file so it can be recovered
        inProgressFileRecoverable = inProgressPart.persist();
        inProgressFileCreationTime = inProgressPart.getCreationTime();
        this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
    }

    return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
}

private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
    // check whether the in-progress part file should be rolled on this checkpoint
    if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
        }
        closePartFile();
    }

    if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
        // move the pending files of the current checkpoint into pendingFileRecoverablesPerCheckpoint
        pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
        pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
    }
}
```
The state of each active bucket is put into bucketStatesContainer (a ListState).
Checkpoint: notifyCheckpointComplete
When the completion notification from the CheckpointCoordinator is received, skipping StreamingFileSink/StreamingFileSinkHelper again, we look directly at Buckets.commitUpToCheckpoint.
```java
public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
            activeBuckets.entrySet().iterator();

    LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);

    // call bucket.onSuccessfulCompletionOfCheckpoint on every active bucket
    while (activeBucketIt.hasNext()) {
        final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
        bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

        if (!bucket.isActive()) {
            // We've dealt with all the pending files and the writer for this bucket is not currently open.
            // Therefore this bucket is currently inactive and we can remove it from our state.
            activeBucketIt.remove();
            notifyBucketInactive(bucket);
        }
    }
}
```
Bucket
```java
void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
    checkNotNull(bucketWriter);

    Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
            pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
                    .entrySet().iterator();

    while (it.hasNext()) {
        Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();

        // iterate over all pending files recorded in pendingFileRecoverablesPerCheckpoint up to this checkpoint
        for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
            // recover the pending file and commit it; the file is renamed to its finished form
            // (the pending/in-progress markers are removed from the name)
            bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
        }
        it.remove();
    }

    // clean up the in-progress recoverables of this checkpoint
    cleanupInProgressFileRecoverables(checkpointId);
}

private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
    Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
            inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
                    .entrySet().iterator();

    while (it.hasNext()) {
        final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();

        // this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
        // list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
        // the code more readable.
        final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
        if (LOG.isDebugEnabled() && successfullyDeleted) {
            LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
        }
        it.remove();
    }
}
```
bucketWriter.recoverPendingFile(pendingFileRecoverable).commit() behaves differently depending on the environment:
Local: RowWiseBucketWriter --> LocalCommitter --> LocalCommitter.commit, which simply renames the local file
HDFS: RowWiseBucketWriter --> RecoverableFsDataOutputStream.Committer --> HadoopFsCommitter.commit, which renames the file via fs.rename
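As an illustration of this environment dependence, the concrete RecoverableWriter is picked by the file system of the base path passed to the sink (a sketch based on the createBuckets code shown earlier; the paths are placeholders and an HDFS setup is assumed for the second line):

```scala
import org.apache.flink.core.fs.{FileSystem, Path}

// a file:// base path yields a local RecoverableWriter, so commit is a plain local rename
val localWriter = FileSystem.get(new Path("file:///out/tmp").toUri).createRecoverableWriter()

// an hdfs:// base path yields the Hadoop-based RecoverableWriter, so commit goes through fs.rename
val hdfsWriter = FileSystem.get(new Path("hdfs:///tmp/out").toUri).createRecoverableWriter()
```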
For consistency reasons, StreamingFileSink was designed to require checkpointing. Some jobs, however, do not really care about consistency and can tolerate losing a bit of data; what matters most to them is not having to enable checkpointing (for example, real-time detection scenarios that only care about the last short window of data: if the job fails, it is simply restarted from scratch instead of being restored to the last checkpoint).
By committing the file directly in Bucket.closePartFile and not adding it to the checkpoint state, the file is committed whenever the part file rolls:
```java
bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
// not added to checkpoint state
// pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
```
Welcome to follow the Flink菜鳥 WeChat public account, which publishes Flink (development) related posts from time to time.