flink學習筆記-各種Time


說明:本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:

 

Flink大數據項目實戰:http://t.cn/EJtKhaz

 

 

 

從上圖可以看出Flink 中的Time大致分為以下三類:

1.Event TimeEvent 真正產生的時間,我們稱之為Event Time

 

2.Ingestion TimeEvent 事件被Source拿到,進入Flink處理引擎的時間,我們稱之為Ingestion Time

 

3.Window Processing TimeEvent事件被Flink 處理(比如做windows操作)時的時間,我們稱之為Window Processing Time

 

4. Stateful Operations

 

 

 

什么是狀態?

state一般指一個具體的task/operator的狀態,比如當前處理那些數據,數據處理的進度等等。

 

Flink state操作狀態分為兩類:

1.Operator State

Operator State跟一個特定operator的一個並發實例綁定,整個operator只對應一個state

 

2.Keyed State

基於KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每一個key,可能都對應一個state

 

Flink 每個操作狀態又分為兩類:

Keyed StateOperator State可以以兩種形式存在:原始狀態和托管狀態( Flink框架管理的狀態)。

1.原始狀態:比如一個字符串或者數組,它需要序列化,保存到內存或磁盤,或者外部存儲中,這就是它的原始狀態。

2.托管狀態:比如數據放在Hash表中,或者放在HDFS中,或者放在rocksdb中,這種就是托管狀態。當需要處理數據的時候,從托管狀態中讀取出來,還原成原始狀態,甚至變量和集合,然后再進行處理。

5.Checkpoints(備份)

什么是checkpoint

所謂checkpoint,就是在某一時刻,將所有task的狀態做一個快照(snapshot),然后存儲到State Backend(比如hdfs)。checkpoint擁有輕量級容錯機制,可以保證exactly-once 語義,用於內部失敗的恢復(比如當應用掛了,它可以自動恢復從上次的進度接着執行)

 

checkpoint基本原理:通過往source 注入barrier(可以理解為特殊的Event),barrier作為checkpoint的標志,它會自動做checkpoint無需人工干預。

 

6.Savepoint

 

savepoint是流處理過程中的狀態歷史版本,它具有可以replay的功能。用於外部恢復,當Flink應用重啟和升級,它會做一個先做一個savepoint,下次應用啟動可以接着上次進度執行。

 

 

 

savepoint兩種觸發方式:

 

1.Cancel with savepoint

 

2.手動主動觸發

 

savepoint可以理解為是一種特殊的checkpointsavepoint就是指向checkpoint的一個指針,需要手動觸發,而且不會過期,不會被覆蓋,除非手動刪除。正常情況下的線上環境是不需要設置savepoint的。除非對job或集群做出重大改動的時候,需要進行測試運行。

 

4Flink Runtime

 

1. Flink運行時架構

 

1.1Flink架構

 

Flink 運行時架構主要包含幾個部分:ClientJobManager(master節點)TaskManger(slave節點)

 

 

 

 

ClientFlink 作業在哪台機器上面提交,那么當前機器稱之為Client。用戶開發的Program 代碼,它會構建出DataFlow graph,然后通過Client提交給JobManager

 

 

 

JobManager:是主(master)節點,相當於YARN里面的REsourceManager,生成環境中一般可以做HA 高可用。JobManager會將任務進行拆分,調度到TaskManager上面執行。

 

 

 

TaskManager:是從節點(slave),TaskManager才是真正實現task的部分。

 

 

 

 

 

Client提交作業到JobManager,就需要跟JobManager進行通信,它使用Akka框架或者庫進行通信,另外ClientJobManager進行數據交互,使用的是Netty框架。Akka通信基於Actor SystemClient可以向JobManager發送指令,比如Submit job或者Cancel /update jobJobManager也可以反饋信息給Client,比如status updatesStatisticsresults

 

 

 

Client提交給JobManager的是一個Job,然后JobManagerJob拆分成task,提交給TaskManagerworker)。JobManagerTaskManager也是基於Akka進行通信,JobManager發送指令,比如Deploy/Stop/Cancel Tasks或者觸發Checkpoint,反過來TaskManager也會跟JobManager通信返回Task StatusHeartbeat(心跳),Statistics等。另外TaskManager之間的數據通過網絡進行傳輸,比如Data Stream做一些算子的操作,數據往往需要在TaskManager之間做數據傳輸。

 

1.2. TaskManger Slot

 

 

TaskManager是進程,他下面運行的task(整個Flink應用是JobJob可以拆分成很多個task)是線程,每個task/subtask(線程)下可運行一個或者多個operator,即OperatorChainTaskclass,抽象的,subtaskObject(類比學習),具體的。

 

一個TaskManager通過Slot(任務槽)來控制它上面可以接受多少個task,比如一個TaskManager划分了3Task Slot(僅限內存托管,目前CPU未做隔離),它只能接受3taskSlot均分TaskManager所托管的內存,比如一個TaskManager6G內存,那么每個Slot分配2G

 

同一個TaskManager中的task共享TCP連接(通過多路復用)和心跳消息。它們還可以共享數據集和數據結構,從而減少每個任務的開銷。一個TaskManagerN個槽位只能接受NTask嗎?不是,后面會講共享槽位。

1.3. OperatorChain && Task

為了更高效地分布式執行,Flink會盡可能地將operatorsubtask鏈接(chain)在一起形成task。以wordcount為例,解析不同視圖下的數據流,如下圖所示。

數據流(邏輯視圖)

創建Source(並行度設置為1)讀取數據源,數據經過FlatMap(並行度設置為2)做轉換操作,然后數據經過Key Agg(並行度設置為2)做聚合操作,最后數據經過Sink(並行度設置為2)將數據輸出。

 

數據流(並行化視圖)

並行度為1Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給並行度為2Key Agg進行聚合操作,然后並行度為2Sink將數據輸出,未優化前的task總和為7

 

數據流(優化后視圖)

 

並行度為1Source讀取數據源,然后FlatMap並行度為2讀取數據源進行轉化操作,然后數據經過Shuffle交給Key Agg進行聚合操作,此時Key AggSink操作合並為一個task(注意:將KeyAggSink兩個operator進行了合並,因為這兩個合並后並不會改變整體的拓撲結構),它們一起的並行度為2,數據經過Key AggSink之后將數據輸出,優化后的task總和為5.

1.4. OperatorChain的優點和組成條件

OperatorChain的優點

1.減少線程切換

2.減少序列化與反序列化

3.減少數據在緩沖區的交換

4.減少延遲並且提高吞吐能力

 

OperatorChain 組成條件

1.沒有禁用Chain

2.上下游算子並行度一致 。

3.下游算子的入度為1(也就是說下游節點沒有來自其他節點的輸入)

4.上下游算子在同一個slot group(后面緊跟着就會講如何通過slot group先分配到同一個solt,然后才能chain)

5.下游節點的 chain 策略為 ALWAYS(可以與上下游鏈接,mapflatmapfilter等默認是ALWAYS)。

6.上游節點的 chain 策略為 ALWAYS HEAD(只能與下游鏈接,不能與上游鏈接,Source默認是HEAD)。

7.上下游算子之間沒有數據shuffle (數據分區方式是 forward)

 

1.5. 編程改變OperatorChain行為

 

Operator chain的行為可以通過編程API中進行指定,可以通過在DataStreamoperator后面(如someStream.map(..))調用startNewChain()來指示從該operator開始一個新的chain(與前面截斷,不會被chain到前面)。可以調用disableChaining()來指示該operator不參與chaining(不會與前后的operator chain一起)。可以通過調用StreamExecutionEnvironment.disableOperatorChaining()來全局禁用chaining。可以設置Slot group,例如someStream.filter(...).slotSharingGroup(name)。可以通過調整並行度,來調整Operator chain

 

 

2. Slot分配與共享

 

2.1共享Slot

 

 

 

默認情況下,Flink 允許subtasks共享slot,條件是它們都來自同一個Job的不同tasksubtask。結果可能一個slot持有該job的整個pipeline

 

允許slot共享有以下兩點好處:

1.Flink集群需要的任務槽與作業中使用的最高並行度正好相同(前提,保持默認SlotSharingGroup)。也就是說我們不需要再去計算一個程序總共會起多少個task了。

 

2.更容易獲得更充分的資源利用。如果沒有slot共享,那么非密集型操作source/flatmap就會占用同密集型操作 keyAggregation/sink 一樣多的資源。如果有slot共享,將task2個並行度增加到6個,能充分利用slot資源,同時保證每個TaskManager能平均分配到重的subtasks

 

2.2共享Slot實例

 

WordCount 的並行度從之前的2個增加到6個(Source並行度仍為1),並開啟slot共享(所有operator都在default共享組),將得到如上圖所示的slot分布圖。

首先,我們不用去計算這個job會其多少個task,總之該任務最終會占用6slots(最高並行度為6)。其次,我們可以看到密集型操作 keyAggregation/sink 被平均地分配到各個 TaskManager

2.3 SlotSharingGroup(soft)

SlotSharingGroupFlink中用來實現slot共享的類,它盡可能地讓subtasks共享一個slot

保證同一個group的並行度相同的sub-tasks 共享同一個slots。算子的默認groupdefault(即默認一個job下的subtask都可以共享一個slot)

為了防止不合理的共享,用戶也能通過API來強制指定operator的共享組,比如:someStream.filter(...).slotSharingGroup("group1");就強制指定了filterslot共享組為group1。怎么確定一個未做SlotSharingGroup設置算子的SlotSharingGroup什么呢(根據上游算子的group 和自身是否設置group共同確定)。適當設置可以減少每個slot運行的線程數,從而整體上減少機器的負載。

 

 

2.4 CoLocationGroup(強制)

CoLocationGroup可以保證所有的並行度相同的sub-tasks運行在同一個slot,主要用於迭代流(訓練機器學習模型)

 

3. Slot & parallelism的關系

 

3.1 Slots && parallelism

 

 

 

 

 

如上圖所示,有兩個TaskManager,每個TaskManager3個槽位。假設source操作並行度為3map操作的並行度為4sink的並行度為4,所需的task slots數與jobtask的最高並行度一致,最高並行度為4,那么使用的Slot也為4

 

3.2如何計算Slot

如何計算一個應用需要多少slot

 

如果不設置SlotSharingGroup,那么需要的Slot數為應用的最大並行度數。如果設置了SlotSharingGroup,那么需要的Slot數為所有SlotSharingGroup中的最大並行度之和。比如已經強制指定了mapslot共享組為test,那么mapmap下游的組為testmap的上游source的組為默認的default,此時default組中最大並行度為10test組中最大並行度為20,那么需要的Slot=10+20=30

 

4.Flink部署模式

 

4.1 Local 本地部署

 

Flink 可以運行在 LinuxMac OS X Windows 上。本地模式的安裝唯一需要的只是 Java 1.7.x或更高版本,本地運行會啟動Single JVM,主要用於測試調試代碼。

 

4.2 Standalone Cluster集群部署

 

軟件需求

 

1.安裝Java1.8或者更高版本

 

2.集群各個節點需要ssh免密登錄

 

 

 

Flink Standalone 運行流程前面已經講過,這里就不在贅敘。

 

4.3Flink ON YARN

 

 

 

 

 

Flink ON YARN工作流程如下所示:

 

首先提交jobYARN,就需要有一個Flink YARN Client

 

第一步:ClientFlink 應用jar包和配置文件上傳到HDFS

 

第二步:ClientREsourceManager注冊resources和請求APPMaster  Container

 

第三步:REsourceManager就會給某一個Worker節點分配一個Container來啟動APPMasterJobManager會在APPMaster中啟動。

 

第四步:APPMasterFlinkTaskManagers分配容器並啟動TaskManagerTaskManager內部會划分很多個Slot,它會自動從HDFS下載jar文件和修改后的配置,然后運行相應的taskTaskManager也會與APPMaster中的JobManager進行交互,維持心跳等。

 

5.Flink Standalone集群部署

 

安裝Flink之前需要提前安裝好JDK,這里我們安裝的是JDK1.8版本。

 

 

5.1下載

 

可以到官網:https://archive.apache.org/dist/flink/ Flink1.6.2版本下載到本地。

 

 

5.2解壓

將下載的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上傳至主節點

 

使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解壓flink安裝包

方便后期flink多版本的使用,可以創建flink軟連接

ln -s flink-1.6.2 flink

 

5.3配置環境變量

vi ~/.bashrc

export FLINK_HOME=/home/hadoop/app/flink

export PATH=$FLINK_HOME/bin:$PATH

 

 

使配置文件生效

source ~/.bashrc

 

 

查看flink版本

flink -v

5.4修改配置文件

1.修改flink-conf.yaml配置文件

vi flink-conf.yaml

#JobManager地址

jobmanager.rpc.address: cdh01

 

#槽位配置為3

taskmanager.numberOfTaskSlots: 3

 

#設置並行度為3

parallelism.default: 3

 

2.修改masters配置

vi masters

cdh01:8081

 

 

3.修改slaves配置

vi slaves

cdh01

cdh02

cdh03

 

5.5主節點安裝目錄同步到從節點

通過deploy.sh腳本將flink安裝目錄同步到其他節點。

deploy.sh flink-1.6.2 /home/hadoop/app/ slave

 

 

在從節點分別創建flink軟連接

ln -s flink-1.6.2 flink

5.6啟動服務

進入flink bin目錄執行啟動集群腳本start-cluster.sh

bin/start-cluster.sh

 

 

 

 

通過web查看flink集群,查看相關集群信息。

 

http://cdh01:8081

5.7測試運行

查看官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

 

1.啟動nc服務

nc -l 9000

 

 

2.提交flink作業

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

 

 

3.輸入測試數據

5.7測試運行

查看官網案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/

 

1.啟動nc服務

nc -l 9000

 

 

2.提交flink作業

bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

 

 

3.輸入測試數據

 

4.查看運行結果

TaskManager界面查看Flink運行結果

 

 

5Flink開發環境搭建

1. 創建Flink項目及依賴管理

1.1創建Flink項目

官網創建Flink項目有兩種方式:

https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html

方式一:

mvn archetype:generate \

-DarchetypeGroupId=org.apache.flink \

-DarchetypeArtifactId=flink-quickstart-java \

-DarchetypeVersion=1.6.2

 

 

方式二

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.2

 

 

這里我們仍然使用第一種方式創建Flink項目。

 

打開終端,切換到對應的目錄,通過maven創建flink項目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java  -DarchetypeVersion=1.6.2

 

 

項目構建過程中需要輸入groupIdartifactIdversionpackage

Flink項目創建成功

 

打開IDEA工具,點擊open

 

選擇剛剛創建的flink項目

 

Flink項目已經成功導入IDEA開發工具

 

通過maven打包測試運行

 

mvn clean package

 

 

 

 

刷新target目錄可以看到剛剛打包的flink項目

 

1.2. Flink依賴

 

Core Dependencies(核心依賴)

 

1.核心依賴打包在flink-dist*.jar

 

2.包含coordination, networking, checkpoints, failover, APIs, operations (such as windowing), resource management等必須的依賴

 

注意:核心依賴不會隨着應用打包(<scope>provided</scope>)

 

3.核心依賴項盡可能小,並避免依賴項沖突

 

 

 

Pom文件中添加核心依賴

 

<dependency>

 

<groupId>org.apache.flink</groupId>

 

<artifactId>flink-java</artifactId>

 

<version>1.6.2</version>

 

<scope>provided</scope>

 

</dependency>

 

<dependency>

 

<groupId>org.apache.flink</groupId>

 

<artifactId>flink-streaming-java_2.11</artifactId>

 

<version>1.6.2</version>

<scope>provided</scope>

</dependency>

注意:不會隨着應用打包。

 

 

User Application Dependencies(應用依賴)

connectors, formats, or libraries(CEP, SQL, ML)

注意:應用依賴會隨着應用打包(scope保持默認值就好)

Pom文件中添加應用依賴

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-connector-kafka-0.10_2.11</artifactId>

<version>1.6.2</version>

</dependency>

 

注意:應用依賴按需選擇,會隨着應用打包,可以通過Maven Shade插件進行打包。

1.3. 關於Scala版本

Scala各版本之間是不兼容的(你基於Scala2.12開發Flink應用就不能依賴Scala2.11的依賴包)

 

只使用Java的開發人員可以選擇任何Scala版本,Scala開發人員需要選擇與他們的應用程序的Scala版本匹配的Scala版本。

1.4. Hadoop依賴

不要把Hadoop依賴直接添加到Flink application,而是:

export HADOOP_CLASSPATH=`hadoop classpath`

Flink組件啟動時會使用該環境變量的

 

 

特殊情況:如果在Flink application中需要用到Hadoopinput-/output format,只需引入Hadoop兼容包即可(Hadoop compatibility wrappers)

<dependency>

<groupId>org.apache.flink</groupId>

<artifactId>flink-hadoop-compatibility_2.11</artifactId>

<version>1.6.2</version>

</dependency>

1.5 Flink項目打包

Flink 可以使用maven-shade-pluginFlink maven項目進行打包,具體打包命令為mvn clean 

 

 

 

 

package。

 

2. 自己編譯Flink

 

2.1安裝maven

 

1.下載

 

maven官網下載安裝包,這里我們可以選擇使用apache-maven-3.3.9-bin.tar.gz。

 

 

 

2.解壓

 

將apache-maven-3.3.9-bin.tar.gz安裝包上傳至主節點的,然后使用tar命令進行解壓

 

tar -zxvf apache-maven-3.3.9-bin.tar.gz

 

 

 

3.創建軟連接

 

ln -s apache-maven-3.3.9 maven

 

 

 

4.配置環境變量

 

vi ~/.bashrc

 

export MAVEN_HOME=/home/hadoop/app/maven

 

export PATH=$MAVEN_HOME/bin:$PATH

 

 

 

5.生效環境變量

 

source ~/.bashrc

 

 

 

6.查看maven版本

 

mvn –version

 

 

 

7. settings.xml配置阿里鏡像

 

添加阿里鏡像

 

<mirror>

 

<id>nexus-osc</id>

 

<mirrorOf>*</mirrorOf>

 

<name>Nexus osc</name>

 

<url>http://maven.aliyun.com/nexus/content/repositories/central</url>

 

</mirror>

 

2.2安裝jdk

 

編譯flink要求jdk8或者以上版本,這里已經提前安裝好jdk1.8,具體安裝配置不再贅敘,查看版本如下:

 

[hadoop@cdh01 conf]$ java -version

 

java version "1.8.0_51"

Java(TM) SE Runtime Environment (build 1.8.0_51-b16)

Java HotSpot(TM) 64-Bit Server VM (build 25.51-b03, mixed mode)

2.3下載源碼

登錄githubhttps://github.com/apache/flink,獲取flink下載地址:https://github.com/apache/flink.git

 

打開Flink主節點終端,進入/home/hadoop/opensource目錄,通過git clone下載flink源碼:

git clone https://github.com/apache/flink.git

 

 

錯誤1:如果Linux沒有安裝git,會報如下錯誤:

bash: git: command not found

 

解決:git安裝步驟如下所示:

1.安裝編譯git時需要的包(注意需要在root用戶下安裝)

yum install curl-devel expat-devel gettext-devel openssl-devel zlib-devel

 

yum install  gcc perl-ExtUtils-MakeMaker

 

2.刪除已有的git

yum remove git

 

3.下載git源碼

 

先安裝wget

yum -y install wget

 

使用wget下載git源碼

wget https://www.kernel.org/pub/software/scm/git/git-2.0.5.tar.gz

 

解壓git

tar xzf git-2.0.5.tar.gz

 

 

編譯安裝git

cd git-2.0.5

make prefix=/usr/local/git all

sudo make prefix=/usr/local/git install

echo "export PATH=$PATH:/usr/local/git/bin" >> ~/.bashrc

source ~/.bashrc

 

查看git版本

git –version

 

 

錯誤2git clone https://github.com/apache/flink.git

Cloning into 'flink'...

fatal: unable to access 'https://github.com/apache/flink.git/': SSL connect error

 

解決:

升級 nss 版本:yum update nss

2.4切換對應flink版本

使用如下命令查看flink版本分支

git tag

 

切換到flink對應版本(這里我們使用flink1.6.2

git checkout release-1.6.2

 

2.5編譯flink

進入flink 源碼根目錄:/home/hadoop/opensource/flink,通過maven編譯flink

mvn clean install -DskipTests -Dhadoop.version=2.6.0

 

 

報錯:

[INFO] BUILD FAILURE

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 06:58 min

[INFO] Finished at: 2019-01-18T22:11:54-05:00

[INFO] Final Memory: 106M/454M

[INFO] ------------------------------------------------------------------------

[ERROR] Failed to execute goal on project flink-mapr-fs: Could not resolve dependencies for project org.apache.flink:flink-mapr-fs:jar:1.6.2: Could not find artifact com.mapr.hadoop:maprfs:jar:5.2.1-mapr in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]

[ERROR]

[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.

[ERROR] Re-run Maven using the -X switch to enable full debug logging.

[ERROR]

[ERROR] For more information about the errors and possible solutions, please read the following articles:

 

[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException

[ERROR]

[ERROR] After correcting the problems, you can resume the build with the command

[ERROR]   mvn <goals> -rf :flink-mapr-fs

報錯缺失flink-mapr-fs,需要手動下載安裝。

 

解決:

1.下載maprfs jar

通過手動下載maprfs-5.2.1-mapr.jar包,下載地址地址:https://repository.mapr.com/nexus/content/groups/mapr-public/com/mapr/hadoop/maprfs/5.2.1-mapr/

 

2.上傳至主節點

將下載的maprfs-5.2.1-mapr.jar包上傳至主節點的/home/hadoop/downloads目錄下。

 

3.手動安裝

手動安裝缺少的包到本地倉庫

mvn install:install-file -DgroupId=com.mapr.hadoop -DartifactId=maprfs -Dversion=5.2.1-mapr -Dpackaging=jar  -Dfile=/home/hadoop/downloads/maprfs-5.2.1-mapr.jar

 

4.繼續編譯

使用maven繼續編譯flink(可以排除剛剛已經安裝的包)

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

 

報錯:

[INFO] BUILD FAILURE

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 05:51 min

[INFO] Finished at: 2019-01-18T22:39:20-05:00

[INFO] Final Memory: 108M/480M

[INFO] ------------------------------------------------------------------------

[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project flink-mapr-fs: Compilation failure: Compilation failure:

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[70,44] package org.apache.hadoop.fs does not exist

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java:[73,45] cannot find symbol

[ERROR] symbol:   class Configuration

[ERROR] location: package org.apache.hadoop.conf

[ERROR] /home/hadoop/opensource/flink/flink-filesystems/flink-mapr-fs/src/main/java/org/apache/flink/

runtime/fs/maprfs/MapRFileSystem.java:[73,93] cannot find symbol

[ERROR] symbol:   class Configuration

缺失org.apache.hadoop.fs包,報錯找不到。

 

解決:

flink-mapr-fs模塊的pom文件中添加如下依賴:

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-common</artifactId>

<version>${hadoop.version}</version>

</dependency>

 

繼續往后編譯:

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

 

 

又報錯:

[ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.6.2: Could not find artifact io.confluent:kafka-schema-registry-client:jar:3.3.1 in nexus-osc (http://maven.aliyun.com/nexus/content/repositories/central) -> [Help 1]

[ERROR]

 

報錯缺少kafka-schema-registry-client-3.3.1.jar 

 

解決:

手動下載kafka-schema-registry-client-3.3.1.jar包,下載地址如下:

http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar

 

將下載的kafka-schema-registry-client-3.3.1.jar上傳至主節點的目錄下/home/hadoop/downloads

 

手動安裝缺少的kafka-schema-registry-client-3.3.1.jar包

mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar  -Dfile=/home/hadoop/downloads/kafka-schema-registry-client-3.3.1.jar

 

繼續往后編譯

mvn clean install -Dmaven.test.skip=true -Dhadoop.version=2.7.3  -rf :flink-mapr-fs

 

 

6Flink API 通用基本概念

1. 繼續侃Flink編程基本套路

1.1 DataSet and DataStream

DataSet and DataStream表示Flink app中的分布式數據集。它們包含重復的、不可變數據集。DataSet有界數據集,用在Flink批處理。DataStream可以是無界,用在Flink流處理。它們可以從數據源創建,也可以通過各種轉換操作創建。

1.2共同的編程套路

DataSet and DataStream 這里以WordCount為例,共同的編程套路如下所示:

 

1.獲取執行環境(execution environment)

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

 

2.加載/創建初始數據集

// 讀取輸入數據

DataStream<String> text;

if (params.has("input")) {

// 讀取text文件

text = env.readTextFile(params.get("input"));

} else {

System.out.println("Executing WordCount example with default input data set.");

System.out.println("Use --input to specify file input.");

// 讀取默認測試數據集

text = env.fromElements(WordCountData.WORDS);

}

3.對數據集進行各種轉換操作(生成新的數據集)

 

// 切分每行單詞

 

text.flatMap(new Tokenizer())

 

//對每個單詞分組統計詞頻數

 

.keyBy(0).sum(1);

 

4.指定將計算的結果放到何處去

 

// 輸出統計結果

 

if (params.has("output")) {

 

//寫入文件地址

 

counts.writeAsText(params.get("output"));

 

} else {

 

System.out.println("Printing result to stdout. Use --output to specify output path.");

 

//數據打印控制台

 

counts.print();

 

}

 

5.觸發APP執行

 

// 執行flink 程序

 

env.execute("Streaming WordCount");

 

1.3惰性計算

 

Flink APP都是延遲執行的,只有當execute()被顯示調用時才會真正執行,本地執行還是在集群上執行取決於執行環境的類型。好處:用戶可以根據業務構建復雜的應用,Flink可以整體進優化並生成執行計划。

 

2. 指定鍵(Specifying Keys

 

2.1誰需要指定鍵

 

哪些操作需要指定key呢?常見的操作如join, coGroup, keyBy, groupByReduce, GroupReduce, Aggregate, Windows等。

 

 

 

Flink編程模型的key是虛擬的,不需要你創建鍵值對,可以在具體算子通過參數指定,如下代碼所示:

 

DataSet<...> input = // [...]

 

DataSet<...> reduced = input

 

.groupBy(/*define key here*/)

 

.reduceGroup(/*do something*/);

2.2Tuple定義鍵

Tuple定義鍵的方式有很多種,接下來我們一起看幾個示例:

 

按照指定屬性分組

DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

注意:此時表示使用Tuple3三元組的第一個成員作為keyBy

 

按照組合鍵進行分組

DataStream<Tuple3<Integer,String,Long>> input = // [...] KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)

注意:此時表示使用Tuple3三元組的前兩個元素一起作為keyBy

 

特殊情況:嵌套Tuple

DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> input = // [...]

KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)

注意:這里使用KeyBy(0)指定鍵,系統將會使用整個Tuple2作為鍵(整型和浮點型的)。如果想使用Tuple2內部字段作為鍵,你可以使用字段來表示鍵,這種方法會在后面闡述。

2.3使用字段表達式定義鍵

基於字符串的字段表達式可以用來引用嵌套字段(例如Tuple,POJO)

public class WC {

    public String word;

public User user;

    public int count;

}

public class User{

public int age;

public String zip;

}

 

示例:通過word字段進行分組

DataStream<WC> words = // [...]

DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);

 

 

語法:

1.直接使用字段名選擇POJO字段

 例如 user 表示 一個POJOuser字段

 

2.Tuple通過offset來選擇

 

"_1""5"分別代表第一和第六個Scala Tuple字段

 

f0 and f5”分別代表第一和第六個Java Tuple字段

 

 

 

3.選擇POJOTuples的嵌套屬性

 

user.zip

 

scala里你可以"_2.user.zip""user._4.1.zip

 

java里你可以“2.user.zip”或者" user.f0.1.zip

 

 

 

4.使用通配符表達式選擇所有屬性,java為“*”,scala"_"。不是POJO或者Tuple的類型也適用。

 

2.4字段表達式實例-Java

 

以下定義兩個Java類:

 

public static class WC {

 

     public ComplexNestedClass complex;

 

     private int count;

 

     public int getCount() {

 

           return count;

 

      }

 

      public void setCount(int c) {

 

           this.count = c;

 

      }

 

}

 

public static class ComplexNestedClass {

 

      public Integer someNumber;

 

      public float someFloat;

 

      public Tuple3<Long, Long, String> word;

 

      public IntWritable hadoopCitizen;

 

}

 

 

 

我們一起看看如下key字段如何理解:

 

1."count": wc 類的count字段

 

 

 

2."complex":遞歸的選取ComplexNestedClass的所有字段

 

 

 

3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;

 

 

 

4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。

 

2.5字段表達式實例-Scala

 

以下定義兩個Scala類:

 

"_1""5"分別代表第一和第六個Scala Tuple字段

 

f0 and f5”分別代表第一和第六個Java Tuple字段

 

 

 

3.選擇POJOTuples的嵌套屬性

 

user.zip

 

scala里你可以"_2.user.zip""user._4.1.zip

 

java里你可以“2.user.zip”或者" user.f0.1.zip

 

 

 

4.使用通配符表達式選擇所有屬性,java為“*”,scala"_"。不是POJO或者Tuple的類型也適用。

 

2.4字段表達式實例-Java

 

以下定義兩個Java類:

 

public static class WC {

 

     public ComplexNestedClass complex;

 

     private int count;

 

     public int getCount() {

 

           return count;

 

      }

 

      public void setCount(int c) {

 

           this.count = c;

 

      }

 

}

 

public static class ComplexNestedClass {

 

      public Integer someNumber;

 

      public float someFloat;

 

      public Tuple3<Long, Long, String> word;

 

      public IntWritable hadoopCitizen;

 

}

 

 

 

我們一起看看如下key字段如何理解:

 

1."count": wc 類的count字段

 

 

 

2."complex":遞歸的選取ComplexNestedClass的所有字段

 

 

 

3."complex.word.f2": ComplexNestedClass類中的tuple word的第三個字段;

 

 

 

4."complex.hadoopCitizen":選擇Hadoop IntWritable類型。

 

2.5字段表達式實例-Scala

 

以下定義兩個Scala類:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM