flink架構,任務、子任務、算子概念


數據來源:https://blog.csdn.net/zhaocuit/article/details/106588758

 

flink架構
Job Managers(master):作業管理器,負責任務安排、協調檢查點、協調故障恢復等
Task Managers(worker):任務管理器,接收master的任務調度,並在本地執行相關任務
在worker節點上,會啟動一個TaskManagersRunner的進程,來接收master的任務調度

一個worker包含至少一個任務槽,每個任務槽表示worker內存資源的固定子集。

例如,具有三個槽的worker會將其托管內存的1/3專用於每個槽。分配資源意味着子任務不會與其他作業的子任務競爭托管內存。

注意:此處沒有發生CPU隔離。當前插槽僅將任務的托管內存分開。

多個槽共享TaskManangerRunner的JVM內存以及TCP連接和心跳信息,還會共享數據集和數據結構。

任務槽中運行的是什么?任務?子任務?

 

任務、子任務、算子
一個job的任務、子任務該怎么划分呢?如下taskAndSubTask方法的代碼:

public class Test{
public static void taskAndSubTask() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//這個方式的source並行度為1
DataStreamSource<String> source = env.socketTextStream("localhost", 9999);
//默認為電腦邏輯核數 如4
SingleOutputStreamOperator<Tuple2<String, Integer>> flatOperator = source.flatMap(new WindowWordCount.Splitter());
//並行度 繼承前一個算子
KeyedStream<Tuple2<String, Integer>, Tuple> keyby = flatOperator.keyBy(0);
SingleOutputStreamOperator<Object> map = keyby.map(new MapFunction<Tuple2<String, Integer>, Object>() {
@Override
public Object map(Tuple2<String, Integer> value) throws Exception {
return null;
}
});
//並行度繼承前一個算子
map.print();

//執行操作
env.execute("Window WordCount");
}
}

該job分為一個source算子(並行度1)、一個flatMap算子(並行度4)、一個keyBy算子(並行度4)、一個keyBy后map算子(並行度4)、一個sink算子(並行度4)

 

任務
  任務的划分:在一個job的執行計划(數據流圖)中,從source到計算到sink,每當並行度發生變化或者數據需要分組(keyBy)時(還可通過API明確設置),就會產生任務。

  在上述代碼中:source並行度和flatMap並行度不一樣,因此source是一個任務,flatMap是一個任務,keyBy是一個分組算子,因此又是一個任務,而keyBy、keyBy后map算子和sink是分組后操作且並行度未改變,因此屬於同一個任務。

  即該job有3個任務:source任務、flatMap任務、keyBy、keyBy后map算子和sink任務

  假設一個keyBy后map算子的並行度2,那么任務的划分如下:source任務、flatM任務、keyBy任務、keyBy后map以及sink算作一個任務

子任務
  子任務:一個任務的並行度為N,那么這個任務就擁有N個子任務。假設keyBy后map以及sink算子,他們的並行度為4,那么flink會在任務槽中運行4個keyBy后map以及sink算子

算子
  flink job中用於處理數據的一個單元,如讀取數據、計算數據、保存數據等,addSource、addSink、keyBy、map等都是一個數據處理單元。

 

自定義任務
上述任務划分只是針對默認情況下的,我們可以通過代碼讓某個任務分解成多個任務,如方法startNewChain和disableChaining

  • startNewChain:當某個算子調用該方法時,那么該算子及其后面的且屬於原來任務的算子將變成一個新的任務
  • disableChaining:當某個算子調用該方法時,那么該算子將從原來的任務中分離出來,變成一個新的任務,該算子前面的且屬於原來任務的所有算子為一個任務,該算子后面的且屬於原來任務的所有算子,也將變成一個任務


自定義任務示例
假設有如下流:source(並行度1)–>flatMap(並行度4)–>filter(並行度4)–>map(並行度4)–>keyby(並行度4)–>sink(並行度4)

  • 默認情況下:source是一個任務,flatMap、filter、map組成一個任務,keyby和sink組成一個任務
  • startNewChain:假設filter調用了startNewChain方法,那么任務就變成了:source是一個任務,flatMap是一個任務,filter、map組成一個任務,keyby和sink組成一個任務
  • disableChaining:假設filter調用了disableChaining方法,那么任務就變成了:source是一個任務,flatMap是一個任務,filter是一個任務,map是一個任務,keyby和sink組成一個任務

如何能看到任務子任務的划分情況呢?需要flink集群環境,然后進入flink網頁控制台,將job打包上傳到網頁控制台,並啟動任務或者點擊執行計划,就可以在頁面上看到任務和子任務的划分情況

 

代碼邏輯和部署邏輯
上述代碼中taskAndSubTask方法的代碼邏輯為:

  • 一個source算子(並行度1)
  • 一個flatMap算子(並行度4)
  • 一個keyBy算子(並行度4)
  • 一個keyBy后map算子(並行度4)
  • 一個sink算子(並行度4)


上述代碼中taskAndSubTask方法的部署邏輯為:

  • 一個source子任務
  • 4個flatMap子任務
  • 4個keyBy-map-sink子任務

即3個任務,9個子任務,17個算子,那么他們在槽中是如何分配的呢?

假設有2個worker,共A B C D四個槽,那么source子任務會隨機分配到一個槽中,flatMap子任務將會每個槽分配一個,keyBy-map-sink子任務每個槽分配一個

子任務在槽中的分配:盡可能讓每個槽都能執行一個完整的數據流,而不是將一個並行度為非1的某個子任務全部分配到一個槽里,這樣才能最大化的提高性能

一個worker,包含多個槽,一個槽可以運行多個子任務,一個槽下的多個子任務共享整個槽的內存資源,多個槽的內存資源等於整個worker進程的內存資源

一個worker,就是一個進程,一個子任務就是該進程下某個任務槽中的一個線程

 

重啟策略
設置重啟策略:env.setRestartStrategy()

重啟策略分類:

  • 不重啟
  • 固定延遲重啟策略:env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
  • 故障率重啟策略

 

故障恢復策略

在flink-conf.yaml中配置jobmanager.execution.failover-strategy=full|region

  • full:重啟Job中所有的Task,即重置整個ExecutionGraph,簡單粗暴。
  • region:只重啟ExecutionGraph中對應的Region包含的Task。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM