說明:本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:
Flink大數據項目實戰:http://t.cn/EJtKhaz
從上圖可以看出Flink 中的Time大致分為以下三類:
1.Event Time:Event 真正產生的時間,我們稱之為Event Time。
2.Ingestion Time:Event 事件被Source拿到,進入Flink處理引擎的時間,我們稱之為Ingestion Time。
3.Window Processing Time:Event事件被Flink 處理(比如做windows操作)時的時間,我們稱之為Window Processing Time。
4. Stateful Operations
什么是狀態?
state一般指一個具體的task/operator的狀態,比如當前處理那些數據,數據處理的進度等等。
Flink state操作狀態分為兩類:
1.Operator State
Operator State跟一個特定operator的一個並發實例綁定,整個operator只對應一個state。
2.Keyed State
基於KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state。
Flink 每個操作狀態又分為兩類:
Keyed State和Operator State可以以兩種形式存在:原始狀態和托管狀態( Flink框架管理的狀態)。
1.原始狀態:比如一個字符串或者數組,它需要序列化,保存到內存或磁盤,或者外部存儲中,這就是它的原始狀態。
2.托管狀態:比如數據放在Hash表中,或者放在HDFS中,或者放在rocksdb中,這種就是托管狀態。當需要處理數據的時候,從托管狀態中讀取出來,還原成原始狀態,甚至變量和集合,然后再進行處理。
5.Checkpoints(備份)
什么是checkpoint?
所謂checkpoint,就是在某一時刻,將所有task的狀態做一個快照(snapshot),然后存儲到State Backend(比如hdfs)。checkpoint擁有輕量級容錯機制,可以保證exactly-once 語義,用於內部失敗的恢復(比如當應用掛了,它可以自動恢復從上次的進度接着執行)。
checkpoint基本原理:通過往source 注入barrier(可以理解為特殊的Event),barrier作為checkpoint的標志,它會自動做checkpoint無需人工干預。
6.Savepoint
savepoint是流處理過程中的狀態歷史版本,它具有可以replay的功能。用於外部恢復,當Flink應用重啟和升級,它會做一個先做一個savepoint,下次應用啟動可以接着上次進度執行。
savepoint兩種觸發方式:
1.Cancel with savepoint
2.手動主動觸發
savepoint可以理解為是一種特殊的checkpoint,savepoint就是指向checkpoint的一個指針,需要手動觸發,而且不會過期,不會被覆蓋,除非手動刪除。正常情況下的線上環境是不需要設置savepoint的。除非對job或集群做出重大改動的時候,需要進行測試運行。
(4)Flink Runtime
1. Flink運行時架構
1.1Flink架構
Flink 運行時架構主要包含幾個部分:Client、JobManager(master節點)和TaskManger(slave節點)。
Client:Flink 作業在哪台機器上面提交,那么當前機器稱之為Client。用戶開發的Program 代碼,它會構建出DataFlow graph,然后通過Client提交給JobManager。
JobManager:是主(master)節點,相當於YARN里面的REsourceManager,生成環境中一般可以做HA 高可用。JobManager會將任務進行拆分,調度到TaskManager上面執行。
TaskManager:是從節點(slave),TaskManager才是真正實現task的部分。
Client提交作業到JobManager,就需要跟JobManager進行通信,它使用Akka框架或者庫進行通信,另外Client與JobManager進行數據交互,使用的是Netty框架。Akka通信基於Actor System,Client可以向JobManager發送指令,比如Submit job或者Cancel /update job。JobManager也可以反饋信息給Client,比如status updates,Statistics和results。
Client提交給JobManager的是一個Job,然后JobManager將Job拆分成task,提交給TaskManager(worker)。JobManager與TaskManager也是基於Akka進行通信,JobManager發送指令,比如Deploy/Stop/Cancel Tasks或者觸發Checkpoint,反過來TaskManager也會跟JobManager通信返回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之間的數據通過網絡進行傳輸,比如Data Stream做一些算子的操作,數據往往需要在TaskManager之間做數據傳輸。
1.2. TaskManger Slot
TaskManager是進程,他下面運行的task(整個Flink應用是Job,Job可以拆分成很多個task)是線程,每個task/subtask(線程)下可運行一個或者多個operator,即OperatorChain。Task是class,抽象的,subtask是Object(類比學習),具體的。
一個TaskManager通過Slot(任務槽)來控制它上面可以接受多少個task,比如一個TaskManager划分了3個Task Slot(僅限內存托管,目前CPU未做隔離),它只能接受3個task。Slot均分TaskManager所托管的內存,比如一個TaskManager有6G內存,那么每個Slot分配2G。
同一個TaskManager中的task共享TCP連接(通過多路復用)和心跳消息。它們還可以共享數據集和數據結構,從而減少每個任務的開銷。一個TaskManager有N個槽位只能接受N個Task嗎?不是,后面會講共享槽位。
1.3. OperatorChain && Task
為了更高效地分布式執行,Flink會盡可能地將operator的subtask鏈接(chain)在一起形成task。以wordcount為例,解析不同視圖下的數據流,如下圖所示。
數據流(邏輯視圖)
創建Source(並行度設置為1)讀取數據源,數據經過FlatMap(並行度設置為2)做轉換操作,然后數據經過Key Agg(並行度設置為2)做聚合操作,最后數據經過Sink(並行度設置為2)將數據輸出。
數據流(並行化視圖)
並行度為1的Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給並行度為2的Key Agg進行聚合操作,然后並行度為2的Sink將數據輸出,未優化前的task總和為7。
數據流(優化后視圖)
並行度為1的Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給Key Agg進行聚合操作,此時Key Agg和Sink操作合並為一個task(注意:將KeyAgg和Sink兩個operator進行了合並,因為這兩個合並后並不會改變整體的拓撲結構),它們一起的並行度為2,數據經過Key Agg和Sink之后將數據輸出,優化后的task總和為5.
1.4. OperatorChain的優點和組成條件
OperatorChain的優點
1.減少線程切換
2.減少序列化與反序列化
3.減少數據在緩沖區的交換
4.減少延遲並且提高吞吐能力
OperatorChain 組成條件
1.沒有禁用Chain
2.上下游算子並行度一致 。
3.下游算子的入度為1(也就是說下游節點沒有來自其他節點的輸入)。
4.上下游算子在同一個slot group(后面緊跟着就會講如何通過slot group先分配到同一個solt,然后才能chain) 。
5.下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,map、flatmap、filter等默認是ALWAYS)。
6.上游節點的 chain 策略為 ALWAYS 或 HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)。
7.上下游算子之間沒有數據shuffle (數據分區方式是 forward)。
1.5. 編程改變OperatorChain行為
Operator chain的行為可以通過編程API中進行指定,可以通過在DataStream的operator后面(如someStream.map(..))調用startNewChain()來指示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。可以調用disableChaining()來指示該operator不參與chaining(不會與前后的operator chain一起)。可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。可以設置Slot group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通過調整並行度,來調整Operator chain。
2. Slot分配與共享
2.1共享Slot
默認情況下,Flink 允許subtasks共享slot,條件是它們都來自同一個Job的不同task的subtask。結果可能一個slot持有該job的整個pipeline。
允許slot共享有以下兩點好處:
1.Flink集群需要的任務槽與作業中使用的最高並行度正好相同(前提,保持默認SlotSharingGroup)。也就是說我們不需要再去計算一個程序總共會起多少個task了。
2.更容易獲得更充分的資源利用。如果沒有slot共享,那么非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,將task的2個並行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均分配到重的subtasks。
2.2共享Slot實例
將 WordCount 的並行度從之前的2個增加到6個(Source並行度仍為1),並開啟slot共享(所有operator都在default共享組),將得到如上圖所示的slot分布圖。
首先,我們不用去計算這個job會其多少個task,總之該任務最終會占用6個slots(最高並行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager。
2.3 SlotSharingGroup(soft)
SlotSharingGroup是Flink中用來實現slot共享的類,它盡可能地讓subtasks共享一個slot。
保證同一個group的並行度相同的sub-tasks 共享同一個slots。算子的默認group為default(即默認一個job下的subtask都可以共享一個slot)
為了防止不合理的共享,用戶也能通過API來強制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filter的slot共享組為group1。怎么確定一個未做SlotSharingGroup設置算子的SlotSharingGroup什么呢(根據上游算子的group 和自身是否設置group共同確定)。適當設置可以減少每個slot運行的線程數,從而整體上減少機器的負載。
2.4 CoLocationGroup(強制)
CoLocationGroup可以保證所有的並行度相同的sub-tasks運行在同一個slot,主要用於迭代流(訓練機器學習模型)。
3. Slot & parallelism的關系
3.1 Slots && parallelism
如上圖所示,有兩個TaskManager,每個TaskManager有3個槽位。假設source操作並行度為3,map操作的並行度為4,sink的並行度為4,所需的task slots數與job中task的最高並行度一致,最高並行度為4,那么使用的Slot也為4。
3.2如何計算Slot
如何計算一個應用需要多少slot?
如果不設置SlotSharingGroup,那么需要的Slot數為應用的最大並行度數。如果設置了SlotSharingGroup,那么需要的Slot數為所有SlotSharingGroup中的最大並行度之和。比如已經強制指定了map的slot共享組為test,那么map和map下游的組為test,map的上游source的組為默認的default,此時default組中最大並行度為10,test組中最大並行度為20,那么需要的Slot=10+20=30。
4.Flink部署模式
4.1 Local 本地部署
Flink 可以運行在 Linux、Mac OS X 和 Windows 上。本地模式的安裝唯一需要的只是 Java 1.7.x或更高版本,本地運行會啟動Single JVM,主要用於測試調試代碼。
4.2 Standalone Cluster集群部署
軟件需求
1.安裝Java1.8或者更高版本
2.集群各個節點需要ssh免密登錄
Flink Standalone 運行流程前面已經講過,這里就不在贅敘。
4.3Flink ON YARN
Flink ON YARN工作流程如下所示:
首先提交job給YARN,就需要有一個Flink YARN Client。
第一步:Client將Flink 應用jar包和配置文件上傳到HDFS。
第二步:Client向REsourceManager注冊resources和請求APPMaster Container
第三步:REsourceManager就會給某一個Worker節點分配一個Container來啟動APPMaster,JobManager會在APPMaster中啟動。
第四步:APPMaster為Flink的TaskManagers分配容器並啟動TaskManager,TaskManager內部會划分很多個Slot,它會自動從HDFS下載jar文件和修改后的配置,然后運行相應的task。TaskManager也會與APPMaster中的JobManager進行交互,維持心跳等。
5.Flink Standalone集群部署
安裝Flink之前需要提前安裝好JDK,這里我們安裝的是JDK1.8版本。
5.1下載
可以到官網:https://archive.apache.org/dist/flink/ 將Flink1.6.2版本下載到本地。
5.2解壓
將下載的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節點
使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解壓flink安裝包
方便后期flink多版本的使用,可以創建flink軟連接
ln -s flink-1.6.2 flink
5.3配置環境變量
vi ~/.bashrc
export FLINK_HOME=/home/hadoop/app/flink
export PATH=$FLINK_HOME/bin:$PATH
使配置文件生效
source ~/.bashrc
查看flink版本
flink -v
5.4修改配置文件
1.修改flink-conf.yaml配置文件
vi flink-conf.yaml
#JobManager地址
jobmanager.rpc.address: cdh01
#槽位配置為3
taskmanager.numberOfTaskSlots: 3
#設置並行度為3
parallelism.default: 3
2.修改masters配置
vi masters
cdh01:8081
3.修改slaves配置
vi slaves
cdh01
cdh02
cdh03
5.5主節點安裝目錄同步到從節點
通過deploy.sh腳本將flink安裝目錄同步到其他節點。
deploy.sh flink-1.6.2 /home/hadoop/app/ slave
在從節點分別創建flink軟連接
ln -s flink-1.6.2 flink
5.6啟動服務
進入flink bin目錄執行啟動集群腳本start-cluster.sh
bin/start-cluster.sh
通過web查看flink集群,查看相關集群信息。
5.7測試運行
查看官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/
1.啟動nc服務
nc -l 9000
2.提交flink作業
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
3.輸入測試數據
5.7測試運行
查看官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/
1.啟動nc服務
nc -l 9000
2.提交flink作業
bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
3.輸入測試數據
4.查看運行結果
在TaskManager界面查看Flink運行結果
(5)Flink開發環境搭建
1. 創建Flink項目及依賴管理
1.1創建Flink項目
官網創建Flink項目有兩種方式:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html
方式一:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.6.2
方式二
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.2
這里我們仍然使用第一種方式創建Flink項目。
打開終端,切換到對應的目錄,通過maven創建flink項目
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.6.2
項目構建過程中需要輸入groupId,artifactId,version和package
Flink項目創建成功
打開IDEA工具,點擊open。
選擇剛剛創建的flink項目
Flink項目已經成功導入IDEA開發工具
通過maven打包測試運行
mvn clean package
刷新target目錄可以看到剛剛打包的flink項目
1.2. Flink依賴
Core Dependencies(核心依賴):
1.核心依賴打包在flink-dist*.jar里
2.包含coordination, networking, checkpoints, failover, APIs, operations (such as windowing), resource management等必須的依賴
注意:核心依賴不會隨着應用打包(<scope>provided</scope>)
3.核心依賴項盡可能小,並避免依賴項沖突
Pom文件中添加核心依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.2</version>
<scope>provided</scope>
</dependency>
注意:不會隨着應用打包。
User Application Dependencies(應用依賴):
connectors, formats, or libraries(CEP, SQL, ML)、
注意:應用依賴會隨着應用打包(scope保持默認值就好)
Pom文件中添加應用依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.6.2</version>
</dependency>
注意:應用依賴按需選擇,會隨着應用打包,可以通過Maven Shade插件進行打包。
1.3. 關於Scala版本
Scala各版本之間是不兼容的(你基於Scala2.12開發Flink應用就不能依賴Scala2.11的依賴包)。
只使用Java的開發人員可以選擇任何Scala版本,Scala開發人員需要選擇與他們的應用程序的Scala版本匹配的Scala版本。
1.4. Hadoop依賴
不要把Hadoop依賴直接添加到Flink application,而是:
export HADOOP_CLASSPATH=`hadoop classpath`
Flink組件啟動時會使用該環境變量的
特殊情況:如果在Flink application中需要用到Hadoop的input-/output format,只需引入Hadoop兼容包即可(Hadoop compatibility wrappers)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.6.2</version>
</dependency>
1.5 Flink項目打包
Flink 可以使用maven-shade-plugin對Flink maven項目進行打包,具體打包命令為mvn clean
package。
2. 自己編譯Flink
2.1安裝maven
1.下載
到maven官網下載安裝包,這里我們可以選擇使用apache-maven-3.3.9-bin.tar.gz。
2.解壓
將apache-maven-3.3.9-bin.tar.gz安裝包上傳至主節點的,然后使用tar命令進行解壓
tar -zxvf apache-maven-3.3.9-bin.tar.gz
3.創建軟連接
ln -s apache-maven-3.3.9 maven
4.配置環境變量
vi ~/.bashrc
export MAVEN_HOME=/home/hadoop/app/maven
export PATH=$MAVEN_HOME/bin:$PATH
5.生效環境變量
source ~/.bashrc
6.查看maven版本
mvn –version
7. settings.xml配置阿里鏡像
添加阿里鏡像
<mirror>
<id>nexus-osc</id>
<mirrorOf>*</mirrorOf>
<name>Nexus osc</name>
<url>http://maven.aliyun.com/nexus/content/repositories/central</url>
</mirror>
2.2安裝jdk
編譯flink要求jdk8或者以上版本,這里已經提前安裝好jdk1.8,具體安裝配置不再贅敘,查看版本如下:
[hadoop@cdh01 conf]$ java -version
java version "1.8.0_51"
Java(TM) SE Runtime Environment (build 1.8.0_51-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)
2.3下載源碼
登錄github:https://github.com/apache/flink,獲取flink下載地址:https://github.com/apache/flink.git
打開Flink主節點終端,進入/home/hadoop/opensource目錄,通過git clone下載flink源碼:
git clone https://github.com/apache/flink.git
錯誤1:如果Linux沒有安裝git,會報如下錯誤:
bash: git: command not found
解決:git安裝步驟如下所示:
1.安裝編譯git時需要的包(注意需要在root用戶下安裝)
yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel
yum install gcc perl-ExtUtils-MakeMaker
2.刪除已有的git
yum remove git
3.下載git源碼
先安裝wget
yum -y install wget
使用wget下載git源碼
wget https://www.kernel.org/pub/software/scm/git/git-2.0.5.tar.gz
解壓git
tar xzf git-2.0.5.tar.gz
編譯安裝git
cd git-2.0.5
make prefix=/usr/local/git all
sudo make prefix=/usr/local/git install
echo "export PATH=$PATH:/usr/local/git/bin" >> ~/.bashrc
source ~/.bashrc
查看git版本
git –version
錯誤2:git clone https://github.com/apache/flink.git
Cloning into 'flink'...
fatal: unable to access 'https://github.com/apache/flink.git/': SSL connect error
解決:
升級 nss 版本:yum update nss
2.4切換對應flink版本
使用如下命令查看flink版本分支
git tag
切換到flink對應版本(這里我們使用flink1.6.2)
git checkout release-1.6.2
2.5編譯flink
進入flink 源碼根目錄:/home/hadoop/opensource/flink,通過maven編譯flink
mvn clean install -DskipTests -Dhadoop.version=2.6.0
報錯:
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 06:58 min
[INFO] Finished at: 2019-01-18T22:11:54-05:00
[INFO] Final Memory: 106M/454M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve dependencies for project org.apache.flink:flink-mapr-fs:jar:1.6.2: Could not find artifact com.mapr.hadoop:maprfs:jar:5.2.1-mapr in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR] mvn <goals> -rf :flink-mapr-fs
報錯缺失flink-mapr-fs,需要手動下載安裝。
解決:
1.下載maprfs jar包
通過手動下載maprfs-5.2.1-mapr.jar包,下載地址地址:https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/hadoop/maprfs/5.2.1-mapr/
2.上傳至主節點
將下載的maprfs-5.2.1-mapr.jar包上傳至主節點的/home/hadoop/downloads目錄下。
3.手動安裝
手動安裝缺少的包到本地倉庫
mvn install:install-file -DgroupId=com.mapr.hadoop -DartifactId=maprfs -Dversion=5.2.1-mapr -Dpackaging=jar -Dfile=/home/hadoop/downloads/maprfs-5.2.1-mapr.jar
4.繼續編譯
使用maven繼續編譯flink(可以排除剛剛已經安裝的包)
mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3 -rf :flink-mapr-fs
報錯:
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 05:51 min
[INFO] Finished at: 2019-01-18T22:39:20-05:00
[INFO] Final Memory: 108M/480M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-mapr-fs: Compilation failure: Compilation failure:
[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44] package org.apache.hadoop.fs does not exist
[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45] cannot find symbol
[ERROR] symbol: class Configuration
[ERROR] location: package org.apache.hadoop.conf
[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/
runtime/fs/maprfs/MapRFileSystem.java:[73,93] cannot find symbol
[ERROR] symbol: class Configuration
缺失org.apache.hadoop.fs包,報錯找不到。
解決:
flink-mapr-fs模塊的pom文件中添加如下依賴:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
繼續往后編譯:
mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3 -rf :flink-mapr-fs
又報錯:
[ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.6.2: Could not find artifact io.confluent:kafka-schema-registry-client:jar:3.3.1 in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]
[ERROR]
報錯缺少kafka-schema-registry-client-3.3.1.jar 包
解決:
手動下載kafka-schema-registry-client-3.3.1.jar包,下載地址如下:
將下載的kafka-schema-registry-client-3.3.1.jar上傳至主節點的目錄下/home/hadoop/downloads
手動安裝缺少的kafka-schema-registry-client-3.3.1.jar包
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=/home/hadoop/downloads/kafka-schema-registry-client-3.3.1.jar
繼續往后編譯
mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3 -rf :flink-mapr-fs
(6)Flink API 通用基本概念
1. 繼續侃Flink編程基本套路
1.1 DataSet and DataStream
DataSet and DataStream表示Flink app中的分布式數據集。它們包含重復的、不可變數據集。DataSet有界數據集,用在Flink批處理。DataStream可以是無界,用在Flink流處理。它們可以從數據源創建,也可以通過各種轉換操作創建。
1.2共同的編程套路
DataSet and DataStream 這里以WordCount為例,共同的編程套路如下所示:
1.獲取執行環境(execution environment)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2.加載/創建初始數據集
// 讀取輸入數據
DataStream<String> text;
if (params.has("input")) {
// 讀取text文件
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
// 讀取默認測試數據集
text = env.fromElements(WordCountData.WORDS);
}
3.對數據集進行各種轉換操作(生成新的數據集)
// 切分每行單詞
text.flatMap(new Tokenizer())
//對每個單詞分組統計詞頻數
.keyBy(0).sum(1);
4.指定將計算的結果放到何處去
// 輸出統計結果
if (params.has("output")) {
//寫入文件地址
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
//數據打印控制台
counts.print();
}
5.觸發APP執行
// 執行flink 程序
env.execute("Streaming WordCount");
1.3惰性計算
Flink APP都是延遲執行的,只有當execute()被顯示調用時才會真正執行,本地執行還是在集群上執行取決於執行環境的類型。好處:用戶可以根據業務構建復雜的應用,Flink可以整體進優化並生成執行計划。
2. 指定鍵(Specifying Keys)
2.1誰需要指定鍵
哪些操作需要指定key呢?常見的操作如join, coGroup, keyBy, groupBy,Reduce, GroupReduce, Aggregate, Windows等。
Flink編程模型的key是虛擬的,不需要你創建鍵值對,可以在具體算子通過參數指定,如下代碼所示:
DataSet<...> input = // [...]
DataSet<...> reduced = input
.groupBy(/*define key here*/)
.reduceGroup(/*do something*/);
2.2為Tuple定義鍵
Tuple定義鍵的方式有很多種,接下來我們一起看幾個示例:
按照指定屬性分組
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:此時表示使用Tuple3三元組的第一個成員作為keyBy
按照組合鍵進行分組
DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
注意:此時表示使用Tuple3三元組的前兩個元素一起作為keyBy
特殊情況:嵌套Tuple
DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]
KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
注意:這里使用KeyBy(0)指定鍵,系統將會使用整個Tuple2作為鍵(整型和浮點型的)。如果想使用Tuple2內部字段作為鍵,你可以使用字段來表示鍵,這種方法會在后面闡述。
2.3使用字段表達式定義鍵
基於字符串的字段表達式可以用來引用嵌套字段(例如Tuple,POJO)
public class WC {
public String word;
public User user;
public int count;
}
public class User{
public int age;
public String zip;
}
示例:通過word字段進行分組
DataStream<WC> words = // [...]
DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
語法:
1.直接使用字段名選擇POJO字段
例如 user 表示 一個POJO的user字段
2.Tuple通過offset來選擇
"_1"和"5"分別代表第一和第六個Scala Tuple字段
“f0” and “f5”分別代表第一和第六個Java Tuple字段
3.選擇POJO和Tuples的嵌套屬性
user.zip
在scala里你可以"_2.user.zip"或"user._4.1.zip”
在java里你可以“2.user.zip”或者" user.f0.1.zip ”
4.使用通配符表達式選擇所有屬性,java為“*”,scala為 "_"。不是POJO或者Tuple的類型也適用。
2.4字段表達式實例-Java
以下定義兩個Java類:
public static class WC {
public ComplexNestedClass complex;
private int count;
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
我們一起看看如下key字段如何理解:
1."count": wc 類的count字段
2."complex":遞歸的選取ComplexNestedClass的所有字段
3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;
4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。
2.5字段表達式實例-Scala
以下定義兩個Scala類:
"_1"和"5"分別代表第一和第六個Scala Tuple字段
“f0” and “f5”分別代表第一和第六個Java Tuple字段
3.選擇POJO和Tuples的嵌套屬性
user.zip
在scala里你可以"_2.user.zip"或"user._4.1.zip”
在java里你可以“2.user.zip”或者" user.f0.1.zip ”
4.使用通配符表達式選擇所有屬性,java為“*”,scala為 "_"。不是POJO或者Tuple的類型也適用。
2.4字段表達式實例-Java
以下定義兩個Java類:
public static class WC {
public ComplexNestedClass complex;
private int count;
public int getCount() {
return count;
}
public void setCount(int c) {
this.count = c;
}
}
public static class ComplexNestedClass {
public Integer someNumber;
public float someFloat;
public Tuple3<Long, Long, String> word;
public IntWritable hadoopCitizen;
}
我們一起看看如下key字段如何理解:
1."count": wc 類的count字段
2."complex":遞歸的選取ComplexNestedClass的所有字段
3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;
4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。
2.5字段表達式實例-Scala
以下定義兩個Scala類: