Flink FileSink 自定義輸出路徑——BucketingSink


今天看到有小伙伴在問,就想着自己實現一下。

問題: Flink FileSink根據輸入數據指定輸出位置,比如講對應日期的數據輸出到對應目錄

輸入數據:
20190716 輸出到路徑 20190716
20190717 輸出到路徑 20190717
20190718 輸出到路徑 20190718

目前flink 對與輸出到文件有兩種實現(write 算子不算,只能指定目錄):Rolling File Sink 和 Streaming File Sink

Rolling File Sink 的實現就是 BucketingSink,使用也很簡單,直接指定路徑就可以了,也可以設置如:目錄名稱格式(按時間格式滾動),輸出文件格式,文件大小、滾動間隔、文件前綴、后綴一類的

// the SequenceFileWriter only works with Flink Tuples
import org.apache.flink.api.java.tuple.Tuple2
val input: DataStream[Tuple2[A, B]] = ... 

val sink = new BucketingSink[Tuple2[IntWritable, Text]]("/base/path")
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm", ZoneId.of("America/Los_Angeles")))
sink.setWriter(new SequenceFileWriter[IntWritable, Text])
sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

input.addSink(sink)

當然,如果是這么簡單,就不會有這篇博客了,下面進入主題

--------------------------------------

默認的 DateTimeBucketer 只能根據時間指定文件名的滾動是規則,沒辦法根據數據指定文件的輸出位置,這需要實現 BasePathBucketer 自定義輸出路徑

實現如下:

import java.io.File
import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.BasePathBucketer
import org.apache.hadoop.fs.Path

/**
  * 根據實際數據返回數據輸出的路徑
  */
class DayBasePathBucketer extends BasePathBucketer[String]{

  /**
    * 返回路徑
    * @param clock
    * @param basePath
    * @param element
    * @return
    */ override def getBucketPath(clock: Clock, basePath: Path, element: String): Path = { // yyyyMMdd val day = element.substring(1, 9) new Path(basePath + File.separator + day) }
}

調用如下:

import java.io.File
import java.text.SimpleDateFormat
import com.venn.index.conf.Common
import org.apache.flink.formats.json.JsonNodeDeserializationSchema
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.api.scala._

/**
  * 使用BucketingSink 實現 根據‘數據’自定義輸出目錄
  */
object RollingFileSinkDemo {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val sdf = new SimpleDateFormat("yyyyMMddHHmmss")
    val source = new FlinkKafkaConsumer[ObjectNode]("roll_file_sink", new JsonNodeDeserializationSchema, Common.getProp)

    val sink = new BucketingSink[String]("D:\\idea_out\\rollfilesink")
    sink.setBucketer(new DayBasePathBucketer)
    sink.setWriter(new StringWriter[String])
    sink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
    //    sink.setBatchRolloverInterval(24 * 60 * 60 * 1000) // this is 24 hour
//    sink.setInProgressPrefix("inProcessPre")
// sink.setPendingPrefix("pendingpre")
// sink.setPartPrefix("partPre")

env.addSource(source) .assignAscendingTimestamps(json => { sdf.parse(json.get("date").asText()).getTime }) .map(json => { json.get("date") + "-" + json.toString // 將日期拼接到前面,方便后面使用 }) .addSink(sink) env.execute("rollingFileSink") } }

測試數據如下:

{"id" : 1, "name" : "venn1563288621091", "date" : "20190716230020"}
{"id" : 2, "name" : "venn1563288621310", "date" : "20190716231020"}
...
{"id" : 263, "name" : "venn1563288648926", "date" : "20190718184020"}
{"id" : 264, "name" : "venn1563288649029", "date" : "20190718185020"}
{"id" : 265, "name" : "venn1563288649132", "date" : "20190718190020"}

測試結果如下:

可以看到,當天的數據都輸出到當天對應的目錄中。

 

遇到個問題:

這里有個問題,因為重寫了BasePathBucketer,自定義了輸出文件,所有會同時打開多個輸出文件,帶來文件刷新的問題,在當前文件寫完后(這里的表現是:當天的數據以及全部流過,
下一天的文件以及開始寫了),會發現當天的文件中的數據不全,因為數據還沒有全部刷到文件,這個時候下一個文件又開始寫了,會發現上一個文件還沒刷完

猜想:

猜想:每個文件都有個輸出緩沖,上一個文件最后一點數據還在緩沖區,下一個文件又使用新的緩沖區,沒辦法刷到上一個文件的數據,只有等緩沖區數據滿、超時一類的操作觸發刷寫 ??

驗證:

源碼BucketingSink.closePartFilesByTime 默認每60秒或大於滾動時間間隔(batchRolloverInterval)(系統時間) 將當前park文件,將狀態從 in-process 修改為 pending,隨后關閉當前的part 文件,數據刷到磁盤

代碼如下:

private void closePartFilesByTime(long currentProcessingTime) throws Exception {

        synchronized (state.bucketStates) {
            for (Map.Entry<String, BucketState<T>> entry : state.bucketStates.entrySet()) {
                if ((entry.getValue().lastWrittenToTime < currentProcessingTime - inactiveBucketThreshold)
                        || (entry.getValue().creationTime < currentProcessingTime - batchRolloverInterval)) {
                    LOG.debug("BucketingSink {} closing bucket due to inactivity of over {} ms.",
                        getRuntimeContext().getIndexOfThisSubtask(), inactiveBucketThreshold);
                    closeCurrentPartFile(entry.getValue());
                }
            }
        }
    }

 下篇: Flink FileSink 自定義輸出路徑——StreamingFileSink、BucketingSink 和 StreamingFileSink簡單比較

搞定

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM