Flink基礎(十四):DS簡介(14) 搭建Flink運行流式應用


1 部署方式

1.1 獨立集群

獨立集群包含至少一個master進程,以及至少一個TaskManager進程,TaskManager進程運行在一台或者多台機器上。所有的進程都是JVM進程。下圖展示了獨立集群的部署。

master進程在不同的線程中運行了一個Dispatcher和一個ResourceManager。一旦它們開始運行,所有TaskManager都將在Resourcemanager中進行注冊。下圖展示了一個任務如何提交到一個獨立集群中去。

客戶端向Dispatcher提交了一個任務,Dispatcher將會啟動一個作業管理器線程,並提供執行所需的JobGraph。作業管理器向ResourceManager請求必要的task slots。一旦請求的slots分配好,作業管理器就會部署job。

在standalone這種部署方式中,master和worker進程在失敗以后,並不會自動重啟。如果有足夠的slots可供使用,job是可以從一次worker失敗中恢復的。只要我們運行多個worker就好了。但如果job想從master失敗中恢復的話,則需要進行高可用(HA)的配置了。

部署步驟

下載壓縮包

鏈接:http://mirror.bit.edu.cn/apache/flink/flink-1.11.0/flink-1.11.0-bin-scala_2.11.tgz

解壓縮

 
$ tar xvfz flink-1.11.0-bin-scala_2.11.tgz

啟動集群

 
$ cd flink-1.11.0 $ ./bin/start-cluster.sh 

檢查集群狀態可以訪問:http://localhost:8081

部署分布式集群

  1. 所有運行TaskManager的機器的主機名(或者IP地址)都需要寫入./conf/slaves文件中。
  2. start-cluster.sh腳本需要所有機器的無密碼的SSH登錄配置,方便啟動TaskManager進程。
  3. Flink的文件夾在所有的機器上都需要有相同的絕對路徑。
  4. 運行master進程的機器的主機名或者IP地址需要寫在./conf/flink-conf.yaml文件的jobmanager.rpc.address配置項。

一旦部署好,我們就可以調用./bin/start-cluster.sh命令啟動集群了,腳本會在本地機器啟動一個作業管理器,然后在每個slave機器上啟動一個TaskManager。停止運行,請使用./bin/stop-cluster.sh

1.2 Apache Hadoop Yarn

YARN是Apache Hadoop的資源管理組件。用來計算集群環境所需要的CPU和內存資源,然后提供給應用程序請求的資源。

Flink在YARN上運行,有兩種模式:job模式和session模式。在job模式中,Flink集群用來運行一個單獨的job。一旦job結束,Flink集群停止,並釋放所有資源。下圖展示了Flink的job如何提交到YARN集群。

當客戶端提交任務時,客戶端將建立和YARN ResourceManager的連接,然后啟動一個新的YARN應用的master進程,進程中包含一個作業管理器線程和一個ResourceManager。作業管理器向ResourceManager請求所需要的slots,用來運行Flink的job。接下來,Flink的ResourceManager將向Yarn的ResourceManager請求容器,然后啟動TaskManager進程。一旦啟動,TaskManager會將slots注冊在Flink的ResourceManager中,Flink的ResourceManager將把slots提供給作業管理器。最終,作業管理器把job的任務提交給TaskManager執行。

sesison模式將啟動一個長期運行的Flink集群,這個集群可以運行多個job,需要手動停止集群。如果以session模式啟動,Flink將會連接到YARN的ResourceManager,然后啟動一個master進程,包括一個Dispatcher線程和一個Flink的ResourceManager的線程。下圖展示了一個Flink YARN session的啟動。

當一個作業被提交運行,分發器將啟動一個作業管理器線程,這個線程將向Flink的資源管理器請求所需要的slots。如果沒有足夠的slots,Flink的資源管理器將向YARN的資源管理器請求額外的容器,來啟動TaskManager進程,並在Flink的資源管理器中注冊。一旦所需slots可用,Flink的資源管理器將把slots分配給作業管理器,然后開始執行job。下圖展示了job如何在session模式下執行。

無論是作業模式還是會話模式,Flink的ResourceManager都會自動對故障的TaskManager進行重啟。你可以通過./conf/flink-conf.yaml配置文件來控制Flink在YARN上的故障恢復行為。例如,可以配置有多少容器發生故障后終止應用。

無論使用job模式還是sesison模式,都需要能夠訪問Hadoop。

job模式可以用以下命令來提交任務:

 
$ ./bin/flink run -m yarn-cluster ./path/to/job.jar

參數-m用來定義提交作業的目標主機。如果加上關鍵字"yarn-cluster",客戶端會將作業提交到由Hadoop配置所指定的YARN集群上。Flink的CLI客戶端還支持很多參數,例如用於控制TaskManager容器內存大小的參數等。有關它們的詳細信息,請參閱文檔。Flink集群的Web UI由YARN集群某個節點上的主進程負責提供。你可以通過YARN的Web UI對其進行訪問,具體鏈接位置在"Tracking URL: ApplicationMaster"下的Application Overview頁面上。

session模式則是

 
$ ./bin/yarn-session.sh # 啟動一個yarn會話 $ ./bin/flink run ./path/to/job.jar # 向會話提交作業 

Flink的Web UI鏈接可以從YARN Web UI的Application Overview頁面上找到。

2 高可用配置

Flink的高可用配置需要Apache ZooKeeper組件,以及一個分布式文件系統,例如HDFS等等。作業管理器將會把相關信息都存儲在文件系統中,並將指向文件系統中相關信息的指針保存在ZooKeeper中。一旦失敗,一個新的作業管理器將從ZooKeeper中指向相關信息的指針所指向的文件系統中讀取元數據,並恢復運行。

配置文件編寫

high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink

2.1 獨立集群高可用配置

需要在配置文件中加一行集群標識符信息,因為可能多個集群共用一個zookeeper服務。

 
high-availability.cluster-id: /cluster-1

2.2 yarn集群高可用配置

首先在yarn集群的配置文件yarn-site.xml中加入以下代碼

<property>
  <name>yarn.resourcemanager.am.max-attempts</name>
  <value>4</value>
  <description>
    The maximum number of application master execution attempts.
    Default value is 2, i.e., an application is restarted at most once.
  </description>
</property>

然后在./conf/flink-conf.yaml加上

 
yarn.application-attempts: 4

 

3 與Hadoop集成

推薦兩種方法

  1. 下載包含hadoop的Flink版本。
  2. 使用我們之前下載的Flink,然后配置Hadoop的環境變量。 export HADOOP_CLASSPATH={hadoop classpath}

我們還需要提供Hadoop配置文件的路徑。只需設置名為HADOOP_CONF_DIR的環境變量就可以了。這樣Flink就能夠連上YARN的ResourceManager和HDFS了。

4 保存點操作

$ ./bin/flink savepoint <jobId> [savepointPath]

例如

復制代碼
$ ./bin/flink savepoint bc0b2ad61ecd4a615d92ce25390f61ad \
hdfs:///xxx:50070/savepoints
Triggering savepoint for job bc0b2ad61ecd4a615d92ce25390f61ad.
Waiting for response...
Savepoint completed. 
Path: hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8
You can resume your program from this savepoint with the run command.
復制代碼

刪除保存點文件

$ ./bin/flink savepoint -d <savepointPath>

例子

$ ./bin/flink savepoint -d \
hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8
Disposing savepoint 'hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8'.
Waiting for response...
​Savepoint 'hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8' disposed.

5 取消一個應用

$ ./bin/flink cancel <jobId>

取消的同時做保存點操作

$ ./bin/flink cancel -s [savepointPath] <jobId>

例如

$ ./bin/flink cancel -s \
hdfs:///xxx:50070/savepoints d5fdaff43022954f5f02fcd8f25ef855
Cancelling job bc0b2ad61ecd4a615d92ce25390f61ad 
with savepoint to hdfs:///xxx:50070/savepoints.
Cancelled job bc0b2ad61ecd4a615d92ce25390f61ad. 
Savepoint stored in hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-d08de07fbb10.

6 從保存點啟動應用程序

$ ./bin/flink run -s <savepointPath> [options] <jobJar> [arguments]

7 擴容,改變並行度操作

$ ./bin/flink modify <jobId> -p <newParallelism>

例子

$ ./bin/flink modify bc0b2ad61ecd4a615d92ce25390f61ad -p 16
Modify job bc0b2ad61ecd4a615d92ce25390f61ad.
​Rescaled job bc0b2ad61ecd4a615d92ce25390f61ad. Its new parallelism is 16.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM