今天看到有小伙伴在問,就想着自己實現一下。
問題: Flink FileSink根據輸入數據指定輸出位置,比如講對應日期的數據輸出到對應目錄
輸入數據:
20190716 輸出到路徑 20190716
20190717 輸出到路徑 20190717
20190718 輸出到路徑 20190718
目前flink 對與輸出到文件有兩種實現(write 算子不算,只能指定目錄):Rolling File Sink 和 Streaming File Sink,
Rolling File Sink 的實現就是 BucketingSink,使用也很簡單,直接指定路徑就可以了,
也可以設置如:目錄名稱格式(按時間格式滾動),輸出文件格式,文件大小、滾動間隔、文件前綴、后綴一類的
// the SequenceFileWriter only works with Flink Tuples import org.apache.flink.api.java.tuple.Tuple2 val input: DataStream[Tuple2[A, B]] = ... val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path") sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles"))) sink.setWriter(new SequenceFileWriter[IntWritable, Text]) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins input.addSink(sink)
當然,如果是這么簡單,就不會有這篇博客了,下面進入主題
--------------------------------------
默認的 DateTimeBucketer 只能根據時間指定文件名的滾動是規則,沒辦法根據數據指定文件的輸出位置,這需要實現 BasePathBucketer 自定義輸出路徑
實現如下:
import java.io.File import org.apache.flink.streaming.connectors.fs.Clock import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer import org.apache.hadoop.fs.Path /** * 根據實際數據返回數據輸出的路徑 */ class DayBasePathBucketer extends BasePathBucketer[String]{ /** * 返回路徑 * @param clock * @param basePath * @param element * @return */ override def getBucketPath(clock: Clock, basePath: Path, element: String): Path = { // yyyyMMdd val day = element.substring(1, 9) new Path(basePath + File.separator + day) } }
調用如下:
import java.io.File import java.text.SimpleDateFormat import com.venn.index.conf.Common import org.apache.flink.formats.json.JsonNodeDeserializationSchema import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.fs.StringWriter import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.api.scala._ /** * 使用BucketingSink 實現 根據‘數據’自定義輸出目錄 */ object RollingFileSinkDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sdf = new SimpleDateFormat("yyyyMMddHHmmss") val source = new FlinkKafkaConsumer[ObjectNode]("roll_file_sink", new JsonNodeDeserializationSchema, Common.getProp) val sink = new BucketingSink[String]("D:\\idea_out\\rollfilesink") sink.setBucketer(new DayBasePathBucketer) sink.setWriter(new StringWriter[String]) sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB, // sink.setBatchRolloverInterval(24 * 60 * 60 * 1000) // this is 24 hour
// sink.setInProgressPrefix("inProcessPre")
// sink.setPendingPrefix("pendingpre")
// sink.setPartPrefix("partPre")
env.addSource(source) .assignAscendingTimestamps(json => { sdf.parse(json.get("date").asText()).getTime }) .map(json => { json.get("date") + "-" + json.toString // 將日期拼接到前面,方便后面使用 }) .addSink(sink) env.execute("rollingFileSink") } }
測試數據如下:
{"id" : 1, "name" : "venn1563288621091", "date" : "20190716230020"} {"id" : 2, "name" : "venn1563288621310", "date" : "20190716231020"} ... {"id" : 263, "name" : "venn1563288648926", "date" : "20190718184020"} {"id" : 264, "name" : "venn1563288649029", "date" : "20190718185020"} {"id" : 265, "name" : "venn1563288649132", "date" : "20190718190020"}
測試結果如下:
可以看到,當天的數據都輸出到當天對應的目錄中。
遇到個問題:
這里有個問題,因為重寫了BasePathBucketer,自定義了輸出文件,所有會同時打開多個輸出文件,帶來文件刷新的問題,在當前文件寫完后(這里的表現是:當天的數據以及全部流過,
下一天的文件以及開始寫了),會發現當天的文件中的數據不全,因為數據還沒有全部刷到文件,這個時候下一個文件又開始寫了,會發現上一個文件還沒刷完
猜想:
猜想:每個文件都有個輸出緩沖,上一個文件最后一點數據還在緩沖區,下一個文件又使用新的緩沖區,沒辦法刷到上一個文件的數據,只有等緩沖區數據滿、超時一類的操作觸發刷寫 ??
驗證:
源碼BucketingSink.closePartFilesByTime 默認每60秒或大於滾動時間間隔(batchRolloverInterval)(系統時間) 將當前park文件,將狀態從 in-process 修改為 pending,隨后關閉當前的part 文件,數據刷到磁盤
代碼如下:
private void closePartFilesByTime(long currentProcessingTime) throws Exception { synchronized (state.bucketStates) { for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) { if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold) || (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) { LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.", getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold); closeCurrentPartFile(entry.getValue()); } } } }
下篇: Flink FileSink 自定義輸出路徑——StreamingFileSink、BucketingSink 和 StreamingFileSink簡單比較
搞定