借用官網的一個例子:
假設存在一個序列,序列中的元素是具有不同顏色與形狀的圖形,我們希望在序列里相同顏色的圖形中尋找滿足一定順序模式的圖形對(比如在紅色的圖形里,有一個長方形跟着一個三角形)。 同時,我們希望尋找的模式也會隨着時間而改變。
在這個例子中,我們定義兩個流,一個流包含圖形(Item),具有顏色和形狀兩個屬性。另一個流包含特定的規則(Rule),代表希望尋找的模式。
Flink 開發的時候,經常會遇到這種情況,數據的輸入源有多個,需要將一些流先關聯起來(比如:清洗規則、動態配置),再做后續的計算。
對於這樣的場景,可能很容易就想到使用 join api ,直接將兩個流 join 起來。
實際上,這樣個需求,使用 join api 是不太適合的, join 是基於窗口的,要在窗口內有關聯的數據,才能進行后續的計算。 這個需要中,規則流的某些規則在整個程序的執行周期里,可能只會有一次。
對應這樣的需要,Flink 提供了 “Broadcast State” 來解決,具體請查看官網:The Broadcast State Pattern https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/state/broadcast_state.html (現在有中文翻譯了)
調用DateStream 的 broadcast 方法,將一個流解釋成廣播流,再調用非廣播流(keyed 或者 non-keyed)的 connect() 關聯, 將 BroadcastStream 當做參數傳入。 這個方法的返回參數是 BroadcastConnectedStream,具有類型方法 process(),傳入一個特殊的 CoProcessFunction 來書寫我們的關聯邏輯。
一般來說使用廣播流的時候,在每個並發中都會保留廣播的全部數據(可能沒辦法區分那些是需要的,那些不需要),這樣就會導致廣播狀態越來越大,如果廣播狀態更新比較頻率的,就不太適用了。
注: 廣播狀態使用的是 Operator State,運行時保存在內存中。
所以就進入了今天的重點是 connect 在非廣播流中的使用。
官網關於 connect 算子,只在 算子概覽(https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/) 中簡單的描述了 connect 算子
DataStream,DataStream → ConnectedStreams "Connects" two data streams retaining their types. Connect allowing for shared state between the two streams. DataStream<Integer> someStream = //... DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
看過上面廣播狀態內容的同學應該知道,connect 的順序是很重要的,那非廣播流的 connect 呢?
進入主題前,先思考這幾個問題:
1、兩個 non keyed 流 connect 的時候,數據是怎么分配的(不同並發的數據怎么分,隨機分配、循環?)
2、keyed 流 connect non keyed 流 的時候,數據是怎么分配的
3、non keyed 流 connect keyed 流 的時候,數據是怎么分配的
4、兩個 keyed 流 connect 的時候,數據是怎么分配的
對應問題4,很容易就想到: 兩個流的 keyBy 都是對下游 CoProcessFunction 的並發做的分區,所以相同 key 的數據一定會發到一起。
為了解決其他問題,有了如下程序:
大致業務: 數據流關聯配置流,獲取編碼對應轉碼值,關聯不上的就用默認值。
為了方便數據源就實現個 SourceFunction 生成 0 到 300 的數值,拼接 5 位隨機的字符串
class RadomSourceFunction extends SourceFunction[String] { var flag = true override def cancel(): Unit = { flag = false } override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (flag) { for (i <- 0 to 300) { var nu = i.toString while (nu.length < 3) { nu = "0" + nu } // code + other ctx.collect(nu + "," + StringUtil.getRandomString(5)) Thread.sleep(2000) } } } }
主程序如下:
val config = new FlinkKafkaConsumer[String]("dynamic_config", new SimpleStringSchema, Common.getProp) val configStream = env .addSource(config) .name("configStream") val input = env.addSource(new RadomSourceFunction) .name("radomFunction") // 非並發的souce function 不能添加並發 .map(str => str) .setParallelism(4) val stream = input.connect(configStream) .process(new CoProcessFunction[String, String, String] { var mapState: MapState[String, String] = _ //var map: util.HashMap[String, String] = null override def open(parameters: Configuration): Unit = { // thinking broken, if use keyed state, must keyby upstream // Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation. mapState = getRuntimeContext.getMapState(new MapStateDescriptor[String, String]("mapState", classOf[String], classOf[String])) //map = new util.HashMap[String, String]() } override def processElement1(element: String, context: CoProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = { // checkouk map keys val it = mapState.keys().iterator() var size = 0 while (it.hasNext) { val key = it.next() size += 1 } //val size = map.size() println("keys size : " + size) val citeInfo = element.split(",") val code = citeInfo(0) var va = mapState.get(code) // var va = map.get(code) // 不能轉碼的數據默認輸出 中國(code=xxx) if (va == null) { va = "中國(code=" + code + ")"; } else { va = va + "(code=" + code + ")" } out.collect(va + "," + citeInfo(1)) } override def processElement2(element: String, context: CoProcessFunction[String, String, String]#Context, collector: Collector[String]): Unit = { println(getRuntimeContext.getIndexOfThisSubtask + ", " + element) val param = element.split(",") // update mapState mapState.put(param(0), param(1)) //map.put(param(0), param(1)) } override def close(): Unit = { mapState.clear() } }).setParallelism(4) val sink = new FlinkKafkaProducer[String]("non_key_connect_demo", new SimpleStringSchema(), Common.getProp) stream.print() env.execute("NonKeyedConnectDemo")
然后就。。。。
Caused by: java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation. at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:223) at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:216) at com.venn.stream.api.connect.KeyedConnectDemo$$anon$1.open(KeyedConnectDemo.scala:67) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.co.CoProcessOperator.open(CoProcessOperator.java:59) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291) at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:745)
直接賞我個 NullPointerException,突然想起來,如果不做 keyBy,不能用 keyed state
這點小小的麻煩,當然難不倒我
直接上 使用 hashmap 存更新流的數,一樣看結果(注: 數據量不大可以使用 Operator State)
代碼基本一樣,就不重復添加了,具體請查看 github: https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/stream/api/connect
執行圖如下:
配置流是1個並發,下游 Co-Process 是 4 個並發,特意在處理更新流的procesElement2 中打印出接收到數據對應的 subtask id
println(getRuntimeContext.getIndexOfThisSubtask + ", " + element)
從 kafka 輸入數據,查看打印的數據,隨意截取部分數據貼在這里:
subtask id : 2, 031,營口市 subtask id : 3, 016,寧波市 subtask id : 1, 030,錦州市 subtask id : 0, 041,黃州市 subtask id : 1, 034,朝陽市 subtask id : 3, 020,麗水市 subtask id : 1, 038,襄城市
對應 configStream 往下游發數據的策略是 REBALANCE ( 數據循環的往下游算子發 官網: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/#physical-partitioning)
數據在各個並發之間循環發送,但是啊,沒有做 key 的流,只有一個並發拿到了配置數據,其他的怎么辦呢?
只有在不用 broadcast 和 keyed 的時候,需要考慮這個問題,代碼里面已經給了解決辦法,感興趣的同學可以去看下
總結下 connect 算子的內容,一般場景下,使用 broadcast connnect 和 keyed connect,能保證每個並發(key)能拿到需要的數據,廣播是通過廣播數據到下游的所有並發來保證的,keyed 是通過 對相同鍵做 key 來保證的。
keyed 、 non keyd 、 broadcast 的例子 都可以在 github : https://github.com/springMoon/flink-rookie/tree/master/src/main/scala/com/venn/stream/api 可以找到
歡迎關注Flink菜鳥公眾號,會不定期更新Flink(開發技術)相關的推文