1. 參考資料
https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/fault-tolerance/broadcast_state/
2. 過濾字符串流
需求:
- 打開一個SocketStream實時接收用戶的輸入,過濾掉數據流中不想要的字符串,把剩余的結果打印出來。
- 用另外一個SocketStream接收我們不想看到的字符串,為了簡化實現,在同一時刻最多只能設置一個不想看到的字符串。
3. 實現步驟
- 定義兩個輸入流,接受輸入的字符串
val wordStream: DataStream[String] = env.socketTextStream("localhost", 12345, '\n')
val ruleStream: DataStream[String] = env.socketTextStream("localhost", 12346, '\n')
- 定義一個MapStateDescriptor來描述我們要廣播的數據的格式,此處是String類型,我們想要的廣播字符串
val ruleStateDescriptor: MapStateDescriptor[String, String] = new MapStateDescriptor[String, String]( "RuleBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO )
- 把ruleStream注冊成廣播流
val broadcastStream: BroadcastStream[String] = ruleStream.broadcast(ruleStateDescriptor)
- 非廣播流使用connect方法連接廣播流,在process方法中提供BroadcastProcessFunction來處理廣播流和非廣播流中的數據
val output: DataStream[String] = wordStream.connect(broadcastStream).process( new BroadcastProcessFunction[String, String, String] { override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, collector: Collector[String]): Unit = { val filter: String = readOnlyContext.getBroadcastState(ruleStateDescriptor).get("filter") if (filter == null || !in1.equals(filter)) { collector.collect(in1) } } override def processBroadcastElement(in2: String, context: BroadcastProcessFunction[String, String, String]#Context, collector: Collector[String]): Unit = { context.getBroadcastState(ruleStateDescriptor).put("filter", in2) } } )
- 實現 BroadcastProcessFunction 中的 processElement和processBroadcastElement方法
- processElement用來處理非廣播流中的數據,傳入的參數中有一個readOnlyContext可以獲取broadState,不過是只讀的。此處從broadcastState中查找key等於"filter"的值,如果沒有查找到,或者當前元素不等於查找到的元素,則輸出,否則不做任何操作。
- processBroadcastElement用來處理廣播流中的數據,此處僅僅是把收到的元素加入到broadState中,key固定為"filter"。
4. 完整代碼
import org.apache.flink.api.common.state.MapStateDescriptor import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.streaming.api.datastream.BroadcastStream import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector object WordCountWithPatternJob { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val wordStream: DataStream[String] = env.socketTextStream("localhost", 12345, '\n') val ruleStream: DataStream[String] = env.socketTextStream("localhost", 12346, '\n') val ruleStateDescriptor: MapStateDescriptor[String, String] = new MapStateDescriptor[String, String]( "RulesBroadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO ) val broadcastStream: BroadcastStream[String] = ruleStream.broadcast(ruleStateDescriptor) val output: DataStream[String] = wordStream.connect(broadcastStream).process( new BroadcastProcessFunction[String, String, String] { override def processElement(in1: String, readOnlyContext: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, collector: Collector[String]): Unit = { val filter: String = readOnlyContext.getBroadcastState(ruleStateDescriptor).get("filter") if (filter == null || !in1.equals(filter)) { collector.collect(in1) } } override def processBroadcastElement(in2: String, context: BroadcastProcessFunction[String, String, String]#Context, collector: Collector[String]): Unit = { context.getBroadcastState(ruleStateDescriptor).put("filter", in2) } } ) output.print() env.execute("filter word streaming") } }
5. 總結
稱做廣播流應該不太嚴謹,從實現來看實際上的把一個MapState給廣播出去,其他流通過connect一個廣播流可以獲取到這個廣播的狀態,並且是只讀的。
這里有一些 broadcast state 的重要注意事項,在使用它時需要時刻清楚:
-
沒有跨 task 通訊:如上所述,這就是為什么只有在
(Keyed)-BroadcastProcessFunction
中處理廣播流元素的方法里可以更改 broadcast state 的內容。 同時,用戶需要保證所有 task 對於 broadcast state 的處理方式是一致的,否則會造成不同 task 讀取 broadcast state 時內容不一致的情況,最終導致結果不一致。 -
**broadcast state 在不同的 task 的事件順序可能是不同的:**雖然廣播流中元素的過程能夠保證所有的下游 task 全部能夠收到,但在不同 task 中元素的到達順序可能不同。 所以 broadcast state 的更新不能依賴於流中元素到達的順序。
-
所有的 task 均會對 broadcast state 進行 checkpoint:雖然所有 task 中的 broadcast state 是一致的,但當 checkpoint 來臨時所有 task 均會對 broadcast state 做 checkpoint。 這個設計是為了防止在作業恢復后讀文件造成的文件熱點。當然這種方式會造成 checkpoint 一定程度的寫放大,放大倍數為 p(=並行度)。Flink 會保證在恢復狀態/改變並發的時候數據沒有重復且沒有缺失。 在作業恢復時,如果與之前具有相同或更小的並發度,所有的 task 讀取之前已經 checkpoint 過的 state。在增大並發的情況下,task 會讀取本身的 state,多出來的並發(
p_new
-p_old
)會使用輪詢調度算法讀取之前 task 的 state。 -
不使用 RocksDB state backend: broadcast state 在運行時保存在內存中,需要保證內存充足。這一特性同樣適用於所有其他 Operator State。