1 部署方式
1.1 獨立集群
獨立集群包含至少一個master進程,以及至少一個TaskManager進程,TaskManager進程運行在一台或者多台機器上。所有的進程都是JVM進程。下圖展示了獨立集群的部署。
master進程在不同的線程中運行了一個Dispatcher和一個ResourceManager。一旦它們開始運行,所有TaskManager都將在Resourcemanager中進行注冊。下圖展示了一個任務如何提交到一個獨立集群中去。
客戶端向Dispatcher提交了一個任務,Dispatcher將會啟動一個作業管理器線程,並提供執行所需的JobGraph。作業管理器向ResourceManager請求必要的task slots。一旦請求的slots分配好,作業管理器就會部署job。
在standalone這種部署方式中,master和worker進程在失敗以后,並不會自動重啟。如果有足夠的slots可供使用,job是可以從一次worker失敗中恢復的。只要我們運行多個worker就好了。但如果job想從master失敗中恢復的話,則需要進行高可用(HA)的配置了。
部署步驟
下載壓縮包
鏈接:http://mirror.bit.edu.cn/apache/flink/flink-1.11.0/flink-1.11.0-bin-scala_2.11.tgz
解壓縮
$ tar xvfz flink-1.11.0-bin-scala_2.11.tgz
啟動集群
$ cd flink-1.11.0 $ ./bin/start-cluster.sh
檢查集群狀態可以訪問:http://localhost:8081
部署分布式集群
- 所有運行TaskManager的機器的主機名(或者IP地址)都需要寫入
./conf/slaves
文件中。 start-cluster.sh
腳本需要所有機器的無密碼的SSH登錄配置,方便啟動TaskManager進程。- Flink的文件夾在所有的機器上都需要有相同的絕對路徑。
- 運行master進程的機器的主機名或者IP地址需要寫在
./conf/flink-conf.yaml
文件的jobmanager.rpc.address
配置項。
一旦部署好,我們就可以調用./bin/start-cluster.sh
命令啟動集群了,腳本會在本地機器啟動一個作業管理器,然后在每個slave機器上啟動一個TaskManager。停止運行,請使用./bin/stop-cluster.sh
。
1.2 Apache Hadoop Yarn
YARN是Apache Hadoop的資源管理組件。用來計算集群環境所需要的CPU和內存資源,然后提供給應用程序請求的資源。
Flink在YARN上運行,有兩種模式:job模式和session模式。在job模式中,Flink集群用來運行一個單獨的job。一旦job結束,Flink集群停止,並釋放所有資源。下圖展示了Flink的job如何提交到YARN集群。
當客戶端提交任務時,客戶端將建立和YARN ResourceManager的連接,然后啟動一個新的YARN應用的master進程,進程中包含一個作業管理器線程和一個ResourceManager。作業管理器向ResourceManager請求所需要的slots,用來運行Flink的job。接下來,Flink的ResourceManager將向Yarn的ResourceManager請求容器,然后啟動TaskManager進程。一旦啟動,TaskManager會將slots注冊在Flink的ResourceManager中,Flink的ResourceManager將把slots提供給作業管理器。最終,作業管理器把job的任務提交給TaskManager執行。
sesison模式將啟動一個長期運行的Flink集群,這個集群可以運行多個job,需要手動停止集群。如果以session模式啟動,Flink將會連接到YARN的ResourceManager,然后啟動一個master進程,包括一個Dispatcher線程和一個Flink的ResourceManager的線程。下圖展示了一個Flink YARN session的啟動。
當一個作業被提交運行,分發器將啟動一個作業管理器線程,這個線程將向Flink的資源管理器請求所需要的slots。如果沒有足夠的slots,Flink的資源管理器將向YARN的資源管理器請求額外的容器,來啟動TaskManager進程,並在Flink的資源管理器中注冊。一旦所需slots可用,Flink的資源管理器將把slots分配給作業管理器,然后開始執行job。下圖展示了job如何在session模式下執行。
無論是作業模式還是會話模式,Flink的ResourceManager都會自動對故障的TaskManager進行重啟。你可以通過./conf/flink-conf.yaml
配置文件來控制Flink在YARN上的故障恢復行為。例如,可以配置有多少容器發生故障后終止應用。
無論使用job模式還是sesison模式,都需要能夠訪問Hadoop。
job模式可以用以下命令來提交任務:
$ ./bin/flink run -m yarn-cluster ./path/to/job.jar
參數-m
用來定義提交作業的目標主機。如果加上關鍵字"yarn-cluster"
,客戶端會將作業提交到由Hadoop配置所指定的YARN集群上。Flink的CLI客戶端還支持很多參數,例如用於控制TaskManager容器內存大小的參數等。有關它們的詳細信息,請參閱文檔。Flink集群的Web UI由YARN集群某個節點上的主進程負責提供。你可以通過YARN的Web UI對其進行訪問,具體鏈接位置在"Tracking URL: ApplicationMaster"下的Application Overview頁面上。
session模式則是
$ ./bin/yarn-session.sh # 啟動一個yarn會話 $ ./bin/flink run ./path/to/job.jar # 向會話提交作業
Flink的Web UI鏈接可以從YARN Web UI的Application Overview頁面上找到。
2 高可用配置
Flink的高可用配置需要Apache ZooKeeper組件,以及一個分布式文件系統,例如HDFS等等。作業管理器將會把相關信息都存儲在文件系統中,並將指向文件系統中相關信息的指針保存在ZooKeeper中。一旦失敗,一個新的作業管理器將從ZooKeeper中指向相關信息的指針所指向的文件系統中讀取元數據,並恢復運行。
配置文件編寫
high-availability.zookeeper.quorum: address1:2181[,...],addressX:2181 high-availability.storageDir: hdfs:///flink/recovery high-availability.zookeeper.path.root: /flink
2.1 獨立集群高可用配置
需要在配置文件中加一行集群標識符信息,因為可能多個集群共用一個zookeeper服務。
high-availability.cluster-id: /cluster-1
2.2 yarn集群高可用配置
首先在yarn集群的配置文件yarn-site.xml
中加入以下代碼
<property> <name>yarn.resourcemanager.am.max-attempts</name> <value>4</value> <description> The maximum number of application master execution attempts. Default value is 2, i.e., an application is restarted at most once. </description> </property>
然后在./conf/flink-conf.yaml
加上
yarn.application-attempts: 4
3 與Hadoop集成
推薦兩種方法
- 下載包含hadoop的Flink版本。
- 使用我們之前下載的Flink,然后配置Hadoop的環境變量。
export HADOOP_CLASSPATH={hadoop classpath}
我們還需要提供Hadoop配置文件的路徑。只需設置名為HADOOP_CONF_DIR
的環境變量就可以了。這樣Flink就能夠連上YARN的ResourceManager和HDFS了。
4 保存點操作
$ ./bin/flink savepoint <jobId> [savepointPath]
例如
$ ./bin/flink savepoint bc0b2ad61ecd4a615d92ce25390f61ad \ hdfs:///xxx:50070/savepoints Triggering savepoint for job bc0b2ad61ecd4a615d92ce25390f61ad. Waiting for response... Savepoint completed. Path: hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8 You can resume your program from this savepoint with the run command.
刪除保存點文件
$ ./bin/flink savepoint -d <savepointPath>
例子
$ ./bin/flink savepoint -d \ hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8 Disposing savepoint 'hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8'. Waiting for response... Savepoint 'hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-63cf5d5ccef8' disposed.
5 取消一個應用
$ ./bin/flink cancel <jobId>
取消的同時做保存點操作
$ ./bin/flink cancel -s [savepointPath] <jobId>
例如
$ ./bin/flink cancel -s \ hdfs:///xxx:50070/savepoints d5fdaff43022954f5f02fcd8f25ef855 Cancelling job bc0b2ad61ecd4a615d92ce25390f61ad with savepoint to hdfs:///xxx:50070/savepoints. Cancelled job bc0b2ad61ecd4a615d92ce25390f61ad. Savepoint stored in hdfs:///xxx:50070/savepoints/savepoint-bc0b2a-d08de07fbb10.
6 從保存點啟動應用程序
$ ./bin/flink run -s <savepointPath> [options] <jobJar> [arguments]
7 擴容,改變並行度操作
$ ./bin/flink modify <jobId> -p <newParallelism>
例子
$ ./bin/flink modify bc0b2ad61ecd4a615d92ce25390f61ad -p 16 Modify job bc0b2ad61ecd4a615d92ce25390f61ad. Rescaled job bc0b2ad61ecd4a615d92ce25390f61ad. Its new parallelism is 16.