大數據課程之Flink
第一章 Flink簡介
1、初識Flink
Apache Flink是一個框架和分布式處理引擎,用於對無界和有界數據流進行有狀態計算。Flink被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。
Flink起源於Stratosphere項目,Stratosphere是在2010~2014年由3所地處柏林的大學和歐洲的一些其他的大學共同進行的研究項目,2014年4月Stratosphere的代碼被復制並捐贈給了Apache軟件基金會,參加這個孵化項目的初始成員是Stratosphere系統的核心開發人員,2014年12月,Flink一躍成為Apache軟件基金會的頂級項目。
在德語中,Flink一詞表示快速和靈巧,項目采用一只松鼠的彩色圖案作為logo,這不僅是因為松鼠具有快速和靈巧的特點,還因為柏林的松鼠有一種迷人的紅棕色,而Flink的松鼠logo擁有可愛的尾巴,尾巴的顏色與Apache軟件基金會的logo顏色相呼應,也就是說,這是一只Apache風格的松鼠。
圖 Flink Logo
Flink雖然誕生的早(2010年),但是其實是起大早趕晚集,直到2015年才開始突然爆發熱度。
在Flink被apache提升為頂級項目之后,阿里實時計算團隊決定在阿里內部建立一個 Flink 分支 Blink,並對 Flink 進行大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。
Blink由2016年上線,服務於阿里集團內部搜索、推薦、廣告和螞蟻等大量核心實時業務。與2019年1月Blink正式開源,目前阿里70%的技術部門都有使用該版本。
Blink比起Flink的優勢就是對SQL語法的更完善的支持以及執行SQL的性能提升。
2 Flink的重要特點
2.1 事件驅動型(Event-driven)
事件驅動型應用是一類具有狀態的應用,它從一個或多個事件流提取數據,並根據到來的事件觸發計算、狀態更新或其他外部動作。比較典型的就是以kafka為代表的消息隊列幾乎都是事件驅動型應用。
與之不同的就是SparkStreaming微批次,如圖:
事件驅動型:
2.2 流與批的世界觀
批處理的特點是有界、持久、大量,非常適合需要訪問全套記錄才能完成的計算工作,一般用於離線統計。
流處理的特點是無界、實時, 無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作,一般用於實時統計。
在spark的世界觀中,一切都是由批次組成的,離線數據是一個大批次,而實時數據是由一個一個無限的小批次組成的。
而在flink的世界觀中,一切都是由流組成的,離線數據是有界限的流,實時數據是一個沒有界限的流,這就是所謂的有界流和無界流。
無界數據流:無界數據流有一個開始但是沒有結束,它們不會在生成時終止並提供數據,必須連續處理無界流,也就是說必須在獲取后立即處理event。對於無界數據流我們無法等待所有數據都到達,因為輸入是無界的,並且在任何時間點都不會完成。處理無界數據通常要求以特定順序(例如事件發生的順序)獲取event,以便能夠推斷結果完整性。
有界數據流:有界數據流有明確定義的開始和結束,可以在執行任何計算之前通過獲取所有數據來處理有界流,處理有界流不需要有序獲取,因為可以始終對有界數據集進行排序,有界流的處理也稱為批處理。
這種以流為世界觀的架構,獲得的最大好處就是具有極低的延遲。
2.3 分層api
最底層級的抽象僅僅提供了有狀態流,它將通過過程函數(Process Function)被嵌入到DataStream API中。底層過程函數(Process Function) 與 DataStream API 相集成,使其可以對某些特定的操作進行底層的抽象,它允許用戶可以自由地處理來自一個或多個數據流的事件,並使用一致的容錯的狀態。除此之外,用戶可以注冊事件時間並處理時間回調,從而使程序可以處理復雜的計算。
實際上,大多數應用並不需要上述的底層抽象,而是針對核心API(Core APIs) 進行編程,比如DataStream API(有界或無界流數據)以及DataSet API(有界數據集)。這些API為數據處理提供了通用的構建模塊,比如由用戶定義的多種形式的轉換(transformations),連接(joins),聚合(aggregations),窗口操作(windows)等等。DataSet API 為有界數據集提供了額外的支持,例如循環與迭代。這些API處理的數據類型以類(classes)的形式由各自的編程語言所表示。
Table API 是以表為中心的聲明式編程,其中表可能會動態變化(在表達流數據時)。Table API遵循(擴展的)關系模型:表有二維數據結構(schema)(類似於關系數據庫中的表),同時API提供可比較的操作,例如select、project、join、group-by、aggregate等。Table API程序聲明式地定義了什么邏輯操作應該執行,而不是准確地確定這些操作代碼的看上去如何 。 盡管Table API可以通過多種類型的用戶自定義函數(UDF)進行擴展,其仍不如核心API更具表達能力,但是使用起來卻更加簡潔(代碼量更少)。除此之外,Table API程序在執行之前會經過內置優化器進行優化。
你可以在表與 DataStream/DataSet 之間無縫切換,以允許程序將 Table API 與 DataStream 以及 DataSet 混合使用。
Flink提供的最高層級的抽象是 SQL 。這一層抽象在語法與表達能力上與 Table API 類似,但是是以SQL查詢表達式的形式表現程序。SQL抽象與Table API交互密切,同時SQL查詢可以直接在Table API定義的表上執行。
2.4 支持有狀態計算
Flink在1.4版本中實現了狀態管理,所謂狀態管理就是在流失計算過程中將算子的中間結果保存在內存或者文件系統中,等下一個事件進入算子后可以讓當前事件的值與歷史值進行匯總累計。
2.5 支持exactly-once語義
在分布式系統中,組成系統的各個計算機是獨立的。這些計算機有可能fail。
一個sender發送一條message到receiver。根據receiver出現fail時sender如何處理fail,可以將message delivery分為三種語義:
At Most once: 對於一條message,receiver最多收到一次(0次或1次).
可以達成At Most Once的策略:
sender把message發送給receiver.無論receiver是否收到message,sender都不再重發message.
At Least once: 對於一條message,receiver最少收到一次(1次及以上).
可以達成At Least Once的策略:
sender把message發送給receiver.當receiver在規定時間內沒有回復ACK或回復了error信息,那么sender重發這條message給receiver,直到sender收到receiver的ACK.
Exactly once: 對於一條message,receiver確保只收到一次
2.6 支持事件時間(EventTime)
目前大多數框架時間窗口計算,都是采用當前系統時間,以時間為單位進行的聚合計算只能反應數據到達計算引擎的時間,而並不是實際業務時間
第二章 快速上手
1 搭建maven工程 flink-2019
1.1、pom文件
<?xml version="1.0" encoding="UTF-8"?>
<build>
|
1.2 添加scala框架 和 scala文件夾
2 批處理wordcount
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} object WordCountBeach { def main(args: Array[String]): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val input = "F:\\input\\words.txt" val ds: DataSet[String] = env.readTextFile(input) import org.apache.flink.api.scala._ val aggDs = ds.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1) aggDs.print() } }
def main(args: Array[String]): Unit = {
|
注意:Flink程序支持java 和 scala兩種語言,本課程中以scala語言為主。
在引入包中,有java和scala兩種包時注意要使用scala的包
3 流處理 wordcount
import org.apache.flink.api.java.utils.ParameterTool
|
測試
在linux系統中用
nc -lk 7777 |
進行發送測試
第三章 Flink部署
1 standalone模式
1.1 安裝
解壓縮 flink-1.7.0-bin-hadoop27-scala_2.11.tgz
修改 flink/conf/flink-conf.yaml 文件
注意:yaml文件必須是冒號+空格
修改 /conf/slave文件
.分發給 另外兩台機子
啟動
1.2 提交任務
1) 准備數據文件
2) 把含數據文件的文件夾,分發到taskmanage 機器中
由於讀取數據是從本地磁盤讀取,實際任務會被分發到taskmanage的機器中,所以要把目標文件分發。
3) 執行程序
./flink run -c com.atguigu.flink.app.BatchWcApp /ext/flink0503-1.0-SNAPSHOT.jar --input /applog/flink/input.txt --output /applog/flink/output.csv |
4) 到目標文件夾中查看計算結果
注意:計算結果根據會保存到taskmanage的機器下,不會再jobmanage下。
5) 在webui控制台查看計算過程
2 yarn模式
1) 啟動hadoop集群
2) 啟動yarn-session
./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d |
其中:
-n(--container):TaskManager的數量。
-s(--slots): 每個TaskManager的slot數量,默認一個slot一個core,默認每個taskmanager的slot的個數為1,有時可以多一些taskmanager,做冗余。
-jm:JobManager的內存(單位MB)。
-tm:每個taskmanager的內存(單位MB)。
-nm:yarn 的appName(現在yarn的ui上的名字)。
-d:后台執行。
3) 執行任務
./flink run -m yarn-cluster -c com.atguigu.flink.app.BatchWcApp /ext/flink0503-1.0-SNAPSHOT.jar --input /applog/flink/input.txt --output /applog/flink/output5.csv |
4) 去yarn控制台查看任務狀態
第四章 Flink運行架構
1 任務提交流程(yarn模式)
圖 Yarn模式任務提交流程
Flink任務提交后,Client向HDFS上傳Flink的Jar包和配置,之后向Yarn ResourceManager提交任務,ResourceManager分配Container資源並通知對應的NodeManager啟動ApplicationMaster,ApplicationMaster啟動后加載Flink的Jar包和配置構建環境,然后啟動JobManager,之后ApplicationMaster向ResourceManager申請資源啟動TaskManager,ResourceManager分配Container資源后,由ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager,NodeManager加載Flink的Jar包和配置構建環境並啟動TaskManager,TaskManager啟動后向JobManager發送心跳包,並等待JobManager向其分配任務。
2 任務調度原理
圖 任務調度原理
客戶端不是運行時和程序執行的一部分,但它用於准備並發送dataflow(JobGraph)給Master(JobManager),然后,客戶端斷開連接或者維持連接以等待接收計算結果。
當 Flink 集群啟動后,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager,JobManager 再調度任務到各個 TaskManager 去執行,然后 TaskManager 將心跳和統計信息匯報給 JobManager。TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。
Client 為提交 Job 的客戶端,可以是運行在任何機器上(與 JobManager 環境連通即可)。提交 Job 后,Client 可以結束進程(Streaming的任務),也可以不結束並等待結果返回。
JobManager 主要負責調度 Job 並協調 Task 做 checkpoint,職責上很像 Storm 的 Nimbus。從 Client 處接收到 Job 和 JAR 包等資源后,會生成優化后的執行計划,並以 Task 的單元調度到各個 TaskManager 去執行。
TaskManager 在啟動的時候就設置好了槽位數(Slot),每個 slot 能啟動一個 Task,Task 為線程。從 JobManager 處接收需要部署的 Task,部署啟動后,與自己的上游建立 Netty 連接,接收數據並處理。
關於執行圖
Flink 中的執行圖可以分成四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。
StreamGraph:是根據用戶通過 Stream API 編寫的代碼生成的最初的圖。用來表示程序的拓撲結構。
JobGraph:StreamGraph經過優化后生成了 JobGraph,提交給 JobManager 的數據結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。
ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是調度層最核心的數據結構。
物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行調度后,在各個TaskManager 上部署 Task 后形成的“圖”,並不是一個具體的數據結構。
3 Worker與Slots
每一個worker(TaskManager)是一個JVM進程,它可能會在獨立的線程上執行一個或多個subtask。為了控制一個worker能接收多少個task,worker通過task slot來進行控制(一個worker至少有一個task slot)。·
每個task slot表示TaskManager擁有資源的一個固定大小的子集。假如一個TaskManager有三個slot,那么它會將其管理的內存分成三份給各個slot。資源slot化意味着一個subtask將不需要跟來自其他job的subtask競爭被管理的內存,取而代之的是它將擁有一定數量的內存儲備。需要注意的是,這里不會涉及到CPU的隔離,slot目前僅僅用來隔離task的受管理的內存。
通過調整task slot的數量,允許用戶定義subtask之間如何互相隔離。如果一個TaskManager一個slot,那將意味着每個task group運行在獨立的JVM中(該JVM可能是通過一個特定的容器啟動的),而一個TaskManager多個slot意味着更多的subtask可以共享同一個JVM。而在同一個JVM進程中的task將共享TCP連接(基於多路復用)和心跳消息。它們也可能共享數據集和數據結構,因此這減少了每個task的負載。
圖 TaskManager與Slot
Task Slot是靜態的概念,是指TaskManager具有的並發執行能力,可以通過參數taskmanager.numberOfTaskSlots進行配置,而並行度parallelism是動態概念,即TaskManager運行程序時實際使用的並發能力,可以通過參數parallelism.default進行配置。
也就是說,假設一共有3個TaskManager,每一個TaskManager中的分配3個TaskSlot,也就是每個TaskManager可以接收3個task,一共9個TaskSlot,如果我們設置parallelism.default=1,即運行程序默認的並行度為1,9個TaskSlot只用了1個,有8個空閑,因此,設置合適的並行度才能提高效率。
4 並行數據流
Flink程序的執行具有並行、分布式的特性。在執行過程中,一個 stream 包含一個或多個 stream partition ,而每一個 operator 包含一個或多個 operator subtask,這些operator subtasks在不同的線程、不同的物理機或不同的容器中彼此互不依賴得執行。
一個特定operator的subtask的個數被稱之為其parallelism(並行度)。一個stream的並行度總是等同於其producing operator的並行度。一個程序中,不同的operator可能具有不同的並行度。
圖 並行數據流
Stream在operator之間傳輸數據的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決於operator的種類。
One-to-one:stream(比如在source和map operator之間)維護着分區以及元素的順序。那意味着map operator的subtask看到的元素的個數以及順序跟source operator的subtask生產的元素的個數、順序相同,map、fliter、flatMap等算子都是one-to-one的對應關系。
Ø 類似於spark中的窄依賴
Redistributing:stream(map()跟keyBy/window之間或者keyBy/window跟sink之間)的分區會發生改變。每一個operator subtask依據所選擇的transformation發送數據到不同的目標subtask。例如,keyBy() 基於hashCode重分區、broadcast和rebalance會隨機重新分區,這些算子都會引起redistribute過程,而redistribute過程就類似於Spark中的shuffle過程。
Ø 類似於spark中的寬依賴
5 task與operator chains
相同並行度的one to one操作,Flink這樣相連的operator 鏈接在一起形成一個task,原來的operator成為里面的subtask。將operators鏈接成task是非常有效的優化:它能減少線程之間的切換和基於緩存區的數據交換,在減少時延的同時提升吞吐量。鏈接的行為可以在編程API中進行指定。
圖 task與operator chains
OperatorChain的優點
Ø 減少線程切換
Ø 減少序列化與反序列化
Ø 減少延遲並且提高吞吐能力
• OperatorChain 組成條件(重要)
Ø 上下游算子並行度一致
Ø 上下游算子之間沒有數據shuffle
第五章 Flink 流處理Api
1 Environment
getExecutionEnvironment
創建一個執行環境,表示當前執行程序的上下文。 如果程序是獨立調用的,則此方法返回本地執行環境;如果從命令行客戶端調用程序以提交到集群,則此方法返回此集群的執行環境,也就是說,getExecutionEnvironment會根據查詢運行的方式決定返回什么樣的運行環境,是最常用的一種創建執行環境的方式。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
|
如果沒有設置並行度,會以flink-conf.yaml中的配置為准,默認是1
createLocalEnvironment
返回本地執行環境,需要在調用時指定默認的並行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1) |
createRemoteEnvironment
返回集群執行環境,將Jar提交到遠程服務器。需要在調用時指定JobManager的IP和端口號,並指定要在集群中運行的Jar包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar") |
2 Source
創建kafka工具類
object MyKafkaUtil {
|
增加業務主類 StartupApp
object StartupApp { def main(args: Array[String]): Unit = { } |
Flink+kafka是如何實現exactly-once語義的
Flink通過checkpoint來保存數據是否處理完成的狀態
由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。
執行過程實際上是一個兩段式提交,每個算子執行完成,會進行“預提交”,直到執行完sink操作,會發起“確認提交”,如果執行失敗,預提交會放棄掉。
如果宕機需要通過StateBackend進行恢復,只能恢復所有確認提交的操作。
3 Transform
轉換算子
3.1 map
val streamMap = stream.map { x => x * 2 } |
3.2 flatMap
val streamFlatMap = stream.flatMap{ x => x.split(" ") } |
3.3 Filter
val streamFilter = stream.filter{ x => x == 1 } |
3.4 KeyBy
DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個流拆分成不相交的分區,每個分區包含具有相同key的元素,在內部以hash的形式實現的。
3.5 Reduce
KeyedStream → DataStream:一個分組數據流的聚合操作,合並當前的元素和上次聚合的結果,產生一個新的值,返回的流中包含每一次聚合的結果,而不是只返回最后一次聚合的最終結果。
//求各個渠道的累計個數
|
flink是如何保存累計值的,
flink是一種有狀態的流計算框架,其中說的狀態包括兩個層面:
1) operator state 主要是保存數據在流程中的處理狀態,用於確保語義的exactly-once。
2) keyed state 主要是保存數據在計算過程中的累計值。
這兩種狀態都是通過checkpoint機制保存在StateBackend中,StateBackend可以選擇保存在內存中(默認使用)或者保存在磁盤文件中。
3.6 Split 和 Select
Split
圖 Split
DataStream → SplitStream:根據某些特征把一個DataStream拆分成兩個或者多個DataStream。
Select
圖 Select
SplitStream→DataStream:從一個SplitStream中獲取一個或者多個DataStream。
需求:把appstore和其他的渠道的數據單獨拆分出來,做成兩個流
// 將appstore與其他渠道拆分拆分出來 成為兩個獨立的流
|
3.7 Connect和 CoMap
圖 Connect算子
DataStream,DataStream → ConnectedStreams:連接兩個保持他們類型的數據流,兩個數據流被Connect之后,只是被放在了一個同一個流中,內部依然保持各自的數據和形式不發生任何變化,兩個流相互獨立。
CoMap,CoFlatMap
圖 CoMap/CoFlatMap
ConnectedStreams → DataStream:作用於ConnectedStreams上,功能與map和flatMap一樣,對ConnectedStreams中的每一個Stream分別進行map和flatMap處理。
//合並以后打印
|
3.8 Union
圖 Union
DataStream → DataStream:對兩個或者兩個以上的DataStream進行union操作,產生一個包含所有DataStream元素的新DataStream。注意:如果你將一個DataStream跟它自己做union操作,在新的DataStream中,你將看到每一個元素都出現兩次。
//合並以后打印
|
Connect與 Union 區別:
1 、 Union之前兩個流的類型必須是一樣,Connect可以不一樣,在之后的coMap中再去調整成為一樣的。
2 Connect只能操作兩個流,Union可以操作多個
4 Sink
Flink沒有類似於spark中foreach方法,讓用戶進行迭代的操作。雖有對外的輸出操作都要利用Sink完成。最后通過類似如下方式完成整個任務最終輸出操作。
myDstream.addSink(new MySink(xxxx)) |
官方提供了一部分的框架的sink。除此以外,需要用戶自定義實現sink。
4.1 Kafka
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
|
mykafkaUtil中增加方法
def getProducer(topic:String): FlinkKafkaProducer011[String] ={
|
主函數中添加sink
val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")
sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)
|
4.2 Redis
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
|
object MyRedisUtil {
|
在主函數中調用
sumDstream.map( chCount=>(chCount._1,chCount._2+"" )).addSink(MyRedisUtil.getRedisSink())
|
4.3 Elasticsearch
pom.xml
<dependency>
|
添加MyEsUtil
import java.util
|
在main方法中調用
// 明細發送到es 中
|
4.4 JDBC 自定義sink
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
|
添加MyJdbcSink
class MyJdbcSink(sql:String ) extends RichSinkFunction[Array[Any]] { //反復調用
|
在main方法中增加
把明細保存到mysql中
val startUplogDstream: DataStream[StartUpLog] = dstream.map{ JSON.parseObject(_,classOf[StartUpLog])}
|
第六章 Time與Window
1 Time
在Flink的流式處理中,會涉及到時間的不同概念,如下圖所示:
圖 Flink時間概念
Event Time:是事件創建的時間。它通常由事件中的時間戳描述,例如采集的日志數據中,每一條日志都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。
Ingestion Time:是數據進入Flink的時間。
Processing Time:是每一個執行基於時間操作的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。
例如,一條日志進入Flink的時間為2017-11-12 10:00:00.123,到達Window的系統時間為2017-11-12 10:00:01.234,日志的內容如下:
2017-11-02 18:37:15.624 INFO Fail over to rm2
對於業務來說,要統計1min內的故障日志個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日志的生成時間進行統計。
2 Window
2.1 Window概述
streaming流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。
Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的“buckets”桶,我們可以在這些桶上做計算操作。
2.2 Window類型
Window可以分成兩類:
l CountWindow:按照指定的數據條數生成一個Window,與時間無關。
l TimeWindow:按照時間生成Window。
對於TimeWindow,可以根據窗口實現原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。
1. 滾動窗口(Tumbling Windows)
將數據依據固定的窗口長度對數據進行切片。
特點:時間對齊,窗口長度固定,沒有重疊。
滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,並且不會出現重疊。例如:如果你指定了一個5分鍾大小的滾動窗口,窗口的創建如下圖所示:
圖 滾動窗口
適用場景:適合做BI統計等(做每個時間段的聚合計算)。
2. 滑動窗口(Sliding Windows)
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。
特點:時間對齊,窗口長度固定,有重疊。
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小於窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。
例如,你有10分鍾的窗口和5分鍾的滑動,那么每個窗口中5分鍾的窗口里包含着上個10分鍾產生的數據,如下圖所示:
圖 滑動窗口
適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。
3. 會話窗口(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。
特點:時間無對齊。
session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當這個非活躍周期產生,那么當前的session將關閉並且后續的元素將被分配到新的session窗口中去。
圖 會話窗口
3 Window API
TimeWindow
TimeWindow是將指定時間范圍內的所有數據組成一個window,一次對一個window里面的所有數據進行計算。
1. 滾動窗口
Flink默認的時間窗口根據Processing Time 進行窗口的划分,將Flink獲取到的數據根據進入Flink的時間划分到不同的窗口中。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
2. 滑動窗口(SlidingEventTimeWindows)
滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設置為了2s,也就是說,窗口每2s就計算一次,每一次計算的window范圍是5s內的所有元素。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
CountWindow
CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數。
1 滾動窗口
默認的CountWindow是一個滾動窗口,只需要指定窗口大小即可,當元素數量達到窗口大小時,就會觸發窗口的執行。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
2 滑動窗口
滑動窗口和滾動窗口的函數名是完全一致的,只是在傳參數時需要傳入兩個參數,一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設置為了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window范圍是5個元素。
val keyedStream: KeyedStream[(String, Int), Tuple] = startUplogDstream.map(startuplog=>(startuplog.ch,1)).keyBy(0)
|
第七章 EventTime與Window
1 EventTime的引入
在Flink的流式處理中,絕大部分的業務都會使用eventTime,一般只在eventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。
如果要使用EventTime,那么需要引入EventTime的時間屬性,引入方式如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給env創建的每一個stream追加時間特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2 Watermark
2.1 基本概念
我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由於網絡、分布式等原因,導致亂序的產生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event Time順序排列的。
圖 數據的亂序
那么此時出現一個問題,一旦出現亂序,如果只根據eventTime決定window的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了,這個特別的機制,就是Watermark。
Watermark是一種衡量Event Time進展的機制,它是數據本身的一個隱藏屬性,數據本身攜帶着對應的Watermark。
Watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現。
數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,因此,window的執行也是由Watermark觸發的。
Watermark可以理解成一個延遲觸發機制,我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime小於maxEventTime - t的所有數據都已經到達,如果有窗口的停止時間等於maxEventTime – t,那么這個窗口被觸發執行。
有序流的Watermarker如下圖所示:(Watermark設置為0)
圖 有序數據的Watermark
亂序流的Watermarker如下圖所示:(Watermark設置為2)
圖 無序數據的Watermark
當Flink接收到每一條數據時,都會產生一條Watermark,這條Watermark就等於當前所有到達數據中的maxEventTime - 延遲時長,也就是說,Watermark是由數據攜帶的,一旦數據攜帶的Watermark比當前未觸發的窗口的停止時間要晚,那么就會觸發相應窗口的執行。由於Watermark是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。
上圖中,我們設置的允許最大延遲到達時間為2s,所以時間戳為7s的事件對應的Watermark是5s,時間戳為12s的事件的Watermark是10s,如果我們的窗口1是1s~5s,窗口2是6s~10s,那么時間戳為7s的事件到達時的Watermarker恰好觸發窗口1,時間戳為12s的事件到達時的Watermark恰好觸發窗口2。
Watermark 就是觸發前一窗口的“關窗時間”,一旦觸發關門那么以當前時刻為准在窗口范圍內的所有所有數據都會收入窗中。
只要沒有達到水位那么不管現實中的時間推進了多久都不會觸發關窗。
2.2 Watermark的引入
val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
|
7.3 EvnetTimeWindow API
3.1 滾動窗口(TumblingEventTimeWindows)
def main(args: Array[String]): Unit = {
|
結果是按照Event Time的時間窗口計算得出的,而無關系統的時間(包括輸入的快慢)。
3.2 滑動窗口(SlidingEventTimeWindows)
def main(args: Array[String]): Unit = {
|
3.3 會話窗口(EventTimeSessionWindows)
相鄰兩次數據的EventTime的時間差超過指定的時間間隔就會觸發執行。如果加入Watermark, 會在符合窗口觸發的情況下進行延遲。到達延遲水位再進行窗口觸發。
def main(args: Array[String]): Unit = {
|
第八章 Table API 與SQL
Table API是流處理和批處理通用的關系型API,Table API可以基於流輸入或者批輸入來運行而不需要進行任何修改。Table API是SQL語言的超集並專門為Apache Flink設計的,Table API是Scala 和Java語言集成式的API。與常規SQL語言中將查詢指定為字符串不同,Table API查詢是以Java或Scala中的語言嵌入樣式來定義的,具有IDE支持如:自動完成和語法檢測。
1 需要引入的pom依賴
<dependency>
|
2 構造表環境
def main(args: Array[String]): Unit = {
|
動態表
如果流中的數據類型是case class可以直接根據case class的結構生成table
tableEnv.fromDataStream(startupLogDstream) |
或者根據字段順序單獨命名
tableEnv.fromDataStream(startupLogDstream,’mid,’uid .......) |
最后的動態表可以轉換為流進行輸出
table.toAppendStream[(String,String)] |
字段
用一個單引放到字段前面 來標識字段名, 如 ‘name , ‘mid ,’amount 等
3 通過一個例子 了解TableAPI
//每10秒中渠道為appstore的個數
|
關於group by
1、 如果使用 groupby table轉換為流的時候只能用toRetractDstream
val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)] |
2、 toRetractDstream 得到的第一個boolean型字段標識 true就是最新的數據,false表示過期老數據
val rDstream: DataStream[(Boolean, (String, Long))] = table.toRetractStream[(String,Long)] rDstream.filter(_._1).print() |
3、 如果使用的api包括時間窗口,那么時間的字段必須,包含在group by中。
val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ") |
關於時間窗口
1 用到時間窗口,必須提前聲明時間字段,如果是processTime直接在創建動態表時進行追加就可以
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ts.rowtime) |
2 如果是EventTime要在創建動態表時聲明
val startupLogTable: Table = tableEnv.fromDataStream(startupLogWithEtDstream,'mid,'uid,'appid,'area,'os,'ch,'logType,'vs,'logDate,'logHour,'logHourMinute,'ps.processtime)
|
3 滾動窗口可以使用Tumble over 10000.millis on
val table: Table = startupLogTable.filter("ch ='appstore'").window(Tumble over 10000.millis on 'ts as 'tt).groupBy('ch ,'tt).select("ch,ch.count ") |
4 SQL如何編寫
def main(args: Array[String]): Unit = {
|