该 demo 基于 Flink 1.14 版本。
核心代码: withBucketAssigner(new BucketAssigner<String, String>() ......
下面是完整代码:
/**
 * Demo entry point: reads text lines from a socket on {@code localhost:9999} and writes them
 * through a row-format {@code FileSink}, routing every record to a sub-directory (bucket)
 * derived from the record's content.
 *
 * <p>Bucket layout below the output root is {@code <current date>/aa/<n mod 3>}, where
 * {@code n} is the record parsed as an integer. Records that are not valid integers are
 * written to {@code <current date>/aa/1}.
 *
 * @param args command-line arguments; not used
 */
@SneakyThrows
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // Checkpoint every 500 ms.
    // NOTE(review): the FileSink presumably finalizes part files only on checkpoints - confirm
    // against the Flink documentation before changing this interval.
    env.enableCheckpointing(500);

    DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);

    // Prefix and suffix applied to every part file name.
    OutputFileConfig config = OutputFileConfig
            .builder()
            .withPartPrefix("prefix")
            .withPartSuffix(".ext")
            .build();

    // Root directory under which all bucket directories are created.
    String outputPath = "hdfs://xxxxxxx:8020/xxx/file";

    // NOTE(review): the encoder uses the JVM's platform-default charset, so the bytes written
    // may differ between machines - confirm this is intended.
    final FileSink<String> sink = FileSink
            .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>(Charset.defaultCharset().name()))
            .withRollingPolicy(DefaultRollingPolicy.builder()
                    // Roll the part file once it has been open for 10 seconds.
                    .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                    // Roll the part file after 1 hour without new records.
                    .withInactivityInterval(TimeUnit.HOURS.toMillis(1))
                    // Roll the part file once it reaches 1 GiB; long arithmetic avoids int overflow
                    // should the size ever be raised.
                    .withMaxPartSize(1024L * 1024L * 1024L)
                    .build())
            // Choose the target directory from the content of each record.
            .withBucketAssigner(new BucketAssigner<String, String>() {
                @Override
                public String getBucketId(String element, Context context) {
                    // Key point: the bucket path is computed from the record itself.
                    final String bucketRoot = LocalDate.now() + "/aa/";
                    try {
                        // floorMod keeps the bucket in 0..2 even for negative numbers;
                        // the % operator would yield "-1" / "-2" and create extra directories.
                        return bucketRoot + Math.floorMod(Integer.parseInt(element), 3);
                    } catch (NumberFormatException ignored) {
                        // Deliberate fallback: non-numeric records all go to bucket "1".
                        return bucketRoot + "1";
                    }
                }

                @Override
                public SimpleVersionedSerializer<String> getSerializer() {
                    return SimpleVersionedStringSerializer.INSTANCE;
                }
            })
            .withOutputFileConfig(config)
            // How often (10 s) the sink checks whether the time-based rolling conditions are met.
            .withBucketCheckInterval(TimeUnit.SECONDS.toMillis(10))
            .build();

    stream.sinkTo(sink);
    env.execute();
}