flink 上下文一些細節
Flink 程序可以在 本地環境運行,也可以在集群環境下運行,不同的運算環境,提交的運行過程也不太一樣,這就需要運行程序的時候需要獲取上下文環境,從而建立起與flink框架的聯系,只有獲取到上下文環境信息才能將任務分配到不同的taskmanager上運行。
StreamExecutionEnvironment
在編寫flink程序的第一步就是創建環境對象,StreamExecutionEnvironment,它是所有flink程序的基礎,在代碼中創建的執行環境其實是調用這個類的靜態方法
1、getExecutionEnvironment
最簡單的方式,就是直接調用getExecutionEnvironment方法。它會根據當前運行的上下文直接得到正確的結果:如果程序是獨立運行的,就返回一個本地執行環境;如果是創建了jar包,然后從命令行調用它並提交到集群執行,那么就返回集群的執行環境。也就是說,這個方法會根據當前運行的方式,自行決定該返回什么樣的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2. createLocalEnvironment
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
我們之前在IDEA運行的wordcount實質就是用的這個方式獲取的執行環境。wordcount 打包在集群上用的集群的方式運行
3. createRemoteEnvironment
這個方法返回集群執行環境。需要在調用時指定JobManager的主機名和端口號,並指定要在集群中運行的Jar包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("host", // JobManager主機名 1234, // JobManager進程端口號 "path/to/jarFile.jar"// 提交給JobManager的JAR包);
在獲取到程序執行環境后,我們還可以對執行環境進行靈活的設置。比如可以全局設置程序的並行度、禁用算子鏈,還可以定義程序的時間語義、配置容錯機制。關於時間語義和容錯機制,我們會在后續的章節介紹。
執行模式(Execution Mode)
StreamExecutionEnvironment 通過名稱來看這個上下文是做流處理的,但是新版本的已經實現流批一體,通過設置flink運行模式可以實現批處理。可以通過execution mode 來設置flink的運行方式。
流執行模式(STREAMING)
- 這是DataStream API最經典的模式,一般用於需要持續實時處理的無界數據流。默認情況下,程序使用的就是STREAMING執行模式。
- 批執行模式(BATCH)專門用於批處理的執行模式, 這種模式下,Flink處理作業的方式類似於MapReduce框架。對於不會持續計算的有界數據,我們用這種模式處理會更方便
- 自動模式(AUTOMATIC)在這種模式下,將由程序根據輸入數據源是否有界,來自動選擇執行模式。
BATCH模式的配置方法
通過命令行配置
bin/flink run -Dexecution.runtime-mode=BATCH ...
或者通過代碼寫死
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
設置運行模式需注意:用BATCH模式處理批量數據,用STREAMING模式處理流式數據。因為數據有界的時候,直接輸出結果會更加高效;而當數據無界的時候, 我們沒得選擇——只有STREAMING模式才能處理持續的數據流。