Flink廣播流使用舉例-過濾字符串流


1. 參考資料

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/broadcast_state/

 

2. 過濾字符串流

需求:

  • 打開一個SocketStream實時接收用戶的輸入,過濾掉數據流中不想要的字符串,把剩余的結果打印出來。
  • 用另外一個SocketStream接收我們不想看到的字符串,為了簡化實現,在同一時刻最多只能設置一個不想看到的字符串。

 

3. 實現步驟

  • 定義兩個輸入流,接受輸入的字符串
 val wordStream: DataStream[String] = env.socketTextStream("localhost", 12345, '\n')
 val ruleStream: DataStream[String] = env.socketTextStream("localhost", 12346, '\n')
  • 定義一個MapStateDescriptor來描述我們要廣播的數據的格式,此處是String類型,我們想要的廣播字符串
val ruleStateDescriptor: MapStateDescriptor[String, String] = new MapStateDescriptor[String, String](
    "RuleBroadcastState",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
)
  • 把ruleStream注冊成廣播流
val broadcastStream: BroadcastStream[String] = ruleStream.broadcast(ruleStateDescriptor)
  • 非廣播流使用connect方法連接廣播流,在process方法中提供BroadcastProcessFunction來處理廣播流和非廣播流中的數據
val output: DataStream[String] = wordStream.connect(broadcastStream).process(
  new BroadcastProcessFunction[String, String, String] {
    override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, collector: Collector[String]): Unit = {
      val filter: String = readOnlyContext.getBroadcastState(ruleStateDescriptor).get("filter")
      if (filter == null || !in1.equals(filter)) {
        collector.collect(in1)
      }
    }

    override def processBroadcastElement(in2: String, context: BroadcastProcessFunction[String, String, String]#Context, collector: Collector[String]): Unit = {
      context.getBroadcastState(ruleStateDescriptor).put("filter", in2)
    }
  }
)
  • 實現 BroadcastProcessFunction 中的 processElement和processBroadcastElement方法
    • processElement用來處理非廣播流中的數據,傳入的參數中有一個readOnlyContext可以獲取broadState,不過是只讀的。此處從broadcastState中查找key等於"filter"的值,如果沒有查找到,或者當前元素不等於查找到的元素,則輸出,否則不做任何操作。
    • processBroadcastElement用來處理廣播流中的數據,此處僅僅是把收到的元素加入到broadState中,key固定為"filter"。

 

4. 完整代碼

import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.datastream.BroadcastStream
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object WordCountWithPatternJob {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val wordStream: DataStream[String] = env.socketTextStream("localhost", 12345, '\n')
    val ruleStream: DataStream[String] = env.socketTextStream("localhost", 12346, '\n')

    val ruleStateDescriptor: MapStateDescriptor[String, String] = new MapStateDescriptor[String, String](
      "RulesBroadcastState",
      BasicTypeInfo.STRING_TYPE_INFO,
      BasicTypeInfo.STRING_TYPE_INFO
    )

    val broadcastStream: BroadcastStream[String] = ruleStream.broadcast(ruleStateDescriptor)

    val output: DataStream[String] = wordStream.connect(broadcastStream).process(
      new BroadcastProcessFunction[String, String, String] {
        override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, collector: Collector[String]): Unit = {
          val filter: String = readOnlyContext.getBroadcastState(ruleStateDescriptor).get("filter")
          if (filter == null || !in1.equals(filter)) {
            collector.collect(in1)
          }
        }
    
        override def processBroadcastElement(in2: String, context: BroadcastProcessFunction[String, String, String]#Context, collector: Collector[String]): Unit = {
          context.getBroadcastState(ruleStateDescriptor).put("filter", in2)
        }
      }
    )

    output.print()
    env.execute("filter word streaming")
  }
}

 

5. 總結

稱做廣播流應該不太嚴謹,從實現來看實際上的把一個MapState給廣播出去,其他流通過connect一個廣播流可以獲取到這個廣播的狀態,並且是只讀的。

這里有一些 broadcast state 的重要注意事項,在使用它時需要時刻清楚:

  • 沒有跨 task 通訊:如上所述,這就是為什么只有在 (Keyed)-BroadcastProcessFunction 中處理廣播流元素的方法里可以更改 broadcast state 的內容。 同時,用戶需要保證所有 task 對於 broadcast state 的處理方式是一致的,否則會造成不同 task 讀取 broadcast state 時內容不一致的情況,最終導致結果不一致。

  • **broadcast state 在不同的 task 的事件順序可能是不同的:**雖然廣播流中元素的過程能夠保證所有的下游 task 全部能夠收到,但在不同 task 中元素的到達順序可能不同。 所以 broadcast state 的更新不能依賴於流中元素到達的順序

  • 所有的 task 均會對 broadcast state 進行 checkpoint:雖然所有 task 中的 broadcast state 是一致的,但當 checkpoint 來臨時所有 task 均會對 broadcast state 做 checkpoint。 這個設計是為了防止在作業恢復后讀文件造成的文件熱點。當然這種方式會造成 checkpoint 一定程度的寫放大,放大倍數為 p(=並行度)。Flink 會保證在恢復狀態/改變並發的時候數據沒有重復且沒有缺失。 在作業恢復時,如果與之前具有相同或更小的並發度,所有的 task 讀取之前已經 checkpoint 過的 state。在增大並發的情況下,task 會讀取本身的 state,多出來的並發(p_new - p_old)會使用輪詢調度算法讀取之前 task 的 state。

  • 不使用 RocksDB state backend: broadcast state 在運行時保存在內存中,需要保證內存充足。這一特性同樣適用於所有其他 Operator State。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM