轉自:https://blog.csdn.net/a_drjiaoda/article/details/88203323
Flink on Yarn模式部署始末:Flink的Standalone和on Yarn模式都屬於集群運行模式,但是有很大的不同,在實際環境中,使用Flink on Yarn模式者居多。那么使用on yarn模式到底好在哪呢?首先,在集群運行時,可能會有很多的集群實例包括MapReduce、Spark、Flink等等,那么如果它們全基於on Yarn就可以完成資源分配,減少單個實例集群的維護,提高集群的利用率。
Flink on Yarn模式安裝部署要做的其實不多,正常的步驟:1、上傳二進制包 ===》2、解壓縮 ===》 3、更改文件名稱 ===》 4、配置環境變量。首先看下面這張圖(來自於徐葳大神),Flink on yarn的job運行模式大致分為兩類:
內存集中管理模式:在Yarn中初始化一個Flink集群,開辟指定的資源,之后我們提交的Flink Jon都在這個Flink yarn-session中,也就是說不管提交多少個job,這些job都會共用開始時在yarn中申請的資源。這個Flink集群會常駐在Yarn集群中,除非手動停止。
內存Job管理模式【推薦使用】:在Yarn中,每次提交job都會創建一個新的Flink集群,任務之間相互獨立,互不影響並且方便管理。任務執行完成之后創建的集群也會消失。
一、內存集中管理模式
第一種模式分為兩步:yarn-session.sh(開辟資源)+flink run(提交任務)
1、開源資源,使用命令
yarn-session.sh -n 2 -jm 1024 -tm 1024 -d
參數解釋:
//-n 2 表示指定兩個容器
// -jm 1024 表示jobmanager 1024M內存
// -tm 1024表示taskmanager 1024M內存
//-d 任務后台運行
//-nm,--name YARN上為一個自定義的應用設置一個名字
//-q,--query 顯示yarn中可用的資源 (內存, cpu核數)
//-z,--zookeeperNamespace <arg> 針對HA模式在zookeeper上創建NameSpace
//-id,--applicationId <yarnAppId> YARN集群上的任務id,附着到一個后台運行的yarn session中
由於flink on yarn 模式 是基於hadoop的,如果hadoop 集群沒啟動,則會連接失敗。
當啟動之后,又會出現NameNode處於安全模式,這里沒有必要手動關閉。解決方法:等hadoop啟動之后差不多20s再提交yarn-session的命令。正常運行后如下圖所示,並訪問JM的web 接口,這里有個麻煩的事情就是每次需要去看主機名和端口號。
其實,由於這還是屬於一個Yarn application,因此我們也可以通過yarn.resourcemanager.webapp.address端口來選擇訪問哪一個flink集群,例如我這里剛剛啟動了兩個Flink集群,這里可通過Tracking UI的值來跳轉到對用的Flink集群監控頁面。
關閉某個Flink集群:上述圖中大家可以看到有兩個Flink集群,這是由於誤操作直接按了ctrl+c鍵,導致前台程序退出,但是真正的Flink集群依然在后台健壯的運行着,為了演示方便,這里又通過上述的命令開啟了新的flink yarn-session。現在需要關閉一個,其實也很簡單,因為是yarn程序,我們可以直接使用 yarn application -kill application_1552292557465_0001 來結束進程。
2、提交任務
為了進行測試,我們對Flink目錄下的LICENSE文件進行詞頻統計
上傳文件至HDFS。hadoop fs -put LICENSE /
查看文件是否上傳成功。hadoop fs -ls /
執行命令。./flink run ../examples/batch/WordCount.jar -input hdfs://192.168.83.129:9000/LICENSE -output hdfs://192.168.83.129:9000/wordcount-result.txt
查看輸出結果。hadoop fs -cat /wordcount-result.txt
二、內存Job管理模式
第二種模式其實也分為兩個部分,依然是開辟資源和提交任務,但是在Job模式下,這兩步都合成一個命令了。
這里,我們直接執行命令./flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ../examples/batch/WordCount.jar。上面的命令中沒有指定-input 和 -output,這是由於有默認的數據集和輸出方式,看看效果。
上述方框中內容就是默認的數據集,以及將輸出打印到控制台上。下面yarn application的圖可以清晰的反映第二種方式,在job結束后就會關閉flink yarn-session的集群。
第二種方式命令 參數解釋:
flink run [OPTIONS] <jar-file> <arguments>
• "run" 操作參數:
// -c,--class <classname> 如果沒有在jar包中指定入口類,則需要在這里通過這個參數指定
// -m,--jobmanager <host:port> 指定需要連接的jobmanager(主節點)地址,使用這個參數可以指定一個不同於配置文件中的jobmanager
// -p,--parallelism <parallelism> 指定程序的並行度。可以覆蓋配置文件中的默認值。
三、兩種模式區分
//第一種模式,會去找已有的Flink集群
默認查找當前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:
• ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
//第一種模式,給flink指定一個已有的JM,不讓他自己去找
連接指定host和port的jobmanager:
• ./bin/flink run -m master:1234 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
//第二種模式,指定為 yarn-cluster
啟動一個新的yarn-session:
• ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1
注意:yarn session命令行的選項也可以使用./bin/flink 工具獲得。它們都有一個y或者yarn的前綴
例如:./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar