[Source Code] Flink StreamingFileSink Processing Flow


A couple of days ago I tried writing to Hive with Flink SQL and was puzzled by how the sink writes data to HDFS, especially how files are committed based on checkpoints. So I read through the StreamingFileSink source code (Flink SQL's Hive sink reuses this code).

StreamingFileSink was introduced in Flink 1.6 after a community rework, as a replacement for BucketingSink; BucketingSink was marked as deprecated in Flink 1.9.

The StreamingFileSink processing flow has a four-level structure, each level delegating to the next:

* StreamingFileSink is the sink itself and defines the operator
* StreamingFileSinkHelper is the helper class the sink uses to process data
* Buckets manages all buckets
* Bucket represents a single bucket

A simple demo:

object StreamingFileDemo {

  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10))
    env.setParallelism(2)

    // SimpleStringSource: the demo's custom source that emits strings
    val source = env.addSource(new SimpleStringSource)

    val sink: StreamingFileSink[String] = StreamingFileSink
      // row-encoding
      .forRowFormat(new Path("/out/tmp"), new SimpleStringEncoder[String]("UTF-8"))
      // bucket assigner
      .withBucketAssigner(new DateTimeBucketAssigner[String]("yyyyMMdd", ZoneId.of("Asia/Shanghai")))
      // the "'dt='yyyyMMdd" format lets a Hive table load the bucket directory directly as a partition
      //.withBucketAssigner(new DateTimeBucketAssigner[String]("'dt='yyyyMMdd", ZoneId.of("Asia/Shanghai")))
      // rolling policy
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          // roll over every 2 minutes
          .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
          // roll after 2 minutes of inactivity
          .withInactivityInterval(TimeUnit.MINUTES.toMillis(2))
          // roll once a part file reaches 10 MB
          .withMaxPartSize(10 * 1024 * 1024)
          .build())
      .build()

    source.addSink(sink)
    env.execute("StreamingFileDemo")
  }
}
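
The demo uses the built-in DateTimeBucketAssigner; the commented-out line shows the `'dt='yyyyMMdd` format so that a Hive table can pick the bucket directory up directly as a partition. If you need more control, you can supply your own BucketAssigner instead. The following is only an illustrative sketch of what that could look like (the class name and time zone are mine, not from the original post):

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// hypothetical assigner: puts each record into a Hive-style "dt=yyyyMMdd" directory
// based on the current processing time
public class HivePartitionBucketAssigner implements BucketAssigner<String, String> {

  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("Asia/Shanghai"));

  @Override
  public String getBucketId(String element, Context context) {
    // Buckets.onElement passes the current processing time in through this context
    return "dt=" + FORMATTER.format(Instant.ofEpochMilli(context.currentProcessingTime()));
  }

  @Override
  public SimpleVersionedSerializer<String> getSerializer() {
    // bucket ids are plain strings, so the built-in string serializer is enough
    return SimpleVersionedStringSerializer.INSTANCE;
  }
}

It would be wired in with `.withBucketAssigner(new HivePartitionBucketAssigner)` in place of the DateTimeBucketAssigner above.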

Writing data

Skipping the generic operator logic for handling input records, let's go straight into StreamingFileSink.

When the sink is initialized, StreamingFileSink.initializeState creates the Buckets object and passes it into the StreamingFileSinkHelper constructor to create the StreamingFileSinkHelper instance:

public void initializeState(FunctionInitializationContext context) throws Exception {
    this.helper = new StreamingFileSinkHelper<>(
        bucketsBuilder.createBuckets(getRuntimeContext().getIndexOfThisSubtask()),
        context.isRestored(),
        context.getOperatorStateStore(),
        ((StreamingRuntimeContext) getRuntimeContext()).getProcessingTimeService(),
        bucketCheckInterval);
  }

StreamingFileSink processes each input record in invoke.

StreamingFileSink.invoke

@Override
  public void invoke(IN value, SinkFunction.Context context) throws Exception {
    this.helper.onElement(
      value,
      context.currentProcessingTime(),
      context.timestamp(),
      context.currentWatermark());
  }

As the code shows, invoke simply calls StreamingFileSinkHelper.onElement, passing the input record, the current processing time, the timestamp (the record's event time, or null if the record has none), and the current watermark.

StreamingFileSinkHelper

// triggered by the processing-time timer
@Override
public void onProcessingTime(long timestamp) throws Exception {
  final long currentTime = procTimeService.getCurrentProcessingTime();
  buckets.onProcessingTime(currentTime);
  procTimeService.registerTimer(currentTime + bucketCheckInterval, this);
}
// triggered by each input record
public void onElement(
    IN value,
    long currentProcessingTime,
    @Nullable Long elementTimestamp,
    long currentWatermark) throws Exception {
  buckets.onElement(value, currentProcessingTime, elementTimestamp, currentWatermark);
}

As the code shows, StreamingFileSinkHelper.onElement also just delegates to buckets.onElement. The helper implements the ProcessingTimeCallback interface and uses a processing-time timer to call buckets.onProcessingTime every bucketCheckInterval milliseconds (this is what rolls buckets by time; note that rolling buckets on event time is not supported).
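
bucketCheckInterval itself is configured on the sink builder (if I read the builder code correctly, it defaults to 60 seconds). A minimal sketch of setting it, shown here as a Java builder fragment with an arbitrary 30-second value:

StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("/out/tmp"), new SimpleStringEncoder<String>("UTF-8"))
    // how often StreamingFileSinkHelper's timer calls Buckets.onProcessingTime
    .withBucketCheckInterval(TimeUnit.SECONDS.toMillis(30))
    .build();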

Buckets

@VisibleForTesting
  public Bucket<IN, BucketID> onElement(
      final IN value,
      final SinkFunction.Context context) throws Exception {
    return onElement(
        value,
        context.currentProcessingTime(),
        context.timestamp(),
        context.currentWatermark());
  }

  public Bucket<IN, BucketID> onElement(
      final IN value,
      final long currentProcessingTime,
      @Nullable final Long elementTimestamp,
      final long currentWatermark) throws Exception {
    // setting the values in the bucketer context
    bucketerContext.update(
        elementTimestamp,
        currentWatermark,
        currentProcessingTime);
    // get the bucket id
    final BucketID bucketId = bucketAssigner.getBucketId(value, bucketerContext);
    // get or create the bucket for this bucketId
    final Bucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
    // write the record to the bucket
    bucket.write(value, currentProcessingTime);

    // we update the global max counter here because as buckets become inactive and
    // get removed from the list of active buckets, at the time when we want to create
    // another part file for the bucket, if we start from 0 we may overwrite previous parts.
    // part file counter
    this.maxPartCounter = Math.max(maxPartCounter, bucket.getPartCounter());
    return bucket;
  }

  private Bucket<IN, BucketID> getOrCreateBucketForBucketId(final BucketID bucketId) throws IOException {
    // look it up among the active buckets; create a new one if absent
    Bucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
    if (bucket == null) {
      final Path bucketPath = assembleBucketPath(bucketId);
      bucket = bucketFactory.getNewBucket(
          subtaskIndex,
          bucketId,
          bucketPath,
          maxPartCounter,
          bucketWriter,
          rollingPolicy,
          outputFileConfig);
      activeBuckets.put(bucketId, bucket);
      notifyBucketCreate(bucket);
    }
    return bucket;
  }
  // called by the processing-time timer
  public void onProcessingTime(long timestamp) throws Exception {
    // call onProcessingTime on every active bucket
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
      bucket.onProcessingTime(timestamp);
    }
  }

Here we can see that when a record arrives, Buckets resolves its bucketId, gets (or creates) the Bucket object, and calls bucket.write to write the record (finally, the actual write):

// write a record
void write(IN element, long currentTime) throws IOException {
    // check whether the part file should be rolled based on size (the 10 MB set in the demo)
    if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {

      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.",
            subtaskIndex, bucketId, element);
      }
      // roll the part file
      rollPartFile(currentTime);
    }
    // write the record to the in-progress part file
    inProgressPart.write(element, currentTime);
  }
  // roll the part file
  private void rollPartFile(final long currentTime) throws IOException {
    // close the current part file
    closePartFile();
    // build the Path of the new part file
    final Path partFilePath = assembleNewPartPath();
    // open the part writer
    inProgressPart = bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.",
          subtaskIndex, partFilePath.getName(), bucketId);
    }
    // increment the part counter
    partCounter++;
  }

  private Path assembleNewPartPath() {
    return new Path(bucketPath, outputFileConfig.getPartPrefix() + '-' + subtaskIndex + '-' + partCounter + outputFileConfig.getPartSuffix());
  }
  // close the part file
  private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
    InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
    if (inProgressPart != null) {
      // close the file for commit
      pendingFileRecoverable = inProgressPart.closeForCommit();
      // add it to the list of files pending commit for the current checkpoint
      pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
      inProgressPart = null;
    }
    return pendingFileRecoverable;
  }
  // called by the processing-time timer: rolls the part file based on processing time
  void onProcessingTime(long timestamp) throws IOException {
    if (inProgressPart != null && rollingPolicy.shouldRollOnProcessingTime(inProgressPart, timestamp)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy " +
            "(in-progress file created @ {}, last updated @ {} and current time is {}).",
            subtaskIndex, bucketId, inProgressPart.getCreationTime(), inProgressPart.getLastUpdateTime(), timestamp);
      }
      closePartFile();
    }
  }

That is the file-rolling and record-writing logic. The actual write to the file system depends on the execution environment: RowWiseBucketWriter is created with a RecoverableWriter that differs according to the configured path:

@Internal
@Override
public Buckets<IN, BucketID> createBuckets(int subtaskIndex) throws IOException {
  return new Buckets<>(
      basePath,
      bucketAssigner,
      bucketFactory,
      new RowWiseBucketWriter<>(FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder),
      rollingPolicy,
      subtaskIndex,
      outputFileConfig);
}
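
To make the part-file lifecycle more concrete, here is a small hand-written sketch (not Flink source; the path and data are made up) that drives a RecoverableWriter directly through the same open → closeForCommit → commit steps that RowWiseBucketWriter and Bucket go through:

import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;

public class RecoverableWriterSketch {

  public static void main(String[] args) throws Exception {
    // example path; an hdfs:// scheme resolves to a Hadoop-based writer,
    // a plain local path to the local file system writer
    Path partPath = new Path("/out/tmp/dt=20210101/part-0-0");
    RecoverableWriter writer = FileSystem.get(partPath.toUri()).createRecoverableWriter();

    // open an in-progress file and write into it (what the part writer does in Bucket.write)
    RecoverableFsDataOutputStream out = writer.open(partPath);
    out.write("hello streaming file sink".getBytes(StandardCharsets.UTF_8));

    // closeForCommit corresponds to closePartFile: the file is now "pending"
    RecoverableFsDataOutputStream.Committer committer = out.closeForCommit();

    // commit corresponds to what happens after a successful checkpoint:
    // the committer renames the file to its final name
    committer.commit();
  }
}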


Checkpoint snapshotState

Quoting the official documentation:
```
IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints.

If checkpointing is disabled part files will forever stay in `in-progress` or `pending` state and cannot be safely read by downstream systems.
```

In short: checkpointing must be enabled, otherwise part files can never be moved to the finished state (the data in them is complete, but the file never reaches the finished state), so checkpointing is mandatory.
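
Concretely, this is the line that stays commented out in the demo at the top. A minimal Java equivalent (the 10-minute interval is just the demo's value, pick whatever fits the job):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// without checkpointing, part files never move past the in-progress / pending state
env.enableCheckpointing(TimeUnit.MINUTES.toMillis(10));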

The checkpoint process starts when the snapshot message from the CheckpointCoordinator arrives. Skipping StreamingFileSink/StreamingFileSinkHelper, let's look directly at Buckets.snapshotState:

public void snapshotState(
      final long checkpointId,
      final ListState<byte[]> bucketStatesContainer,
      final ListState<Long> partCounterStateContainer) throws Exception {

    Preconditions.checkState(
      bucketWriter != null && bucketStateSerializer != null,
        "sink has not been initialized");

    LOG.info("Subtask {} checkpointing for checkpoint with id={} (max part counter={}).",
        subtaskIndex, checkpointId, maxPartCounter);

    bucketStatesContainer.clear();
    partCounterStateContainer.clear();

    snapshotActiveBuckets(checkpointId, bucketStatesContainer);
    partCounterStateContainer.add(maxPartCounter);
  }

  private void snapshotActiveBuckets(
      final long checkpointId,
      final ListState<byte[]> bucketStatesContainer) throws Exception {
    // take each active bucket's state and add it to bucketStatesContainer
    for (Bucket<IN, BucketID> bucket : activeBuckets.values()) {
      final BucketState<BucketID> bucketState = bucket.onReceptionOfCheckpoint(checkpointId);

      final byte[] serializedBucketState = SimpleVersionedSerialization
          .writeVersionAndSerialize(bucketStateSerializer, bucketState);

      bucketStatesContainer.add(serializedBucketState);

      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} checkpointing: {}", subtaskIndex, bucketState);
      }
    }
  }

Bucket

BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
    prepareBucketForCheckpointing(checkpointId);

    InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
    long inProgressFileCreationTime = Long.MAX_VALUE;

    if (inProgressPart != null) {
      // persist the in-progress file so it can be recovered (the file itself stays open)
      inProgressFileRecoverable = inProgressPart.persist();
      inProgressFileCreationTime = inProgressPart.getCreationTime();
      this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
    }

    return new BucketState<>(bucketId, bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, pendingFileRecoverablesPerCheckpoint);
  }

  private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
    // check whether the rolling policy wants to roll the part file on checkpoint
    if (inProgressPart != null && rollingPolicy.shouldRollOnCheckpoint(inProgressPart)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", subtaskIndex, bucketId);
      }
      closePartFile();
    }

    if (!pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
      // record them under this checkpoint id in pendingFileRecoverablesPerCheckpoint
      pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverablesForCurrentCheckpoint);
      pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<>();
    }
  }

This puts the state of every active bucket into bucketStatesContainer (a ListState).

Checkpoint onCheckpointComplete

When the checkpoint-complete notification from the CheckpointCoordinator arrives, again skipping StreamingFileSink/StreamingFileSinkHelper, look directly at Buckets.commitUpToCheckpoint:

  public void commitUpToCheckpoint(final long checkpointId) throws IOException {
    final Iterator<Map.Entry<BucketID, Bucket<IN, BucketID>>> activeBucketIt =
        activeBuckets.entrySet().iterator();

    LOG.info("Subtask {} received completion notification for checkpoint with id={}.", subtaskIndex, checkpointId);
    // call bucket.onSuccessfulCompletionOfCheckpoint on every active bucket
    while (activeBucketIt.hasNext()) {
      final Bucket<IN, BucketID> bucket = activeBucketIt.next().getValue();
      bucket.onSuccessfulCompletionOfCheckpoint(checkpointId);

      if (!bucket.isActive()) {
        // We've dealt with all the pending files and the writer for this bucket is not currently open.
        // Therefore this bucket is currently inactive and we can remove it from our state.
        activeBucketIt.remove();
        notifyBucketInactive(bucket);
      }
    }
  }

Bucket

void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
    checkNotNull(bucketWriter);

    Iterator<Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>>> it =
        pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true)
            .entrySet().iterator();

    while (it.hasNext()) {
      Map.Entry<Long, List<InProgressFileWriter.PendingFileRecoverable>> entry = it.next();
      // iterate over the pending files recorded up to (and including) this checkpoint
      for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : entry.getValue()) {
        // recover the pending file and commit it; the file is renamed to its finished form (the pending/in-progress markers are removed)
        bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
      }
      it.remove();
    }
    // clean up in-progress file handles left over from earlier checkpoints
    cleanupInProgressFileRecoverables(checkpointId);
  }

  private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
    Iterator<Map.Entry<Long, InProgressFileWriter.InProgressFileRecoverable>> it =
        inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false)
            .entrySet().iterator();

    while (it.hasNext()) {
      final InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = it.next().getValue();

      // this check is redundant, as we only put entries in the inProgressFileRecoverablesPerCheckpoint map
      // list when the requiresCleanupOfInProgressFileRecoverableState() returns true, but having it makes
      // the code more readable.

      final boolean successfullyDeleted = bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
      if (LOG.isDebugEnabled() && successfullyDeleted) {
        LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
      }
      it.remove();
    }
  }


bucketWriter.recoverPendingFile(pendingFileRecoverable).commit() behaves differently depending on the environment:

Local: RowWiseBucketWriter --> LocalCommitter --> LocalCommitter.commit, which simply renames the local file

HDFS: RowWiseBucketWriter --> RecoverableFsDataOutputStream.Committer --> HadoopFsCommitter.commit, which renames the file via fs.rename

For consistency reasons, StreamingFileSink was designed to require checkpointing. But some jobs do not really care about that level of consistency, can tolerate losing some data, and above all do not want to enable checkpointing (for example, real-time detection scenarios that only care about the last short window of data: if the job fails, it is simply restarted from scratch rather than resumed from the last checkpoint).


In that case you can commit the file directly inside Bucket.closePartFile and skip registering it with the checkpoint state, so files are committed whenever a part file rolls:

bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
// do not add it to the checkpoint state
// pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
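
Putting those two lines into context, a modified Bucket.closePartFile could look roughly like this. This is only a sketch of a custom fork of the Flink class, not upstream behaviour, and it gives up the checkpoint-based consistency guarantees:

// sketch: commit the part file as soon as it is rolled, instead of on checkpoint completion
private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
  InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
  if (inProgressPart != null) {
    pendingFileRecoverable = inProgressPart.closeForCommit();
    // commit immediately: the file is renamed to its finished form right here
    bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
    // deliberately NOT remembered for the next checkpoint
    // pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
    inProgressPart = null;
  }
  return pendingFileRecoverable;
}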

Feel free to follow the Flink菜鳥 WeChat official account, which publishes Flink development-related posts from time to time.
