Flink 寫入 HDFS 動態 路徑


該demo基於1.14版本

核心代碼: withBucketAssigner(new BucketAssigner<String, String>() ......
下面是完整代碼:

@SneakyThrows
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(500);
        DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);

        // 指定文件前綴 🥶
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".ext")
                .build();
       // 根目錄前綴
        String outputPath = "hdfs://xxxxxxx:8020/xxx/file";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>(Charset.defaultCharset().name()))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        //  時長 滾動切割
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                        // 空閑,滾動切割
                        .withInactivityInterval(TimeUnit.HOURS.toMillis(1))
                        // 大小 滾動切割
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                // 按自定義字段划分目錄
                .withBucketAssigner(new BucketAssigner<String, String>() {
                    @Override
                    public String getBucketId(String element, Context context) {
                        // date/account/dataSourceId/result.txt
                        try {
                            // 重點!!! 根據內容,自定義路徑位置
                            return LocalDate.now() + "/aa/" + (Integer.parseInt(element) % 3) + "";
                        } catch (NumberFormatException e) {
                            return LocalDate.now() + "/aa/" + "1";
                        }
                    }

                    @Override
                    public SimpleVersionedSerializer<String> getSerializer() {
                        return SimpleVersionedStringSerializer.INSTANCE;
                    }
                })
                .withOutputFileConfig(config)
                // 判斷是否結束文件 的間隔時間
                .withBucketCheckInterval(TimeUnit.SECONDS.toMillis(10))
                .build();
        stream.sinkTo(sink);

        env.execute();
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM