【源碼】Flink StreamFileSink 輸出文件名怎么命名的



本文基於 Flink 1.11.3

最近 flink 版本從 1.8 升級到 1.11,在輸出數據到 hdfs 的時候,發現輸出文件都是這樣命名的:

.part-0-0.inprogress.aa4a310c-7b48-4dee-b153-2a4f21ef10b3
.part-0-0.inprogress.b7e69438-6573-46c9-ae02-fab11db802cf
.part-0-0.inprogress.bcbf1657-4959-4c92-8dca-084346924f0c
.part-0-0.inprogress.cf18c5d5-bfcb-41d3-8177-5d450a1a469e

1.8 的時候是這樣的

part-0-0.pending
part-0-1.pending
part-0-2.pending

文件名是什么倒是不影響使用,但是多了個"."開頭就比較麻煩,因“.”開頭表示是隱藏文件,比如 hive 不讀到,flink 自己讀目錄的時候,也會忽略 "." 開頭的文件
注: 由於 flink 任務更在意時效性,沒有開啟 checkpoint,所以輸出文件不能提交(也就是不會從 .part-0-0.inprogress.cf18c5d5-bfcb-41d3-8177-5d450a1a469e 修改為 part-0-0 )

先看下 StreamFileSink 怎么使用的,24小時滾動依次,每個文件最大 128M

DateTimeBucketAssigner dateTimeBucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"));

StreamingFileSink sink = StreamingFileSink
    .forRowFormat(new Path(path), new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(dateTimeBucketAssigner)
    .withRollingPolicy(
            DefaultRollingPolicy.builder()
                    .withRolloverInterval(TimeUnit.HOURS.toMillis(24))
                    .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                    .withMaxPartSize(128 * 1024 * 1024)
                    .build())
    .build();

stream
        .addSink(sink)

 

調用了 StreamingFileSinkHelper.onElement 方法

public void invoke(IN value, Context context) throws Exception {
    this.helper.onElement(value, context.currentProcessingTime(), context.timestamp(), context.currentWatermark());
}

又調用了 Buckets.onElement 方法

@VisibleForTesting
    public Bucket<IN, BucketID> onElement(IN value, Context context) throws Exception {
        return this.onElement(value, context.currentProcessingTime(), context.timestamp(), context.currentWatermark());
    }

    public Bucket<IN, BucketID> onElement(IN value, long currentProcessingTime, @Nullable Long elementTimestamp, long currentWatermark) throws Exception {
        this.bucketerContext.update(elementTimestamp, currentWatermark, currentProcessingTime);
        BucketID bucketId = this.bucketAssigner.getBucketId(value, this.bucketerContext);
        Bucket<IN, BucketID> bucket = this.getOrCreateBucketForBucketId(bucketId);
        bucket.write(value, currentProcessingTime);
        this.maxPartCounter = Math.max(this.maxPartCounter, bucket.getPartCounter());
        return bucket;
    }

又調用了 Bucket.write 方法

void write(IN element, long currentTime) throws IOException {
    if (this.inProgressPart == null || this.rollingPolicy.shouldRollOnEvent(this.inProgressPart, element)) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.", new Object[]{this.subtaskIndex, this.bucketId, element});
        }

        this.rollPartFile(currentTime);
    }

    this.inProgressPart.write(element, currentTime);
}

debug 可以看到,輸出文件的生成是在 rollPartFile 方法中(簡單理解,每次滾動文件的時候,需要創建新的文件來放新的數據)

查看 rollPartFile 源碼

private void rollPartFile(long currentTime) throws IOException {
    this.closePartFile();
    Path partFilePath = this.assembleNewPartPath();
    this.inProgressPart = this.bucketWriter.openNewInProgressFile(this.bucketId, partFilePath, currentTime);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.", new Object[]{this.subtaskIndex, partFilePath.getName(), this.bucketId});
    }

    ++this.partCounter;
}

自然就找到了 this.bucketWriter.openNewInProgressFile 方法

 public InProgressFileWriter<IN, BucketID> openNewInProgressFile(BucketID bucketID, Path path, long creationTime) throws IOException {
        return this.openNew(bucketID, this.recoverableWriter.open(path), path, creationTime);
    }

this.recoverableWriter.open(path) 是創建文件的地方, RecoverableWriter 有兩種實現 LocalRecoverableWriter 和 HadoopRecoverableWriter(基於本地的debug,是 LocalRecoverableWriter, Writer 的選擇是在創建輸出 Buckets 的時候,基於輸出文件的schame 選擇的, hdfs:// 或 file://)

RecoverableWriter 的兩種實現:

public RecoverableFsDataOutputStream open(Path filePath) throws IOException {
        File targetFile = this.fs.pathToFile(filePath);
        File tempFile = generateStagingTempFilePath(targetFile);
        File parent = tempFile.getParentFile();
        if (parent != null && !parent.mkdirs() && !parent.exists()) {
            throw new IOException("Failed to create the parent directory: " + parent);
        } else {
            return new LocalRecoverableFsDataOutputStream(targetFile, tempFile);
        }
    }

static File generateStagingTempFilePath(File targetFile) {
        Preconditions.checkArgument(!targetFile.isDirectory(), "targetFile must not be a directory");
        File parent = targetFile.getParentFile();
        String name = targetFile.getName();
        Preconditions.checkArgument(parent != null, "targetFile must not be the root directory");

        File candidate;
        do {
            candidate = new File(parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
        } while(candidate.exists());

        return candidate;
    }

LocalRecoverableWriter 的 open 方法中創建了輸出流,指定了目標文件和臨時文件

臨時文件在輸出的過程中產生的文件,臨時文件的名由方法: generateStagingTempFilePath 創建,可以看到文件是這樣命名的: parent, "." + name + ".inprogress." + UUID.randomUUID().toString() ,所以看到的輸出文件名是這樣的
目標文件是輸出最終的文件,flink 任務在做 checkpoint 的時候,會將臨時文件 move(hadoop 是 rename)為目標文件,例如, LocalRecoverableWriter 的輸出流 LocalRecoverableFsDataOutputStream.LocalCommitter 的提交方法:

public void commit() throws IOException {
    File src = this.recoverable.tempFile();
    File dest = this.recoverable.targetFile();
    if (src.length() != this.recoverable.offset()) {
        throw new IOException("Cannot clean commit: File has trailing junk data.");
    } else {
        try {
            Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException | UnsupportedOperationException var4) {
            if (!src.renameTo(dest)) {
                throw new IOException("Committing file failed, could not rename " + src + " -> " + dest);
            }
        } catch (FileAlreadyExistsException var5) {
            throw new IOException("Committing file failed. Target file already exists: " + dest);
        }

    }
}

RecoverableFsDataOutputStream 的兩種實現:

從源碼看到臨時文件的命名和臨時文件提交的代碼,那修改就再簡單不過了

注: 這樣修改不會改變 flink 任務本身的端到端一致性,但是下游會更早讀到數據,一致性就會受到影響

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM