flink從flink1.10版本對其內存結構發生改變,所以在環境配置的時候也要主要具體怎么配置比較合適。
內存結構可以看官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_setup.html
Standalone模式官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/cluster_setup.html
基於yarn模式官網:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html
一、環境准備
1.1、java環境
下載java安裝包,通過tar -zxvf 命令解壓java壓縮包,然后配置java變量環境
vim /etc/profile ##########在后面追加############## export JAVA_HOME=/usr/java/default export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$JAVA_HOME/bin
1.2、ssh環境
准備幾台機器
#准備機器 /etc/hosts 192.168.88.130 lgh 192.168.88.131 lgh1 192.168.88.132 lgh2
添加flink group和flink user
useradd flink -d /home/flink echo "flink123" | passwd flink --stdin
然后對該用戶配置ssh環境(在192.168.88.130,指定一台操作)
su - flink ssh-keygen -t rsa ssh-copy-id 192.168.88.131 ssh-copy-id 192.168.88.132
1.3、zookeeper環境
下載zookeeper的安裝包進行安裝
##解壓 tar -zxvf zookeeper-3.4.8.tar.gz -C xxx目錄 ##創建軟鏈接(每一台機器) ln -s zookeeper-3.4.8 zookeeper
配置環境變量
vim ~/.bashrc export ZOOKEEPER_HOME=/home/spark/zookeeper export PATH=$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf:$PATH
修改配置
cd /home/spark/zookeeper/conf cp zoo_sample.cfg zoo.cfg vim zoo.cfg ###########修改配置如下################## tickTime=2000 #服務器與客戶端之間交互的基本時間單元(ms) initLimit=10 # 此配置表示允許follower連接並同步到leader的初始化時間,它以tickTime的倍數來表示。當超過設置倍數的tickTime時間,則連接失敗 syncLimit=5 # Leader服務器與follower服務器之間信息同步允許的最大時間間隔,如果超過次間隔,默認follower服務器與leader服務器之間斷開鏈接 dataDir=/home/spark/zookeeper/data #保存zookeeper數據路徑 dataLogDir=/home//spark/zookeeper/dataLog #保存zookeeper日志路徑,當此配置不存在時默認路徑與dataDir一致 clientPort=2181 #客戶端訪問zookeeper時經過服務器端時的端口號 server.1=lgh:2888:3888 #表示了不同的zookeeper服務器的自身標識,作為集群的一部分,每一台服務器應該知道其他服務器的信息 server.2=lgh1:2888:3888 server.3=lgh2:2888:3888 maxClientCnxns=60 #限制連接到zookeeper服務器客戶端的數量
創建myid文件
cd /home/spark/zookeeper/data vim myid #輸入1
復制到其他機器以及啟動
##復制到其他機器 scp -r zookeeper-3.4.8 spark@lgh1:/home/spark/ scp -r zookeeper-3.4.8 spark@lgh2:/home/spark/ #修改myid文件 不同機器數字不一樣,分別為2和3 ##啟動 zkServer.sh start #查看狀態 zkServer.sh status #查看進程 jps QuorumPeerMain
二、安裝flink
2.1、基礎安裝配置
下載:https://flink.apache.org/downloads.html#apache-flink-1101
下載如上兩個兩個包(第二個包對應hadoop的版本),然后對flink的通過過tar命令進行解壓,把第二個包放在flink包的lib目錄下,如下所示
[flink@lgh01 lib]$ pwd /home/flink/flink10/lib [flink@lgh01 lib]$ ll | grep hadoop -rwxrwxrwx 1 mstream hive 80331 Apr 16 14:20 flink-hadoop-compatibility_2.11-1.10.0.jar -rwxrwxrwx 1 mstream hive 36433393 Apr 16 14:20 flink-shaded-hadoop-2-uber-2.6.5-8.0.jar
然后配置hadoop相關的環境變量和flink環境變量
vim /etc/profile #########后面追加內容######### export PATH=/apps/opt/cloudera/parcels/CDH/bin:$PATH export HADOOP_HOME=/apps/opt/cloudera/parcels/CDH/lib/hadoop export HADOOP_CONF_DIR=/etc/hadoop/conf export YARN_CONF_DIR=/etc/hadoop/conf export PATH=$PATH:/apps/opt/cloudera/parcels/CDH/bin export CLASSPATH=$CLASSPATH:/apps/opt/cloudera/parcels/CDH/jars/:/utf/ #export FLINK_HOME=/apps/flink/flink #export PATH=$FLINK_HOME/bin:$PATH export FLINK_HOME=/apps/mstream/install/flink10 export PATH=$FLINK_HOME/bin:$PATH
然后修改flinkxxx/conf/flink-conf.yaml這個配置文件:
jobmanager.rpc.address: 192.168.88.130 jobmanager.rpc.port: 6124 jobmanager.heap.size: 1024m taskmanager.heap.size: 1024m taskmanager.numberOfTaskSlots: 2 #根據自己的CPU core進行配置,lscpu可以查看cpu的核數 cluster.evenly-spread-out-slots: true env.java.home: /usr/java/default #可以不配置 parallelism.default: 2 high-availability: zookeeper high-availability.zookeeper.path.root: /flink high-availability.storageDir: hdfs:///user/flink/ha/ high-availability.zookeeper.quorum: lgh1:2181,lgh2:2181,lgh3:2181 #high-availability.cluster-id: /cluster_one # important: customize per cluster #這個參數在yarn模式下不能配置 rest.port: 8081 rest.bind-port: 8080-8180 #這里啟動Standalone集群的時候會在日志中選擇端口,而不一定就是8081,所以登錄的網址就是看日志 taskmanager.memory.process.size: 2048m #flink1.10內存結構變化之后必須要配置三種其中的一個,根據集群進行相應的配置,詳情見官網 jobmanager.execution.failover-strategy: region #checkpoint state.checkpoints.dir: hdfs:///user/flink/checkpoint state.checkpoints.num-retained: 20 #savepoint state.savepoints.dir: hdfs:///user/flink/savepoints #stateful state.backend: filesystem #使用FsStateBackend(生產推薦使用rocksdb) state.backend.fs.checkpointdir: hdfs:///user/flink/pointsdata/ state.backend.incremental: true jobmanager.archive.fs.dir: hdfs:///user/flink/flink-jobs/ historyserver.web.address: 192.168.88.130 historyserver.web.port: 8083 historyserver.archive.fs.dir: hdfs:///user/flink/historyserver historyserver.archive.fs.refresh-interval: 10000 blob.storage.directory: /tmp/
配置conf/masters
192.168.88.130:8081 192.168.88.131:8081
配置conf/slaves
192.168.88.130 192.168.88.131 192.168.88.132
然后將flink的安裝包通過scp -r 命令復制到其他的節點
2.2、日志相關配置(可選)
JobManager和TaskManager的啟動日志可以在Flinkbinary目錄下的log子目錄中找到:
-rw-r--r-- 1 flink flink 0 Mar 25 11:43 flink-flink-standalonesession-8-lgh01.out.1 -rw-r--r-- 1 flink flink 0 Mar 23 16:31 flink-flink-standalonesession-8-lgh01.out.2 -rw-r--r-- 1 flink flink 0 Mar 23 15:18 flink-flink-standalonesession-8-lgh01.out.3 -rw-r--r-- 1 flink flink 0 Mar 23 14:55 flink-flink-standalonesession-8-lgh01.out.4 -rw-r--r-- 1 flink flink 216520 May 5 16:38 flink-flink-taskexecutor-0-lgh01.log -rw-r--r-- 1 flink flink 14191242 Apr 19 00:05 flink-flink-taskexecutor-0-lgh01.log.1 -rw-r--r-- 1 flink flink 821762 Apr 16 12:24 flink-flink-taskexecutor-0-lgh01.log.2 -rw-r--r-- 1 flink flink 0 Apr 28 11:42 flink-flink-taskexecutor-0-lgh01.out
目錄中以“flink-${user}-standalonesession-${id}-${hostname}”為前綴的文件對應的即是JobManager 的輸出,其中有三個文件:
- flink-${user}-standalonesession-${id}-${hostname}.log:代碼中的日志輸出
- flink-${user}-standalonesession-${id}-${hostname}.out:進程執行時的 stdout 輸出
- flink-${user}-standalonesession-${id}-${hostname}-gc.log:JVM 的 GC 的日志
目錄中以“flink-${user}-taskexecutor-${id}-${hostname}”為前綴的文件對應的是 TaskManager的輸出,也包括三個文件,和 JobManager 的輸出一致。
日志配置文件在flink安裝包的conf目錄下,如下:
[flink@lgh01 conf]$ pwd /home/flink/flink10/conf [flink@lgh01 conf]$ ll total 56 -rw-r--r-- 1 flink flink 1187 Mar 25 11:43 flink-conf.yaml -rw-r--r-- 1 flink flink 2138 Jan 24 17:01 log4j-cli.properties -rw-r--r-- 1 flink flink 1884 Jan 24 17:01 log4j-console.properties -rw-r--r-- 1 flink flink 1939 Jan 24 17:01 log4j.properties -rw-r--r-- 1 flink flink 1709 Jan 24 17:01 log4j-yarn-session.properties -rw-r--r-- 1 flink flink 2294 Jan 24 17:01 logback-console.xml -rw-r--r-- 1 flink flink 2331 Jan 24 17:01 logback.xml -rw-r--r-- 1 flink flink 1550 Jan 24 17:01 logback-yarn.xml -rw-r--r-- 1 flink flink 36 Mar 16 15:15 masters -rw-r--r-- 1 flink flink 74 Feb 13 09:51 slaves -rw-r--r-- 1 flink flink 5484 Apr 28 14:08 sql-client-defaults.yaml -rw-r--r-- 1 flink flink 1541 Feb 28 16:38 sql-client-hive.yaml -rw-r--r-- 1 flink flink 1434 Jan 24 17:01 zoo.cfg
其中:
- log4j-cli.properties:用Flink命令行時用的log配置,比如執行“flinkrun”命令
- log4j-yarn-session.properties:是用yarn-session.sh啟動時命令行執行時用的log配置
- log4j.properties:無論是standalone還是yarn模式,JobManager和TaskManager上用的log配置都是log4j.properties
這三個“log4j.*properties”文件分別有三個“logback.*xml”文件與之對應,如果想使用logback的同學,之需要把與之對應的“log4j.*properties”文件刪掉即可,對應關系如下:
- log4j-cli.properties->logback-console.xml
- log4j-yarn-session.properties->logback-yarn.xml
- log4j.properties->logback.xml
需要注意的是,“flink-${user}-standalonesession-${id}-${hostname}”和“flink-${user}-taskexecutor-${id}-${hostname}”都帶有“${id}”,“${id}”表示本進程在本機上該角色(JobManager
或TaskManager)的所有進程中的啟動順序,默認從0開始。
2.3、啟動相關
Standalone集群:在打通ssh的節點執行命令: start-cluster.sh即可,然后可以通過jps命令查看相關進程
yarn-session模式:后台啟動yarn-session.sh即可,這里可以配置很多參數,可以用yarn-session.sh --help查看,比如:
./bin/yarn-session.sh -jm 1024m -tm 4096m
[flink@lgh01bin]$ ./yarn-session.sh --help Usage: Optional -at,--applicationType <arg> Set a custom application type for the application on YARN -D <property=value> use value for given property -d,--detached If present, runs the job in detached mode -h,--help Help for the Yarn session CLI. -id,--applicationId <arg> Attach to running YARN session -j,--jar <arg> Path to Flink jar file -jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB) -m,--jobmanager <arg> Address of the JobManager (master) to which to connect. -nl,--nodeLabel <arg> Specify YARN node label for the YARN application -nm,--name <arg> Set a custom name for the application on YARN -q,--query Display available YARN resources (memory, cores) -qu,--queue <arg> Specify YARN queue. -s,--slots <arg> Number of slots per TaskManager -t,--ship <arg> Ship files in the specified directory (t for transfer) -tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB) -yd,--yarndetached If present, runs the job in detached mode (deprecated; use non-YARN specific option instead) -z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for high availability mode
yarn per-job模式:不要啟動任何啥,通過flink run -m yarn-cluster命令進行提交任務即可,比如:
./bin/flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
如果有小伙伴也在學習研究flink的話,可以關注下,后期會更新flink相關的基礎和flink的相關源碼分析
參考:
https://files.alicdn.com/tpsservice/4824447b829149c86bedd19424d05915.pdf