Original source: https://blog.csdn.net/hxcaifly/article/details/84673292
https://blog.csdn.net/zero__007/article/details/88201498
https://www.jianshu.com/p/8e74c7cdd463
https://blog.csdn.net/u013014724/article/details/84800255
Part 1: Flink Checkpoints
1. How Flink Checkpoints Work
Checkpoints are the core of Flink's fault-tolerance mechanism. Based on its configuration, Flink periodically generates snapshots of the state of each operator in the stream and persists this state data to durable storage. If the Flink program crashes unexpectedly, it can be rerun and restored from a selected snapshot, repairing the interruption to the program's state caused by the failure. Here is a quick look at how the checkpoint mechanism works (the original post reproduces the illustrating figure from the official documentation):

Once a checkpoint trigger interval has been configured, each time a checkpoint is triggered Flink inserts a barrier marker into each of the job's distributed stream sources. These barriers flow downstream through the operators together with the stream's data records. When an operator receives a barrier on one input, it pauses processing newly arriving records from that stream. Because an operator may have multiple input streams, each carrying its own barrier, it must wait until the barriers from all of its inputs have arrived. Once they have, the barriers are aligned: they represent the same logical point in time across all inputs. While waiting for alignment, the operator may have buffered records that reached it ahead of the remaining barriers; at this point the operator emits its pending outgoing records downstream as input to the next operators, and finally emits the snapshot corresponding to the barrier as this checkpoint's result data, after which it resumes processing the buffered records.
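To make operator state concrete, here is a minimal sketch of a keyed stateful function (my own illustration, not from the original post; the class name CountingMapper is hypothetical). The per-key counter held in ValueState below is exactly the kind of state a checkpoint snapshots and a restore brings back:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key. The ValueState is included in every checkpoint
// snapshot and restored from the latest snapshot after a failure.
public class CountingMapper extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();
        current = (current == null) ? 1L : current + 1;
        count.update(current);
        out.collect(current);
    }
}

Such a function must run on a keyed stream, e.g. stream.keyBy(...).flatMap(new CountingMapper()), because ValueState is scoped to the current key.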
2. Basic Checkpoint Configuration
There are two ways to enable checkpointing: configure it system-wide in conf/flink-conf.yaml, or configure it per job in code. I personally recommend the second approach, since it targets the job at hand. The configuration code looks like this:
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Where checkpoints are stored; here they go to HDFS
env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
CheckpointConfig config = env.getCheckpointConfig();
// Retain checkpoints when the job is cancelled or fails
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Checkpointing mode: exactly-once
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Interval at which checkpoints are triggered
config.setCheckpointInterval(60000);
Calling enableExternalizedCheckpoints with ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION means that checkpoint data is retained even after the Flink job is cancelled, so the job can later be restored from a chosen checkpoint as needed. The code above also sets the checkpointing interval to one minute (60000 ms).
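Beyond these basics, CheckpointConfig exposes several more tuning options. The following sketch shows a few commonly used ones; the concrete values are placeholders of my own, not recommendations from the original post:

// Require at least 30 s between the end of one checkpoint and the
// start of the next, so checkpointing does not starve normal processing.
config.setMinPauseBetweenCheckpoints(30000);
// Abort any checkpoint that does not complete within 10 minutes.
config.setCheckpointTimeout(600000);
// Never run more than one checkpoint at a time.
config.setMaxConcurrentCheckpoints(1);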
3. Retaining Multiple Checkpoints
By default, when checkpointing is enabled, Flink keeps only the single most recent successful checkpoint, and a failed program can be recovered from that checkpoint. It is often more flexible to keep several checkpoints and choose which one to restore from: for example, if we discover that the last four hours of records were processed incorrectly, we may want to roll the entire state back to four hours ago.
Flink supports retaining multiple checkpoints. Add the following setting to Flink's configuration file conf/flink-conf.yaml to specify the maximum number of checkpoints to keep:
state.checkpoints.num-retained: 20
With this setting in place, run a Flink job and inspect the checkpoint directories stored on HDFS; for example:
hdfs dfs -ls /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/
Found 22 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:23 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:24 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:25 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:26 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:27 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:28 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:29 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:30 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:31 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:32 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:33 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:34 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:35 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:36 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:37 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:38 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:39 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:40 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:41 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:42 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned
Since state.checkpoints.num-retained is set to 20, only the 20 most recent checkpoints are kept. To roll back to a particular checkpoint, we only need to point the restore at that checkpoint's path.
4. Restoring from a Checkpoint
If the Flink program fails abnormally, or data was processed incorrectly over some recent period, we can replay the program from a specific checkpoint, say chk-860, with the following command:
bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar
Once the program is running normally again, it keeps checkpointing according to its configuration and continues to generate checkpoint data:
hdfs dfs -ls /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e
Found 6 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:56 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:57 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:58 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:59 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned
Note that the previous Flink job's ID was 582e17d2cc343e6c56255d111bae0191, and all of its checkpoint files live under a directory named after that job ID. When the job is stopped and then restored from a checkpoint (chk-860), a new job ID is generated (here 11bbc5d9933e4ff7e25198a760e9792e), while the checkpoint numbering continues from the checkpoint the run was restored from: chk-861, chk-862, chk-863, and so on.
Part 2: Flink Savepoints
1. How Flink Savepoints Work
A savepoint is a self-contained checkpoint stored outside the Flink job. It uses Flink's checkpointing mechanism to create a complete, non-incremental snapshot of the streaming program's state and writes that checkpoint data to an external storage system.
A Flink program contains two kinds of state data. One is user-defined state, created or modified through Flink's transformation functions. The other is system state, meaning data such as buffers that are part of an operator's computation; for example, when using a window function, the streaming records collected inside the window are buffered as system state. To identify each operator's state unambiguously when a savepoint is created, Flink provides an API for assigning an ID to every operator in the program. When the program is later updated or upgraded, these operator IDs are matched against the state stored in the savepoint so that restoration can succeed. If we do not assign operator IDs, Flink generates them automatically.
Moreover, it is strongly recommended to set an ID for every operator manually. Even if the Flink application changes substantially in the future, say by replacing an operator's implementation, adding new operators, or removing operators, manually assigned IDs at least give us a chance to match against the operator state stored in the savepoint. Also bear in mind that savepoint data is generated from the program and its in-memory data structures as they were at the time, so if the program later changes significantly, especially if the in-memory data structures it operates on have changed, it may be impossible to restore correctly from the old savepoint at all.
The following example from the official Flink documentation shows how to set operator IDs:
DataStream<String> stream = env
    // Stateful source (e.g. Kafka) with ID
    .addSource(new StatefulSource())
    .uid("source-id") // ID for the source operator
    .shuffle()
    // Stateful mapper with ID
    .map(new StatefulMapper())
    .uid("mapper-id") // ID for the mapper
    // Stateless printing sink
    .print(); // Auto-generated ID
2. Creating a Savepoint
Creating a savepoint requires a target directory, which can be specified in one of two ways:
One way is to configure a default savepoint path by adding the following setting to Flink's configuration file conf/flink-conf.yaml:
state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints
The other is to specify the target directory when triggering the savepoint manually; the command format is:
bin/flink savepoint :jobId [:targetDirectory]
For example, for a running Flink job with ID 40dcc6d2ba90f13930abce295de8d038, trigger a savepoint into the default directory configured by state.savepoints.dir with:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038
The savepoint data for job 40dcc6d2ba90f13930abce295de8d038 is then generated under hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0.
To store the savepoint of a running job in an explicitly chosen directory instead, run:
bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints
The savepoint data for the same job is then generated under hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f.
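As a related convenience, the Flink CLI can also trigger a savepoint and cancel the job in a single step with cancel -s, which is handy before planned upgrades; a sketch reusing the paths and job ID from above:

bin/flink cancel -s hdfs://namenode01.td.com/tmp/flink/savepoints 40dcc6d2ba90f13930abce295de8d038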
3. Restoring from a Savepoint
Now we can stop job 40dcc6d2ba90f13930abce295de8d038 and then resume it from the savepoint; the command format is:
bin/flink run -s :savepointPath [:runArgs]
Using the savepoint saved above, restore the job with:
bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar
This starts a new Flink job, with ID cdbae3af1b7441839e7c03bab0d0eefd.
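If the job graph has changed since the savepoint was taken, for example because an operator was removed, the restore fails by default since some stored state can no longer be mapped to any operator. The --allowNonRestoredState flag of flink run (short form -n) tells Flink to skip such state; a sketch reusing the paths from above:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f -n flink-app-jobs.jar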
4. Savepoint Directory Structure
Finally, let's look at the structure of the contents stored under a savepoint directory:
hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a
In the HDFS path listed above, 11bbc5 is the first six characters of the Flink job ID, and bd967f90709b is a randomly generated string; together they form savepoint-11bbc5-bd967f90709b, the root directory holding this savepoint's data. Inside it, the _metadata file contains the savepoint's metadata, which includes, in serialized form, the paths of the other files under savepoint-11bbc5-bd967f90709b; those files hold the serialized state itself.
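When a savepoint is no longer needed it can be removed. Because savepoints are self-contained, deleting the directory works, but the CLI also offers a dispose option; a sketch reusing the example path from above:

bin/flink savepoint -d hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b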
References
http://shiyanjun.cn/archives/1855.html
https://www.colabug.com/2259405.html
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/savepoints.html
