Flink 写入 HDFS 动态 路径


该demo基于1.14版本

核心代码: withBucketAssigner(new BucketAssigner<String, String>() ......
下面是完整代码:

@SneakyThrows
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(500);
        DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);

        // 指定文件前缀 🥶
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".ext")
                .build();
       // 根目录前缀
        String outputPath = "hdfs://xxxxxxx:8020/xxx/file";
        final FileSink<String> sink = FileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>(Charset.defaultCharset().name()))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        //  时长 滚动切割
                        .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                        // 空闲,滚动切割
                        .withInactivityInterval(TimeUnit.HOURS.toMillis(1))
                        // 大小 滚动切割
                        .withMaxPartSize(1024 * 1024 * 1024)
                        .build())
                // 按自定义字段划分目录
                .withBucketAssigner(new BucketAssigner<String, String>() {
                    @Override
                    public String getBucketId(String element, Context context) {
                        // date/account/dataSourceId/result.txt
                        try {
                            // 重点!!! 根据内容,自定义路径位置
                            return LocalDate.now() + "/aa/" + (Integer.parseInt(element) % 3) + "";
                        } catch (NumberFormatException e) {
                            return LocalDate.now() + "/aa/" + "1";
                        }
                    }

                    @Override
                    public SimpleVersionedSerializer<String> getSerializer() {
                        return SimpleVersionedStringSerializer.INSTANCE;
                    }
                })
                .withOutputFileConfig(config)
                // 判断是否结束文件 的间隔时间
                .withBucketCheckInterval(TimeUnit.SECONDS.toMillis(10))
                .build();
        stream.sinkTo(sink);

        env.execute();
    }


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM