Flink on Yarn三部曲之三:提交Flink任務


歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類匯總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;

本文是《Flink on Yarn三部曲》系列的終篇,先簡單回顧前面的內容:

  1. 《Flink on Yarn三部曲之一:准備工作》:准備好機器、腳本、安裝包;
  2. 《Flink on Yarn三部曲之二:部署和設置》:完成CDH和Flink部署,並在管理頁面做好相關的設置;

現在Flink、Yarn、HDFS都就緒了,接下來實踐提交Flink任務到Yarn執行;

全文鏈接

  1. 《Flink on Yarn三部曲之一:准備工作》
  2. 《Flink on Yarn三部曲之二:部署和設置》
  3. 《Flink on Yarn三部曲之三:提交Flink任務》

實踐之前,對Flink on YARN先簡單了解一下,如下圖所示,Flink on Yarn在使用的時候分為兩種模式,Job ModeSession Mode
在這里插入圖片描述
Session Mode:在YARN中提前初始化一個Flink集群,以后所有Flink任務都提交到這個集群,如下圖:
在這里插入圖片描述
Job Mode:每次提交Flink任務都會創建一個專用的Flink集群,任務完成后資源釋放,如下圖:
在這里插入圖片描述
接下來分別實戰這兩種模式;

准備實戰用的數據(CDH服務器)

接下來提交的Flink任務是經典的WordCount,先在HDFS中准備一份文本文件,后面提交的Flink任務都會讀取這個文件,統計里面每個單詞的數字,准備文本的步驟如下:

  1. SSH登錄CDH服務器;
  2. 切換到hdfs賬號:su - hdfs
  3. 下載實戰用的txt文件:
wget https://github.com/zq2599/blog_demos/blob/master/files/GoneWiththeWind.txt
  1. 創建hdfs文件夾:hdfs dfs -mkdir /input
  2. 將文本文件上傳到/input目錄:hdfs dfs -put ./GoneWiththeWind.txt /input

准備工作完成,可以提交任務試試了。

Session Mode實戰

  1. SSH登錄CDH服務器;
  2. 切換到hdfs賬號:su - hdfs
  3. 進入目錄:/opt/flink-1.7.2/
  4. 執行如下命令創建Flink集群,-n參數表示TaskManager的數量,-jm表示JobManager的內存大小,-tm表示每個TaskManager的內存大小:
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024
  1. 創建成功后,控制台輸出如下圖,注意紅框中的提示,表明可以通過38301端口訪問Flink:
    在這里插入圖片描述
  2. 瀏覽器訪問CDH服務器的38301端口,可見Flink服務已經啟動:
    在這里插入圖片描述
  3. 瀏覽器訪問CDH服務器的8088端口,可見YARN的Application(即Flink集群)創建成功,如下圖,紅框中是任務ID,稍后結束Application的時候會用到此ID:
    在這里插入圖片描述
  4. 再開啟一個終端,SSH登錄CDH服務器,切換到hdfs賬號,進入目錄:/opt/flink-1.7.2
  5. 執行以下命令,就會提交一個Flink任務(安裝包自帶的WordCount例子),並指明將結果輸出到HDFS的wordcount-result.txt文件中:
bin/flink run ./examples/batch/WordCount.jar \
-input hdfs://192.168.50.134:8020/input/GoneWiththeWind.txt \
-output hdfs://192.168.50.134:8020/wordcount-result.txt
  1. 執行完畢后,控制台輸出如下:
    在這里插入圖片描述
  2. flink的WordCount任務結果保存在hdfs,我們將結果取出來看看:hdfs dfs -get /wordcount-result.txt
  3. vi打開wordcount-result.txt文件,如下圖,可見任務執行成功,指定文本中的每個單詞數量都統計出來了:
    在這里插入圖片描述
  4. 瀏覽器訪問Flink頁面(CDH服務器的38301端口),也能看到任務的詳細情況:
    在這里插入圖片描述
  5. 銷毀這個Flink集群的方法是在控制台執行命令:yarn application -kill application_1580173588985_0002
    在這里插入圖片描述
    Session Mode的實戰就完成了,接下來我們來嘗試Job Mode;

Job Mode

  1. 執行以下命令,創建一個Flink集群,該集群只用於執行參數中指定的任務(wordCount.jar),結果輸出到hdfs的wordcount-result-1.txt文件:
bin/flink run -m yarn-cluster \
-yn 2 \
-yjm 1024 \
-ytm 1024 \
./examples/batch/WordCount.jar \
-input hdfs://192.168.50.134:8020/input/GoneWiththeWind.txt \
-output hdfs://192.168.50.134:8020/wordcount-result-1.txt
  1. 控制台輸出如下,表明任務執行完成:
    在這里插入圖片描述
  2. 如果您的內存和CPU核數充裕,可以立即執行以下命令再創建一個Flink集群,該集群只用於執行參數中指定的任務(wordCount.jar),結果輸出到hdfs的wordcount-result-2.txt文件:
bin/flink run -m yarn-cluster \
-yn 2 \
-yjm 1024 \
-ytm 1024 \
./examples/batch/WordCount.jar \
-input hdfs://192.168.50.134:8020/input/GoneWiththeWind.txt \
-output hdfs://192.168.50.134:8020/wordcount-result-2.txt
  1. 在YARN管理頁面可見任務已經結束:
    在這里插入圖片描述
  2. 執行命令hdfs dfs -ls /查看結果文件,已經成功生成:
    在這里插入圖片描述
  3. 執行命令hdfs dfs -get /wordcount-result-1.txt下載結果文件到本地,檢查數據正常;
  4. 至此,Flink on Yarn的部署、設置、提交都實踐完成,《Flink on Yarn三部曲》系列也結束了,如果您也在學習Flink,希望本文能夠給您一些參考,也建議您根據自身情況和需求,修改ansible腳本,搭建更適合自己的環境;

歡迎關注公眾號:程序員欣宸

微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢游Java世界...
https://github.com/zq2599/blog_demos


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM