DataStreamUtils 連續keyBy 優化


經常會有這樣的業務需求,需要對一個 stream 連續分區,比如:

source
      .keyBy(0)
      .process(new TmpKeyedProcessFunction2)
      .keyBy(0)
      .process(new TmpKeyedProcessFunction2)
      .keyBy(0)
      .process(new TmpKeyedProcessFunction2)

注: keyBy 算子有 shuffle

org.apache.flink.streaming.api.scala.KeyedStream 的 process 方法聲明如下:

@PublicEvolving
  def process[R: TypeInformation](
    keyedProcessFunction: KeyedProcessFunction[K, T, R]): DataStream[R] = {

    if (keyedProcessFunction == null) {
      throw new NullPointerException("KeyedProcessFunction must not be null.")
    }

    asScalaStream(javaStream.process(keyedProcessFunction, implicitly[TypeInformation[R]]))
  }

從 KeyedStream 的 process 源碼可以看到,process 方法后, KeyedStream 變為 DataStream,如果還想在后面使用 process 方法,就只能使用 DataStream 的 process 方法。如果算子中不使用狀態,是無所謂 key 或 非 key 的。但是想在process 方法中使用鍵控狀態,就需要將 stream 轉為 KeyedStream,所以就有了前面的連續 keyBy。

算子執行圖如下:

對應官網地址: https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/stream/experimental.html

DataStreamUtils#reinterpretAsKeyedStream API 的作用是: re-interpret a pre-partitioned data stream as a keyed stream to avoid shuffling. (將預分區的流重新解釋為鍵控流)

官網案例如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val source = ...
new DataStreamUtils(source).reinterpretAsKeyedStream((in) => in)
  .timeWindow(Time.seconds(1))
  .reduce((a, b) => a + b)
  .addSink(new DiscardingSink[Int])
env.execute()

官網的例子感覺不出來轉為鍵控流,看下面的例子:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val topic = "randon_string"
val kafkaSource = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), Common.getProp)

val source: DataStream[(String, String, String)] = env.addSource(kafkaSource)
  .map(str => {
    val arr = str.split(",")
    (arr(0), arr(1), arr(2))
  })

val keyStream0 = source.keyBy(0)
  .process(new TmpKeyedProcessFunction2)

val keyedStream = new DataStreamUtils(keyStream0)
  .reinterpretAsKeyedStream(element => element._1)
  .process(new TmpKeyedProcessFunction("11"))

val keyedStream2 = new DataStreamUtils(keyedStream)
  .reinterpretAsKeyedStream(element => element._1)
  .process(new TmpKeyedProcessFunction3("22"))

env.execute("multiKeyBy")

這樣就能很清晰的看出來,講一個 DataStream 解釋為 KeyedStream 了

執行圖如下:

 

警告:重新解釋的 DataStream 必須已經完全按照 Flink 的 keyBy 將數據按隨機順序進行分區的相同方式進行了預分區。 如: key-group 分配。 (來自官網)

如果解釋的流不是預分區的,在使用狀態的時候,不同分區的數據進來,會報NullPointException

完整代碼見: https://github.com/springMoon/flink-rookie.git    src/main/scala/com/venn/demo/MultipleKeyByProcess.scala

 

歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM