一、Flink提交任務的流程
Flink任務提交后,Client向HDFS上傳Flink的jar包和配置,之后向Yarn ResourceManager提交任務,ResourceManager分配Container資源並通知對應的NodeManager啟動
ApplicationMaster,ApplicationMaster啟動后加載Flink的jar包和配置構建環境,然后啟動JobManager;之后Application Master向ResourceManager申請資源啟動TaskManager
,ResourceManager分配Container資源后,由ApplicationMaster通知資源所在的節點的NodeManager啟動TaskManager,NodeManager加載Flink的Jar包和配置構建環境並啟動TaskManager,TaskManager啟動向JobManager發送心跳,並等待JobManager向其分配任務。
二、Flink任務調度原理
1、Program Code:我們編寫的Flink應用程序代碼。
2、JobClient:JobClient不是Flink程序執行的內部部分,但它是任務執行的起點。JobClient負責接受用戶的程序代碼,然后創建數據流,將數據流提交給
JobManager以便進一步執行。執行完成后,JobClient將結果返回給用戶。
3、JobManager:主進程(也稱為作業管理器)協調和管理程序的執行。它的主要指責包括安排任務,管理checkpoint,故障恢復等。機器集群中至少要有一個master,
master負責調度task,協調checkpoints和容災,高可用設置的話可以有多個master,但要保證一個是leader,其他是standby;JobManager包含ActorSystem、Scheduler、
CheckPoint三個重要的組件。
4、TaskManager:從JobManager處接收需要部署的Task。TaskManager是在JVM中一個或多個線程中執行任務的工作節點。任務執行的並行性由每個TaskManager上可用
的任務槽決定。每個任務代表分配給任務槽的一組資源。例如:如果TaskManager有四個插槽,那么它將為每個插槽分配25%的內存。可以在任務槽中運行一個或多個線程。
同一插槽中的線程共享相同的JVM。同一JVM中的任務共享TCP連接和心跳信息。TaskManager的一個Slot代表一個可用線程,該線程具有固定的內存,注意Slot只對內存隔離,
沒有對CPU隔離。
默認情況下,Flink允許子任務共享Slot,即使它們是不同的task的subtask,只要他們來自相同的job。這種共享可以有更好的資源利用率。
三、Work與Slot
每一個worker(TaskManager)是一個JVM進程,他可能會在獨立的線程上執行一個或多個subtask。為了控制一個worker能接收到多少個task,worker通過task slot來進行控制
(一個worker至少要有一個task slot)。
每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那么它會將其管理的內存分成三份給各個slot。資源slot化意味着一個subtask將不需要跟來自其他的job的subtask競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離
task的受管理的內存。
通過調整task slot的數量,允許用戶定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味着每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM。而在同一個JVM進程中的task將共享TCP連接(基於多路復用)和心跳信息。他們也
可能共享數據集和數據結構,因此這減少了每個task的負載。
四、Flink程序架構
每個Flink程序都包含以下的若干流程
獲得一個執行環境:(Execution Environment) 相當於Spark中的SparkContext
加在/創建初始數據:(Source)
指定轉換這些數據:(Transformation)
指定放置計算結果的位置:(Sink)
觸發程序執行
五、 Environment
執行環境StreamExecutionEnvironment是所有Flink程序的基礎。
StreamExecutionEnvironment.getExecutionEnvironment(根據運行情況,返回本地或者集群的運行環境) 默認分區是8
創建一個執行環境,表示當前執行程序的上下文。如果程序是獨立調用的,則此方法返回本地執行環境:如果從命令行客戶端調用程序以提交到集群,則此方法返回此集群的執行環境,也就是說,
getEexecutionEnvironment會根據查詢運行的方式決定返回什么樣的運行環境,是最常用的一種創建執行環境的方式。
StreamExecutionEnvironment.createLocalEnvironment(1) -》開始只有一個分區
返回本地執行環境,需要在調用時指定默認的並行度。
StreamExecutionEnvironment.createRemoteEnvironment("localhost",8800)
返回集群執行環境,將Jar提交到遠程服務器。需要在調用時指定JobManager的IP和端口號,並指定要在集群中運行的Jar包。
六、Source
1、基於File的數據源
1.1、readTextFile(path)
1.1、readTextFile(path)
一列一列的讀取遵循TextInputFormat規范的文本文件,並將結果作為String返回。