一、手動下載安裝包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.6.1/flink-1.6.1-bin-hadoop27-scala_2.11.tgz
二、解壓
tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
節點名稱 | master | worker | zookeeper |
cent-1 | master | zookeeper | |
cent-2 | master | worker | zookeeper |
cent-3 | worker | zookeeper |
三、修改flink/conf/masters,slaves,flink-conf.yaml
vi masters
cent-1:8081
vi slaves
cent-2 cent-3
vi flink-conf.yaml
taskmanager.numberOfTaskSlots:2 jobmanager.rpc.address: cent-1
可選配置:
- 每個JobManager(
jobmanager.heap.mb
)的可用內存量, - 每個TaskManager(
taskmanager.heap.mb
)的可用內存量, - 每台機器的可用CPU數量(
taskmanager.numberOfTaskSlots
), - 集群中的CPU總數(
parallelism.default
)和 - 臨時目錄(
taskmanager.tmp.dirs
)
四、拷貝到其他節點
scp -r flink-1.6.1/ admin@cent-2:`pwd`
scp -r flink-1.6.1/ admin@cent-3:`pwd`
五、配置環境變量,每個節點都要配置
vi /etc/profile export FLINK_HOME=/opt/module/flink-1.6.1 export PATH=$PATH:$FLINK_HOME/bin
source /etc/profile
六、啟動flink
./bin/start-cluster.sh
登錄web查看狀態
http://cent-1:8081
七、修改配置文件
修改flink-conf.yaml,HA模式下,jobmanager不需要指定,在master file中配置,由zookeeper選出leader與standby。
#jobmanager.rpc.address: cent-1 high-availability:zookeeper #指定高可用模式(必須) high-availability.zookeeper.quorum:cent-1:2181,cent-2:2181,cent-3:2181 #ZooKeeper仲裁是ZooKeeper服務器的復制組,它提供分布式協調服務(必須) high-availability.storageDir:hdfs:///flink/ha/ #JobManager元數據保存在文件系統storageDir中,只有指向此狀態的指針存儲在ZooKeeper中(必須) high-availability.zookeeper.path.root:/flink #根ZooKeeper節點,在該節點下放置所有集群節點(推薦) high-availability.cluster-id:/flinkCluster #自定義集群(推薦) state.backend: filesystem state.checkpoints.dir: hdfs:///flink/checkpoints state.savepoints.dir: hdfs:///flink/checkpoints
修改conf/zoo.cfg
server.1=cent-1:2888:3888 server.2=cent-2:2888:3888 server.3=cent-3:2888:3888
修改conf/masters
cent-1:8081 cent-2:8081
配置信息要同步到各個節點
八、先啟動zookeeper集群各節點(測試環境中也可以用Flink自帶的start-zookeeper-quorum.sh),啟動dfs ,再啟動flink
start-cluster.sh
手動將JobManager / TaskManager實例添加到群集
使用bin/jobmanager.sh
和bin/taskmanager.sh
腳本將JobManager和TaskManager實例添加到正在運行的集群中。
添加JobManager
bin/jobmanager.sh ((start|start-foreground) [host] [webui-port])|stop|stop-all
添加TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
jobmanager.sh start cent-2
九、Yarn Cluster模式
配置環境變量
export HADOOP_CONF_DIR= /opt/module/hadoop-3.2.0/etc/hadoop
啟動
yarn-session.sh -d -s 2 -tm 800 -n 2
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/
以上命令在參數前加上y前綴,-yn表示TaskManager個數。
在這個模式下,同樣可以使用-m yarn-cluster提交一個"運行后即焚"的detached yarn(-yd)作業到yarn cluster
2.停止yarn cluster
yarn application -kill application_1539058959130_0001
3.Yarn模式HA
應用最大嘗試次數(yarn-site.xml),您必須配置為嘗試應用的最大數量的設置yarn-site.xml,當前YARN版本的默認值為2(表示允許單個JobManager失敗)。
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>The maximum number of application master execution attempts</description>
</property>
高可用的Yarn會話
-
配置HA模式和zookeeper法定人數在
conf/flink-conf.yaml
:high-availability: zookeeper high-availability.zookeeper.quorum: node21:2181,node22:2181,node23:2181 high-availability.storageDir: hdfs:///flink/recovery high-availability.zookeeper.path.root: /flink yarn.application-attempts: 10
-
配置ZooKeeper的服務器中
conf/zoo.cfg
(目前它只是可以運行每台機器的單一的ZooKeeper服務器):server.1=cent-1:2888:3888 server.2=cent-2:2888:3888 server.3=cent-3:2888:3888
-
啟動ZooKeeper仲裁:
./ start-zookeeper-quorum.sh
-
啟動HA群集:
./ yarn-session.sh -n 2
可選配置:
- 每個JobManager(
jobmanager.heap.mb
)的可用內存量, - 每個TaskManager(
taskmanager.heap.mb
)的可用內存量, - 每台機器的可用CPU數量(
taskmanager.numberOfTaskSlots
), - 集群中的CPU總數(
parallelism.default
)和 - 臨時目錄(
taskmanager.tmp.dirs
)