Flink FileSink 自定義輸出路徑——StreamingFileSink、BucketingSink 和 StreamingFileSink簡單比較


接上篇:Flink FileSink 自定義輸出路徑——BucketingSink 

上篇使用BucketingSink 實現了自定義輸出路徑,現在來看看 StreamingFileSink( 據說是StreamingFileSink 是社區優化后添加的connector,推薦使用)

StreamingFileSink 實現起來會稍微麻煩一點(也是靈活,功能更強大),因為可以自己實現序列化方法(源碼里面有實例可以參考-復制)

StreamingFileSink 有兩個方法可以輸出到文件  forRowFormat 和  forBulkFormat,名字差不多代表的方法的含義:行編碼格式塊編碼格式

forRowFormat 比較簡單,只提供了 SimpleStringEncoder 寫文本文件,可以指定編碼,如下:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

val input: DataStream[String] = ...

val sink: StreamingFileSink[String] = StreamingFileSink
    .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8")) // 所有數據都寫到同一個路徑
    .build()
    
input.addSink(sink)

當然我們的主題還是根據輸入數據自定義文件輸出路徑,就需要重寫 DayBucketAssigner,如下:

import java.io.IOException
import java.nio.charset.StandardCharsets
import org.apache.flink.core.io.SimpleVersionedSerializer
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner

class DayBucketAssigner extends BucketAssigner[ObjectNode, String] {

  /**
    * bucketId is the output path
    * @param element
    * @param context
    * @return
    */
  override def getBucketId(element: ObjectNode, context: BucketAssigner.Context): String = {
    //context.currentProcessingTime()
    val day = element.get("date").asText("19790101000000").substring(0, 8)
    // wrap can use day + "/" + xxx
 day
  }

  override def getSerializer: SimpleVersionedSerializer[String] = {

    StringSerializer
  }

  /**
    * 實現參考 : org.apache.flink.runtime.checkpoint.StringSerializer
    */
  object StringSerializer extends SimpleVersionedSerializer[String] {
    val VERSION = 77

    override def getVersion = 77

    @throws[IOException]
    override def serialize(checkpointData: String): Array[Byte] = checkpointData.getBytes(StandardCharsets.UTF_8)

    @throws[IOException]
    override def deserialize(version: Int, serialized: Array[Byte]): String = if (version != 77) throw new IOException("version mismatch")
    else new String(serialized, StandardCharsets.UTF_8)
  }
}

在初始化sink 的時候,指定 BucketAssigner 就可以了

val sinkRow = StreamingFileSink
      .forRowFormat(new Path("D:\\idea_out\\rollfilesink"), new SimpleStringEncoder[ObjectNode]("UTF-8"))
      .withBucketAssigner(new DayBucketAssigner) // .withBucketCheckInterval(60 * 60 * 1000l) // 1 hour
      .build()

執行結果如下:

 

2、 forBulkFormat 和forRowFormat 不太一樣,需要自己實現 BulkWriterFactory 和  DayBulkWriter,自定義程度高,可以實現自己的  FSDataOutputStream,寫出各種格式的文件(forRowFormat 自定義Encoder  也可以,但是如 forBuckFormat 靈活)

// use define BulkWriterFactory and DayBucketAssinger
    val sinkBuck = StreamingFileSink
      .forBulkFormat(new Path("D:\\idea_out\\rollfilesink"), new DayBulkWriterFactory)
      .withBucketAssigner(new DayBucketAssigner())
      .build()

實現如下:

import java.io.File
import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.serialization.BulkWriter
import org.apache.flink.core.fs.FSDataOutputStream
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.util.Preconditions

/**
  * 實現參考 : org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest
  */
class DayBulkWriter extends BulkWriter[ObjectNode] {

  val charset = StandardCharsets.UTF_8
  var stream: FSDataOutputStream = _

  def DayBulkWriter(inputStream: FSDataOutputStream): DayBulkWriter = {
    stream = Preconditions.checkNotNull(inputStream);
    this
  }

  /**
    * write element
    *
    * @param element
    */
  override def addElement(element: ObjectNode): Unit = {
    this.stream.write(element.toString.getBytes(charset))
    // wrapthis.stream.write('\n')
  }

  override def flush(): Unit = {
    this.stream.flush()
  }

  /**
    * output stream is input parameter, just flush, close is factory's job
    */
  override def finish(): Unit = {
    this.flush()
  }

}

/**
  * 實現參考 : org.apache.flink.streaming.api.functions.sink.filesystem.BulkWriterTest.TestBulkWriterFactory
  */
class DayBulkWriterFactory extends BulkWriter.Factory[ObjectNode] {
  override def create(out: FSDataOutputStream): BulkWriter[ObjectNode] = {
    val dayBulkWriter = new DayBulkWriter
    dayBulkWriter.DayBulkWriter(out)

  }
}

執行的結果就不贅述了

又遇到個問題,StreamFileSink 沒辦法指定輸出文件的名字。

BucketingSink 和 StreamingFileSink 的不同

從源碼位置來說:

BucketingSink 在 connector 下面,注重輸出數據
StreamingFileSink 在api 下面,注重與三方交互

從版本來說:

BucketingSink 比較早就有了
StreamingFileSink 是1.6版本推出的功能(據說是優化后推出的)

從支持的文件系統來說:

BucketingSink     支持Hadoop 文件系統支持的所有文件系統(原文:This connector provides a Sink that writes partitioned files to any filesystem supported by Hadoop FileSystem)
StreamingFileSink 支持Flink FileSystem 抽象文件系統   (原文:This connector provides a Sink that writes partitioned files to filesystems supported by the Flink FileSystem abstraction)

從寫數據的方式來說:

BucketingSink     默認的Writer是StringWriter,也提供SequenceFileWriter(字符)
StreamingFileSink 使用 OutputStream +  Encoder 對外寫數據 (字節)

從文件滾動策略來說:

BucketingSink     提供了時間、條數滾動 
StreamingFileSink 默認提供時間(官網有說條數,沒看到 This is also configurable but the default policy rolls files based on file size and a timeout,自己實現BulkWriter可以)

從目前(1.7.2)來說,BucketingSink 更開箱即用(功能相對簡單),StreamingFileSink更麻煩(更靈活、強大)

只是個初學者,還不太能理解 BucketingSink 和 StreamingFileSink 的差異,等了解之后,再來完善

結論:比較推薦使用StreamingFileSink

理由:功能強大,數據刷新時間更快(沒有,BucketingSink默認60S的問題,詳情見上篇,最后一段)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM