Flink 運行方式
三種運行方式(與spark相似):
1、local 本地測試
2、Standallone Cluster 獨立集群(做實時計算,不需要hadoop,該獨立集群可能用的上)
3、Flink on Yarn 推薦
Standallone Cluster 獨立集群
獨立集群是不依賴hadoop的,所以可以先停掉 Hadoop
注意:獨立集群的搭建需要配置 JAVA_HOME 和 免密登錄
1、上傳、解壓、配置環境變量
#進入壓縮包所在目錄
cd /usr/local/soft
#解壓
tar -zxvf /usr/local/soft/flink-1.11.2-bin-scala_2.11.tgz -C /usr/local/soft/
#配置環境變量
vim /etc/profile
#添加
export FLINK_HOME=/usr/local/soft/flink-1.11.2
export PATH=$PATH:$FLINK_HOME/bin
#刷新
source /etc/profile
2、修改配置文件
在此之前我們先來簡單了解一下Flink集群的架構
vim /usr/local/soft/flink-1.11.2/conf/flink-conf.yaml
# (修改)指定主節點ip地址
jobmanager.rpc.address: master
vim workers
# (修改)指定從節點
node1
node2
vim masters
# 改成主節點master
master:8081
3、同步到所有節點
scp -r /usr/local/soft/flink-1.11.2/ node1:`pwd`
scp -r /usr/local/soft/flink-1.11.2/ node2:`pwd`
4、啟動(停止)集群
Flink集群的命令都在 bin 目錄下,不記得可以去找
所有主從架構的集群都是在主節點操作命令
# 啟動
start-cluster.sh
# 停止
stop-cluster.sh
訪問Flink web界面
http://master:8081
Flink提交任務的三種方式
1、在web頁面中提交
以昨天WordCount代碼為例(代碼不需要改),將之打成jar包
2、同flink命令提交任務
將jar包上傳至集群,提交任務
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar
com.shujia.flink.soure.Demo4ReadKafka -- 主類名
flink-1.0.jar -- jar包名
3、rpc方式提交任務 --- 遠程提交
用的較少
代碼寫完之后需要先打包,再運行
Flink框架報錯有一個特點:前幾條報錯原因都是廢話,要從后面看
package com.shujia.flink.core
import org.apache.flink.streaming.api.scala._
object Demo2RpcSubmit {
def main(args: Array[String]): Unit = {
/**
* 創建遠程環境,遠程提交flink任務
*
*/
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createRemoteEnvironment(
//主機名
"master",
//端口號
8081,
//指定jar包的路徑
"C:\\Users\\qx\\IdeaProjects\\bigdata14\\flink\\target\\flink-1.0.jar"
)
val linesDS: DataStream[String] = env.socketTextStream("master", 8888)
//1、將數據展開
val wordsDS: DataStream[String] = linesDS.flatMap(line => line.split(","))
//2、轉換成kv格式
val kvDS: DataStream[(String, Int)] = wordsDS.map((_, 1))
//3、按照單詞進行分組
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(kv => kv._1)
//4、統計數量,對value進行求和, 指定下標進行聚合
val countDS: DataStream[(String, Int)] = keyByDS.sum(1)
//打印結果
countDS.print()
env.execute("rpc")
}
}
Flink on Yarn
只需要部署一個節點 -- 在master中部署即可
需要先將獨立集群停掉
JAVA_HOME、免密登錄、上傳、解壓、配置環境變量 -- 這些配置都要做
1、配置HADOOP_CONF_DIR
export -- 全局生效
vim /etc/profile
#添加
export HADOOP_CONF_DIR=/usr/local/soft/hadoop-2.7.6/etc/hadoop/
source /etc/profile
2、將hadoop依賴jar上傳到flink lib目錄
cd /usr/local/soft/flink-1.11.2/lib
#jar包名
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
flink和spark一樣都是粗粒度資源申請
啟動方式
1、yarn-session 在yarn里面啟動一個flink集群---- 所有任務共享同一個jobManager(ApplicationMaster)
需要先啟動Hadoop:
start-all.sh
yarn-session是所有任務共享同一個jobmanager
# 在master中
# 開啟
yarn-session.sh -jm 1024m -tm 1096m
# -jm 1024m -- 指定JobManager的內存
# -tm 1096m -- 指定TaskManager的內存
# 啟動完畢之后會返回一個地址給我們--->Flink集群的web界面
# http://note01:46386
提交任務的三種方式
在master中輸入
nc -lk 8888
的時候,可能會出現 端口占用查看端口是否占用:
ps -aux | grep 8888
如果占用,將其殺死:
kill -9 編號
# 提交任務
# 任務提交的時候是根據並行度動態申請taskmanager
1、在web頁面提交任務
# 將代碼代碼打包好的jar包添加到web頁面
2、同flink命令提交任務
flink run -c com.shujia.flink.soure.Demo4ReadKafka flink-1.0.jar
3、rpc方式提交任務(遠程提交)
# 遠程提交,需要結合啟動集群返回的地址---http://node1:46386
# 修改代碼,創建環境的時候,將master改成node1,將8888改成46386
# 然后將代碼打包,然后再運行代碼就行了(不需要我們手動向集群中添加jar包了)
# 任務結束之后,資源被動態回收
# 關閉/停止 yarn-session
yarn application -kill application_1647657435495_0001
# application_1647657435495_0001 -- yarn-session的yarn上進程號
stop 也能退出
2、直接提交任務到yarn----每一個任務單獨使用一個jobManager
直接提交任務就不需要啟動
yarn-session
如果之前已啟動,需要將其殺死
yarn application -kill
直接提交任務到yarn,一個任務需要執行一次命令,一個任務會有一個 單獨的web頁面
flink run -m yarn-cluster -yjm 1024m -ytm 1096m -c com.shujia.flink.core.Demo1WordCount flink-1.0.jar
# -m yarn-cluster -- 指定提交模式
# -yjm 1024m -ytm 1096m -- 指定JobManager和TaskManager的內存
# -c com.shujia.flink.core.Demo1WordCount -- 指定主類名
# flink-1.0.jar -- 指定jar包名
# 殺掉yarn上的任務
yarn application -kill application_1599820991153_0005
# 查看日志
yarn logs -applicationId application_1647657435495_0002
yarn-session先在yarn中啟動一個jobMansager ,所有的任務共享一個jobmanager (提交任務更快,任務之間共享jobmanager , 相互有影響)
直接提交任務模型,為每一個任務啟動一個joibmanager (每一個任務獨立jobmanager , 任務運行穩定)
Flink讀取HDFS上的數據
package com.shujia.flink.core
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala._
object Demo3FlinkOnHdfs {
def main(args: Array[String]): Unit = {
//創建flink環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
/**
* 讀取hdfs中的數據 -- 有界流
*/
val studentDS: DataStream[String] = env.readTextFile("hdfs://master:9000/data/student")
val clazzNumDS: DataStream[(String, Int)] = studentDS
.map(stu => (stu.split(",")(4), 1))
.keyBy(_._1)
.sum(1)
/**
* 將數據保存到hdfs(改代碼可以通過官網查看,不需要死記)
*/
val sink: StreamingFileSink[(String, Int)] = StreamingFileSink
//指定保存路徑和數據的編碼格式
.forRowFormat(new Path("hdfs://master:9000/data/flink_clazz"), new SimpleStringEncoder[(String, Int)]("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toSeconds(15))
.withInactivityInterval(TimeUnit.MINUTES.toSeconds(5))
.withMaxPartSize(1024)
.build())
.build()
clazzNumDS.addSink(sink)
env.execute()
}
}
# 將代碼打包,然后提交到yarn上運行
# 執行結果可以通過日志查看,在web頁面只能查看無界流的任務運行結果