分布式緩存
熟悉 Hadoop 的你應該知道,分布式緩存最初的思想誕生於 Hadoop 框架,Hadoop 會將一些數據或者文件緩存在 HDFS 上,在分布式環境中讓所有的計算節點調用同一個配置文件。在 Flink 中,Flink 框架開發者們同樣將這個特性進行了實現。
Flink 提供的分布式緩存類型 Hadoop,目的是為了在分布式環境中讓每一個 TaskManager 節點保存一份相同的數據或者文件,當前計算節點的 task 就像讀取本地文件一樣拉取這些配置。
分布式緩存在我們實際生產環境中最廣泛的一個應用,就是在進行表與表 Join 操作時,如果一個表很大,另一個表很小,那么我們就可以把較小的表進行緩存,在每個 TaskManager 都保存一份,然后進行 Join 操作。
那么我們應該怎樣使用 Flink 的分布式緩存呢?舉例如下:
package wyh.flinkcache; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.operators.MapOperator; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.windowing.time.Time; import java.io.File; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class DistributedCache { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //1、注冊一個文件,可以使用HDFS上的文件,也可以是本地文件進行測試,並且為文件取一個名字
env.registerCachedFile("F:\\Flink-lagou\\lagou\\data\\distributedcache.txt", "distributedCache"); //無重啟策略 // env.setRestartStrategy(RestartStrategies.noRestart()); //固定延遲重啟策略模式 // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Time.of(5,TimeUnit.SECONDS))); //每個時間間隔的最大故障次數 // env.setRestartStrategy(RestartStrategies.failureRateRestart(Time.of(5,TimeUnit.MINUTES) // ,Time.of(5,TimeUnit.SECONDS)));
DataSource<String> data = env.fromElements("Linea", "Lineb", "Linec", "Lined"); MapOperator<String, String> result = data.map(new RichMapFunction<String, String>() { ArrayList<String> dataList = new ArrayList<>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //2、getRuntimeContext() 使用該緩存文件,根據注冊的名字直接獲取文件
File myFlie = getRuntimeContext().getDistributedCache().getFile("distributedCache"); List<String> lines = FileUtils.readLines(myFlie); for (String line : lines) { dataList.add(line); System.out.println("分布式緩存為:" + line); } } @Override public String map(String s) throws Exception { //在這里就可以使用dataList
System.out.println("使用dataList" + dataList + "------" + s); return dataList + ":" + s; } }); result.printToErr(); } }
從上面的例子中可以看出,使用分布式緩存有兩個步驟。
第一步:首先需要在 env 環境中注冊一個文件,該文件可以來源於本地,也可以來源於 HDFS ,並且為該文件取一個名字。
第二步:在使用分布式緩存時,可根據注冊的名字直接獲取。
可以看到,在上述案例中,我們把一個本地的 distributedcache.txt 文件注冊為 distributedCache,在下面的 map 算子中直接通過這個名字將緩存文件進行讀取並且進行了處理。
我們直接運行該程序,在控制台可以看到如下輸出:
在使用分布式緩存時也需要注意一些問題,需要我們緩存的文件在任務運行期間最好是只讀狀態,否則會造成數據的一致性問題。另外,緩存的文件和數據不宜過大,否則會影響 Task 的執行速度,在極端情況下會造成 OOM。
故障恢復和重啟策略
自動故障恢復是 Flink 提供的一個強大的功能,在實際運行環境中,我們會遇到各種各樣的問題從而導致應用掛掉,比如我們經常遇到的非法數據、網絡抖動等。
Flink 提供了強大的可配置故障恢復和重啟策略來進行自動恢復。
故障恢復
我們在上一課時中介紹過 Flink 的配置文件,其中有一個參數 jobmanager.execution.failover-strategy: region。
Flink 支持了不同級別的故障恢復策略,jobmanager.execution.failover-strategy 的可配置項有兩種:full 和 region。
當我們配置的故障恢復策略為 full 時,集群中的 Task 發生故障,那么該任務的所有 Task 都會發生重啟。而在實際生產環境中,我們的大作業可能有幾百個 Task,出現一次異常如果進行整個任務重啟,那么經常會導致長時間任務不能正常工作,導致數據延遲。
但是事實上,我們可能只是集群中某一個或幾個 Task 發生了故障,只需要重啟有問題的一部分即可,這就是 Flink 基於 Region 的局部重啟策略。在這個策略下,Flink 會把我們的任務分成不同的 Region,當某一個 Task 發生故障時,Flink 會計算需要故障恢復的最小 Region。
Flink 在判斷需要重啟的 Region 時,采用了以下的判斷邏輯:
發生錯誤的 Task 所在的 Region 需要重啟;
如果當前 Region 的依賴數據出現損壞或者部分丟失,那么生產數據的 Region 也需要重啟;
為了保證數據一致性,當前 Region 的下游 Region 也需要重啟。
重啟策略
Flink 提供了多種類型和級別的重啟策略,常用的重啟策略包括:
固定延遲重啟策略模式
失敗率重啟策略模式
無重啟策略模式
Flink 在判斷使用的哪種重啟策略時做了默認約定,如果用戶配置了 checkpoint,但沒有設置重啟策略,那么會按照固定延遲重啟策略模式進行重啟;如果用戶沒有配置 checkpoint,那么默認不會重啟。
下面我們分別對這三種模式進行詳細講解。
無重啟策略模式
在這種情況下,如果我們的作業發生錯誤,任務會直接退出。
我們可以在 flink-conf.yaml 中配置:
restart-strategy: none
也可以在程序中使用代碼指定:
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setRestartStrategy(RestartStrategies.noRestart());
固定延遲重啟策略模式
固定延遲重啟策略會通過在 flink-conf.yaml 中設置如下配置參數,來啟用此策略:
restart-strategy: fixed-delay
固定延遲重啟策略模式需要指定兩個參數,首先 Flink 會根據用戶配置的重試次數進行重試,每次重試之間根據配置的時間間隔進行重試,如下表所示:
舉個例子,假如我們需要任務重試 3 次,每次重試間隔 5 秒,那么需要進行一下配置:
restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 5 s
當前我們也可以在代碼中進行設置:
env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 重啟次數
Time.of(5, TimeUnit.SECONDS) // 時間間隔
));
失敗率重啟策略模式
首先我們在 flink-conf.yaml 中指定如下配置:
restart-strategy: failure-rate
這種重啟模式需要指定三個參數,如下表所示。失敗率重啟策略在 Job 失敗后會重啟,但是超過失敗率后,Job 會最終被認定失敗。在兩個連續的重啟嘗試之間,重啟策略會等待一個固定的時間。
這種策略的配置理解較為困難,我們舉個例子,假如 5 分鍾內若失敗了 3 次,則認為該任務失敗,每次失敗的重試間隔為 5 秒。
那么我們的配置應該是:
restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 5 s
當然,也可以在代碼中直接指定:
env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 每個時間間隔的最大故障次數
Time.of(5, TimeUnit.MINUTES), // 測量故障率的時間間隔
Time.of(5, TimeUnit.SECONDS) // 每次任務失敗時間間隔
));
最后,需要注意的是,在實際生產環境中由於每個任務的負載和資源消耗不一樣,我們推薦在代碼中指定每個任務的重試機制和重啟策略。
並行度
並行度是 Flink 執行任務的核心概念之一,它被定義為在分布式運行環境中我們的一個算子任務被切分成了多少個子任務並行執行。我們提高任務的並行度(Parallelism)在很大程度上可以大大提高任務運行速度。
一般情況下,我們可以通過四種級別來設置任務的並行度。
算子級別
在代碼中可以調用 setParallelism 方法來設置每一個算子的並行度。例如:
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new LineSplitter()) .groupBy(0) .sum(1).setParallelism(1);
事實上,Flink 的每個算子都可以單獨設置並行度。這也是我們最推薦的一種方式,可以針對每個算子進行任務的調優。
執行環境級別
我們在創建 Flink 的上下文時可以顯示的調用 env.setParallelism() 方法,來設置當前執行環境的並行度,這個配置會對當前任務的所有算子、Source、Sink 生效。當然你還可以在算子級別設置並行度來覆蓋這個設置。
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(5);
提交任務級別
用戶在提交任務時,可以顯示的指定 -p 參數來設置任務的並行度,例如:
./bin/flink run -p 10 WordCount.jar
系統配置級別
flink-conf.yaml 中的一個配置:parallelism.default,該配置即是在系統層面設置所有執行環境的並行度配置。
整體上講,這四種級別的配置生效優先級如下:算子級別 > 執行環境級別 > 提交任務級別 > 系統配置級別。
在這里,要特別提一下 Flink 中的 Slot 概念。我們知道,Flink 中的 TaskManager 是執行任務的節點,那么在每一個 TaskManager 里,還會有“槽位”,也就是 Slot。Slot 個數代表的是每一個 TaskManager 的並發執行能力。
假如我們指定 taskmanager.numberOfTaskSlots:3,即每個 taskManager 有 3 個 Slot ,那么整個集群就有 3 * taskManager 的個數多的槽位。這些槽位就是我們整個集群所擁有的所有執行任務的資源。