An Analysis of Flink UID Design
First, let's look at how Flink maps application state to datasets.
A Flink job usually consists of one or more source operators, a number of operators that do the actual processing, and one or more sink operators. Each operator runs in parallel in one or more tasks and can use different types of state.
An operator applied to a keyed stream can have zero, one, or more "keyed states", each scoped to the key extracted from the current record; you can think of keyed state as a distributed key-value map.
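To make the "distributed map" intuition concrete, here is a minimal sketch (the class and state names are illustrative) of a per-key counter kept in Flink's ValueState; each key sees only its own entry:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Used on a keyed stream, e.g. events.keyBy(e -> e).map(new CountPerKey())
public class CountPerKey extends RichMapFunction<String, Long> {
    private transient ValueState<Long> count; // one entry per key, like a distributed map

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public Long map(String value) throws Exception {
        Long current = count.value();           // read this key's entry
        long next = (current == null) ? 1 : current + 1;
        count.update(next);                     // write it back
        return next;
    }
}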
The figure below shows the application "MyApp", which consists of three operators called "Src", "Proc", and "Snk". Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2), and Snk is stateless.
A checkpoint or savepoint of MyApp consists of all of this state data, organized so that every task can restore its state from the checkpoint or savepoint. When a savepoint is processed with a batch job, each task's state is mapped to a dataset or table; in effect, the savepoint can be viewed as a database. Each operator (uniquely identified by its UID) represents a namespace. Each operator state of an operator maps to a dedicated single-column table in that namespace, and the column holds the state data of all tasks. All keyed states of an operator map to a single table that consists of one column for the key and one value column per keyed state. As shown in the figure below:
The figure shows how the operator state of Src maps to a table with one column and five rows, one row per list-state entry across all of Src's parallel tasks. Operator state os2 of operator "Proc" similarly maps to its own table. The keyed states ks1 and ks2 are combined into a single table with three columns: one for the key, one for ks1, and one for ks2. That keyed table holds one row for each distinct key of the two keyed states. Since operator "Snk" has no state, its namespace is empty.
So we can summarize the relationship between a savepoint and a database as follows (a concrete sketch follows the list):
- A savepoint is a database.
- Each operator is a namespace, named by its UID.
- Each operator state is a dedicated table.
- Each element of an operator state is one row.
- Each keyed-state table has a key column holding the operator's key values.
- Each registered keyed state is one column of that table.
- Each row of the table maps to one key.
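This analogy can be made concrete with Flink's State Processor API (available since Flink 1.9). The sketch below "queries" a savepoint like a database; the savepoint path, the UID "src", the state name "os1", and the state backend are all illustrative assumptions:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class QuerySavepoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // "Open the database": load the savepoint (path and backend are illustrative).
        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///savepoints/my-app", new MemoryStateBackend());

        // "Select from a table": read operator state "os1" from namespace (UID) "src".
        DataSet<Long> os1 = savepoint.readListState("src", "os1", Types.LONG);
        os1.print();
    }
}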
Setting a UID for every operator
As described above, Flink maps operator state back to operators by UID, which is critical for savepoints. By default, UIDs are generated by traversing the JobGraph and hashing certain properties of each operator. Convenient as that is, it is also fragile: any change to the JobGraph can produce new UIDs. To get a stable mapping, we must assign stable UIDs explicitly (via uid() in the DataStream API, which calls setUid on the underlying transformation).
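For example (a minimal sketch; the operator names and UIDs are illustrative), stable UIDs are assigned like this in the DataStream API:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .filter(s -> !s.isEmpty())
                .uid("non-empty-filter")  // stable UID: state survives JobGraph changes
                .name("non-empty-filter") // display name, independent of the UID
                .print()
                .uid("print-sink");

        env.execute("uid-example");
    }
}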
The problem:
We build a layer on top of Flink that parses a simple SQL-like syntax, so that users who have never learned Flink can use it. That parser cannot reliably produce stable Flink UIDs for the operators it generates, so even when most operators are unchanged from the previous savepoint, much of their state is lost on restore. We therefore need to understand how UIDs are assigned, and then change how we assign them.
Let's look at how Flink assigns UIDs.
Running this demo under a debugger reveals the flow:
StreamGraph streamGraph = env.getStreamGraph(jobName); // build the StreamGraph from the user program
JobGraph jobGraph = streamGraph.getJobGraph(); // converting to a JobGraph triggers StreamGraphHasherV2
If the user has set a UID, the user-defined hash is used:
DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:188: User defined hash 'LogRiverEvent,d1494b6528fcd7578379234466c9feef' for node 'Source: LogRiverEvent-1' {id: 1, parallelism: 4, user function: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011}
java.lang.Thread.getStackTrace(Thread.java:1559)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:185)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
If no UID is set, a deterministic hash is generated automatically:
2020-04-15 15:37:07,248 DEBUG main org.apache.flink.streaming.api.graph.StreamGraphHasherV2 StreamGraphHasherV2.java:264: Generated hash '10c893aa189fa29399b8c379dbfeca05' for node 'ignore_null_event-3' {id: 3, parallelism: 4, user function: cn.yottabyte.pipe.flink.source.LogRiverSourceTest$1}
java.lang.Thread.getStackTrace(Thread.java:1559)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateDeterministicHash(StreamGraphHasherV2.java:260)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.generateNodeHash(StreamGraphHasherV2.java:166)
org.apache.flink.streaming.api.graph.StreamGraphHasherV2.traverseStreamGraphAndGenerateHashes(StreamGraphHasherV2.java:112)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
cn.yottabyte.pipe.flink.source.LogRiverSourceTest.main(LogRiverSourceTest.java:136)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
The UID is actually generated in flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java.
Looking at the code itself:
// Call site (in generateNodeHash):
Hasher hasher = hashFunction.newHasher();
byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled, streamGraph);

private byte[] generateDeterministicHash(
        StreamNode node,
        Hasher hasher,
        Map<Integer, byte[]> hashes,
        boolean isChainingEnabled,
        StreamGraph streamGraph) {
    // ……
    // Mix one local hash per out-edge into this node's hash
    // (in the full source this loop is guarded by an isChainable(...) check).
    for (StreamEdge outEdge : node.getOutEdges()) {
        generateNodeLocalHash(hasher, hashes.size());
    }
    // ……
    byte[] hash = hasher.hash().asBytes();
    // ……
    // Fold every input node's hash into this node's hash, so that any
    // upstream change propagates to all downstream hashes.
    for (StreamEdge inEdge : node.getInEdges()) {
        byte[] otherHash = hashes.get(inEdge.getSourceId());
        for (int j = 0; j < hash.length; j++) {
            hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
        }
    }
    // ……
}
So the generated hash depends on the node's traversal-order id, the number of its out-edges, and the hashes of its in-edge (input) nodes.
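To see why this matters, the standalone sketch below (not Flink code; MD5 stands in for the Murmur3 hash Flink actually uses) emulates the mixing step and shows that a downstream node's hash changes whenever any upstream hash changes:

import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class HashPropagationDemo {
    // Local hash of a node: here just a hash of its traversal-order id.
    static byte[] localHash(int traversalId) throws Exception {
        return MessageDigest.getInstance("MD5")
                .digest(Integer.toString(traversalId).getBytes(StandardCharsets.UTF_8));
    }

    // Same mixing step as StreamGraphHasherV2: fold each input hash into ours.
    static byte[] combine(byte[] hash, byte[] otherHash) {
        for (int j = 0; j < hash.length; j++) {
            hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]);
        }
        return hash;
    }

    public static void main(String[] args) throws Exception {
        byte[] src = localHash(0);                // source node, traversal id 0
        byte[] map = combine(localHash(1), src);  // downstream node mixes in src's hash
        System.out.printf("map hash: %032x%n", new BigInteger(1, map));
        // Change the source's local hash (e.g. localHash(2)) and the map hash changes too.
    }
}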
Taking an actual UID output as an example:
Given the Flink DAG shown in the figure:
Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.
Explanation:
- The id after the first "#" is the id used for hashing, i.e., this node's position in the traversal order.
- "in edges" lists the upstream nodes this node depends on; after the next "#" comes that node's hash, and after the following "#" its name.
- "out edges" lists the downstream nodes in the same format: "#" followed by the node's hash, then "#" followed by its name.
The full chain is as follows:
- Node:Source: LogRiverEvent-1, #0.
- Node:extractTimestamp-2, #1. in edges#bc764cd8ddf7a0cff126f51c16239658#Source: LogRiverEvent-1.
- Node:ignore_null_event-3, #2. in edges#0a448493b4782967b150582570326227#extractTimestamp-2.
(Note: the traversal interleaves the two branches; it does not finish hashing one branch before starting the other.)
- Node:Filter-4, #3. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.
- Node:Filter-11, #4. in edges#ea632d67b7d595e5b851708ae9ad79d6#ignore_null_event-3.
- Node:Map-5, #5. in edges#6d2677a0ecc3fd8df0b72ec675edf8f4#Filter-4.
- Node:Map-12, #6. in edges#5af0c26a4e78bae94addfaa227a406c1#Filter-11.
- Node:Flat Map-6, #7. in edges#f66b9c09d172b1c19fff9288e5d53f49#Map-5.
- Node:Flat Map-13, #8. in edges#227ab9be2ca77a534fd0e93a93e67e8b#Map-12.
- Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8, #9. in edges#b15a823f9f5f129a46d82a43a93f3613#Flat Map-6.
- Node:Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15, #10. in edges#d6b993c9e4e18029649e604955b50858#Flat Map-13.
- Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10, #11. in edges#ce95ba768fcf87e260ab71a244cbfdb9#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, ReduceFunction$1, PassThroughWindowFunction)-8.
- Node:Map-16, #12. in edges#0ca94585fe097875e38fc49c1eb9af32#Window(TumblingEventTimeWindows(600000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-15.
- Node:Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18, #13. in edges#0929bdfbc119b2e3d9f65c52d507beb4#Map-16.
- Node:Sink: Unnamed-20, #14. in edges#9dc9fccc5cacedb2297bb9e1d19bbe5d#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, AggregateFunction$1, ProcessWindowFunction$1)-10. in edges#8b25afa3fd1186f5175e9605d9e9cf94#Window(TumblingEventTimeWindows(1000), EventTimeTrigger, ProcessWindowFunction$1)-18.
With this understanding of the UID design, if our requirements demand it we can design and assign UIDs ourselves at the application layer, so that restored jobs keep their state instead of recomputing it.
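For instance (an illustrative sketch; the naming scheme is our own, not a Flink API), the SQL-like parser can derive each UID from the logical plan's stable identifiers rather than from the physical graph position:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

// Illustrative helper: build the UID from a stable logical-plan id
// (e.g. the parsed SQL-like operator's id), not from graph traversal order.
final class StableUids {
    static <T> SingleOutputStreamOperator<T> withStableUid(
            SingleOutputStreamOperator<T> op, String queryId, String logicalOpId) {
        return op.uid(queryId + "/" + logicalOpId).name(logicalOpId);
    }
}

As long as the logical operator keeps its id across query revisions, its state maps to the same namespace in every savepoint, even if the generated JobGraph changes shape.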