該 demo 基於 Flink 1.14 版本。
核心代碼:`.withBucketAssigner(new BucketAssigner<String, String>() { ... })`,即通過自定義 BucketAssigner 根據每條記錄的內容決定其寫入的子目錄。
下面是完整代碼:
/**
 * Demo job: reads text lines from a socket on {@code localhost:9999} and writes them
 * through a row-format {@link FileSink} whose sub-directory (bucket) is derived from
 * the content of each record.
 *
 * <p>Each record is placed in {@code <current local date>/aa/<n>}, where {@code n} is
 * the record parsed as an integer and reduced modulo 3 into {@code 0..2}. Records that
 * are not valid integers go to bucket {@code 1}.
 *
 * @param args command-line arguments; not used
 */
@SneakyThrows
public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(500);
    DataStreamSource<String> stream = env.socketTextStream("localhost", 9999);

    // Prefix and suffix applied to the names of the part files.
    OutputFileConfig config = OutputFileConfig
            .builder()
            .withPartPrefix("prefix")
            .withPartSuffix(".ext")
            .build();

    // Root directory under which the bucket directories are created.
    String outputPath = "hdfs://xxxxxxx:8020/xxx/file";

    // NOTE(review): the encoder is given the JVM default charset, so the output encoding
    // depends on the platform the job runs on — confirm this is intended.
    final FileSink<String> sink = FileSink
            .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>(Charset.defaultCharset().name()))
            .withRollingPolicy(DefaultRollingPolicy.builder()
                    // Roll over by elapsed time.
                    .withRolloverInterval(TimeUnit.SECONDS.toMillis(10))
                    // Roll over when the file has been inactive.
                    .withInactivityInterval(TimeUnit.HOURS.toMillis(1))
                    // Roll over by size (1024 * 1024 * 1024; presumably bytes — confirm).
                    .withMaxPartSize(1024 * 1024 * 1024)
                    .build())
            // Split the output into directories by a custom, content-derived key.
            .withBucketAssigner(new BucketAssigner<String, String>() {
                @Override
                public String getBucketId(String element, Context context) {
                    // Layout sketched by the original author: date/account/dataSourceId/result.txt
                    String prefix = LocalDate.now() + "/aa/";
                    try {
                        // Key point: the path is chosen from the record content.
                        // floorMod keeps the bucket in 0..2 even for negative numbers,
                        // where the % operator would yield -1 or -2.
                        return prefix + Math.floorMod(Integer.parseInt(element), 3);
                    } catch (NumberFormatException ignored) {
                        // Non-numeric records are deliberately routed to bucket 1.
                        return prefix + "1";
                    }
                }

                @Override
                public SimpleVersionedSerializer<String> getSerializer() {
                    return SimpleVersionedStringSerializer.INSTANCE;
                }
            })
            .withOutputFileConfig(config)
            // Interval at which the sink checks whether a file should be finished.
            .withBucketCheckInterval(TimeUnit.SECONDS.toMillis(10))
            .build();

    stream.sinkTo(sink);
    env.execute();
}