一、flink 並行度:
在flink中,每個算子是一個線程,source、 filter、 flatMap 、map、 sink 這5算子都是獨立的線程。在4個層面上可以設置並行度,算子層級的優先級最高。
•Operator Level(算子層次)
•Execution Environment Level(執行環境層次)
•Client Level(客戶端層次)
•System Level(系統層次)
Flink的每個TaskManager為集群提供solt。 solt的數量通常與每個TaskManager節點的可用CPU內核數成比例。一般情況下你的slot數是你每個節點的cpu的核數。
TaskManager為子節點,solt為子節點上獨立的進程,類似storm的worker進程。
啟動一個job的話,如果並行度為1,那么所有算子線程都運行在一個節點的一個slot上, 如果單獨給flatMap算子設置並行度為4,那么flatMap線程會並行運行在4個slot上。
flink 同一個算子的 不同task不可以共享slot,即一個算子的並行度為2,它一定運行在兩個slot上, slot是內存不共享的。
flink 兩個連續的算子並行度一樣的話,flink會把兩個算子 放入一個task中,同一個task 是一個線程,減少了線程切換帶來的開銷。
storm的並行和flink的並行有一點不同,就是storm在worker進程級別設置並行度的時候,會將不同的線程 均分到不同的進程中,如果有兩個子節點那么兩個進程會優先分到兩個節點上,如果只有一個子節點,那么兩個進程會運行在同一個節點上。flink設置並行度的時候,先保證同一個slot里有全部的算子,同一個算子的其他並行算子才會分到其他slot中。如果有兩個節點會優先分配在兩個節點的slot上,如果有一個節點會在同一個節點下分配到兩個slot上。
二、flink環境啟停及job任務提交與取消
注意:每次替換了jar包 都要停了flink job和flink環境然后再重啟:
1)停止flink job:
在bin目錄執行:./flink list 查看正在運行的flink任務的jobid
------------------ Running/Restarting Jobs -------------------
03.01.2020 16:24:30 : 4de78031c1c99110328d88fbdf2384c3 : Flink Application (RUNNING)
然后執行: ./flink cancel 4de78031c1c99110328d88fbdf2384c3
2)停止和重啟flink環境的命令(在bin目錄下):
./stop-cluster.sh
./start-clush.sh
3)啟動job
./flink run --class com.najing.FlinkAppStart xxx/xxx.jar
三、flink kafka消費積壓問題解決思路
1)不要把所有topic都放到一個job中,這樣會導致一個消費者消費所有主題數據,會導致積壓,要根據topic進行job拆分。
2)給kafka主題設置分區同時給flink的consumer設置並行度,使分區數=並行度,讓3個消費者同時消費三個分區。
3)如果上面兩種方式都設置后,還是積壓,一定是在source-> filter->flatMap->map->sink 算子處理流程中有個別算子處理速度非常慢,導致積壓。
因為不同的算子是不同的線程,當中間有線程處理慢以后,前面的線程也會被它阻塞掉,導致拉取環節的線程也變慢,最終導致積壓。這就要找出處理慢的算子,進行針對性的優化。
kafka的自動提交,會在拉取后不管有沒有沒成功處理,到時間就自動提交。
四、集成springboot的flink,job提交后spring環境啟動方式
flink job提交后其實是把各個算子分配到不同節點運行的,所以要想在算子里使用spring的bean,必須在算子運行前拉起spring環境,在各個算子的open()方法里要拉起springboot 環境。
一個節點上即使有多個flot,只需要拉起一個spring進程即可,各個flot和算子共用這一個環境,為了避免在同一節點上多次啟動spring環境導致啟動報錯,須在啟動spring環境的方法里做判斷,如果已經啟動了或則正在啟動,則不再啟動。
理論上各個算子都要拉起下spring環境,避免出錯,但如果環境只有兩個節點,且算子的最大並行度也是2,只在並行度最大的算子里拉起spring環境也可以保證job正常運行,但這樣有啟動報錯風險。