Flink Standalone模式部署集群是最簡單的一種部署方式,不依賴於其他的組件,另外還支持YARN/Mesos/Docker等模式下的部署,這里使用的flink版本為最新的穩定版1.9.1版本,對應的Scala版本是2.11,二進制包為:flink-1.9.1-bin-scala_2.11.tgz,即將安裝的環境為內網4個節點,其中1個jobmanager,3個taskmanager,角色分配如下:
bigdata1 - jobmanager
bigdata2, bigdata3, bigdata4 - taskmanager
老規矩,配置flink前必須做下面的基礎准備:
1). JDK環境,1.8.x或者更高,Oracle JDK或者OpenJDK都可以,二進制包解壓的方式安裝要配置好JAVA_HOME
2). 主機名和hosts配置文件集群內完全對應,准確配置.
3). 集群之間保證通信正常,關閉防火牆或者提前設置好規則.
4). 集群所有節點配置ssh免密,否則后續啟動集群的時候還需要輸入密碼.
5). 集群配置時間同步服務,ntp或者chrony服務,這個應該是大數據組件集群的標配
然后准備安裝flink,flink的安裝目錄為了便於維護所有的安裝位置都要一致,這里是/opt/flink-1.9.1,首先在其中1個節點bigdata1上開始配置:
解壓安裝包並進入安裝目錄: tar -xvzf flink-1.9.1-bin-scala_2.11.tgz -C /opt && cd /opt/flink-1.9.1
編輯配置文件:conf/flink-conf.yaml,對於獨立集群有下面的配置項需要修改:
jobmanager.rpc.address: bigdata1 jobmanager.rpc.port: 6123 jobmanager.heap.size: 2048m taskmanager.heap.size: 4096m taskmanager.numberOfTaskSlots: 4 parallelism.default: 1
簡單來看一下jobmanager.rpc.address表示jobmanager rpc通信綁定的地址,這里就是jobmanager的主機名
jobmanager.rpc.port是jobmanager的rpc端口,默認是6123
jobmanager.heap.size 這個是 jobmanager jvm進程的堆內存大小,默認是1024M,這里設置成2048m,也就是2G
taskmanager.heap.size 這個是taskmanager jvm進程的堆內存大小,也就是實際運行任務的jvm最大所能占用的堆內存,默認也是1024m,這里設置成4g
taskmanager.numberOfTaskSlots 這個表示每個taskmanager所能提供的slots數量,也就是flink節點的計算能力,這個和算子的並行度配合使用,每個slot運行1個pipeline[source,transformation,slink],多個slot使得flink的subtasks是並行的,這個一般設置成和機器cpu核數一致,比如我們這里是3個taskmanager,每個taskmanager是4個slots,那么這個集群的slots個數為12.
parallelism.default 這個是默認任務的並行度,也就是說當代碼中或者提交時沒指定並行度,則按照這里的並行度執行任務,並行度是按照整個集群來算的,比如上面slots個數為12,那么支持subtasks最大的並行度就是12,因為在代碼中通常會針對單個任務設置並行度,所以這里的默認並行度可以不設置.
上面這幾項是flink獨立集群最基本的配置,另外還有關於rest和web ui的配置,如果需要可以配置一下,我這里按照默認的配置:
web ui和jobmanager同時運行,端口默認為8081,可以根據需要修改,另外還有個參數:web.submit.enable,也就是是否可以從界面提交任務,默認是開啟的,取消注釋就可以關閉.
確認上面配置無誤之后,保存配置
然后配置masters和slaves的節點文件,
conf/masters,配置jobmanager的機器列表,這里獨立集群非HA模式下配置為:bigdata1:8081
conf/slaves,配置taskmanager的機器列表,這里配置如下:
確認上面配置正常全部保存,接下來就可以從bigdata1將配置好的flink分發到其他3個節點:
scp -r /opt/flink-1.9.1 bigdata2:/opt scp -r /opt/flink-1.9.1 bigdata3:/opt scp -r /opt/flink-1.9.1 bigdata4:/opt
然后從任意1個節點可以啟動集群,啟動命令為: bin/start-cluster.sh 執行之后jobmanager和taskmanager就全部啟動了,通過jps可以查看到相應的進程,jobmanager的進程為StandaloneSessionClusterEntrypoint,其余3個taskmanager的進程為TaskManagerRunner,然后可以訪問瀏覽器界面查看web ui,這里是:http://bigdata1:8081
這樣,flink standalone模式的集群就配置完成了
停止集群命令: bin/stop-cluster.sh 執行之后所有節點的進程都會停止
單個jobmanager的啟動或停止: bin/jobmanager.sh start|start-foreground|stop|stop-all
單個taskmanager的啟動或停止: bin/taskmanager.sh start|start-foreground|stop|stop-all
其中start-foreground是在前台啟動,單獨啟動的命令一方面可以用在cluster啟動或者停止失敗的時候執行,另一方面可以用於flink集群運行時向其中加入節點,主要是運行時添加配置好的taskmanager節點,這樣可以為集群動態擴容,添加之后稍微等一下,web界面就可以看到可用的slots和task managers數量發生變化了.
參考文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/deployment/cluster_setup.html