Flink學習筆記:異步I/O訪問外部數據


 本文為《Flink大數據項目實戰》學習筆記,想通過視頻系統學習Flink這個最火爆的大數據計算框架的同學,推薦學習課程:

 Flink大數據項目實戰:http://t.cn/EJtKhaz

1. Aysnc I/O

1.1Aysnc I/O是啥?

Async I/O 是阿里巴巴貢獻給社區的一個呼聲非常高的特性,於1.2版本引入。

 主要目的:是為了解決與外部系統交互時網絡延遲成為了系統瓶頸的問題。

 場景:

流計算系統中經常需要與外部系統進行交互,比如需要查詢外部數據庫以關聯上用戶的額外信息。通常,我們的實現方式是向數據庫發送用戶a的查詢請求(例如在MapFunction中),然后等待結果返回,在這之前,我們無法發送用戶b的查詢請求。這是一種同步訪問的模式,如下圖左邊所示。

 

圖中棕色的長條表示等待時間,可以發現網絡等待時間極大地阻礙了吞吐和延遲。為了解決同步訪問的問題,異步模式可以並發地處理多個請求和回復。也就是說,你可以連續地向數據庫發送用戶a、b、c等的請求,與此同時,哪個請求的回復先返回了就處理哪個回復,從而連續的請求之間不需要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。

1.2提高吞吐量的兩種方式對比

目標:提高吞吐量

 

 

1.3使用Aysnc I/O的前提條件

1.數據庫(或key/value存儲)提供支持異步請求的client。

2.沒有異步請求客戶端的話也可以將同步客戶端丟到線程池中執行作為異步客戶端。

注意:后者沒有前者效果好。

1.4Async I/O API實現異步流式轉換

Flink Async I/O API 允許用戶在數據流中使用異步請求客戶端訪問外部存儲。該API處理與數據流的集成,以及消息順序性(Order)、事件時間(event time)、一致性(容錯)等臟活累活。用戶只需要專注於業務。

如果目標數據庫有異步客戶端,則三步即可實現異步流式轉換操作(針對該數據庫異步):

1.實現用來分發請求的AsyncFunction ,用來向數據庫發送異步請求並設置回調。

2.獲取操作結果的callback,並將它提交給ResultFuture。

3.將異步I/O操作應用於DataStream。

1.5AsyncFunction

AsyncFunction實現實例如下所示:

 

1.6AsyncFunction超時處理

當異步I/O請求超時時,默認情況下會拋出異常並重新啟動job。如果希望處理超時,可以覆蓋AsyncFunction的timeout方法。

 

1.7AsyncDataStream工具類

一個工具類,用於將AsyncFunction應用於DataStream。

AsyncFunction發出的並發請求通常是無序的,該順序基於哪個請求先完成。為了控制結果記錄的發出順序,Flink提供了兩種模式,分別對應AsyncDataStream 的兩個靜態方法,orderedWait 和 unorderedWait:

1.orderedWait(有序):消息的發送順序與接收到的順序相同(包括 watermark ),也就是先進先出。

2.unorderedWait(無序) :

a)在 ProcessingTime 的情況下,完全無序,先返回的結果先發送(最低延遲和最低開銷)。

b)在 EventTime 的情況下,watermark 不能超越消息,消息也不能超越 watermark,也就是說 watermark 定義的順序的邊界。在兩個 watermark 之間的消息的發送是無序的,但是在watermark之后的消息不能先於該watermark之前的消息發送。(這意味着,在存在watermark的情況下,無序模式引入了與有序模式相同的延遲和管理開銷。這種開銷的大小取決於watermark頻率。)

 

注意:Ingestion Time是一種特殊的Event Time ,它根據source的processing time自動生成watermark。

1.8容錯與最佳實踐

Async I/O operator提供完全exactly-once容錯保證,它將運行中的異步請求記錄存儲在檢查點中,並在從故障恢復時恢復/重新觸發請求

最佳實踐

1.使用Executor作為Future的回調時,推薦使用線程切換開銷較小的DirectExecutor,可以選擇下面任意方式或得:

org.apache.flink.runtime.concurrent.Executors.directExecutor()

com.google.common.util.concurrent.MoreExecutors.directExecutor()

2.asyncInvoke#asyncInvoke不是被Flink多線程調用的,不要在里面直接使用阻塞操作。

2.原理

2.1原理實現

注意:本小節不要求掌握

AsyncDataStream.(un)orderedWait方法的主要工作就是創建了一個 AsyncWaitOperator。AsyncWaitOperator 是支持異步 IO 訪問的算子實現,該算子會運行 AsyncFunction 並處理異步返回的結果,其內部原理如下圖所示:

 

 

如圖所示,AsyncWaitOperator 主要由兩部分組成:StreamElementQueue 和 Emitter。StreamElementQueue 是一個 Promise 隊列,所謂 Promise 是一種異步抽象表示將來會有一個值(排隊買鴨血粉絲湯給你的小票),這個隊列是未完成的 Promise 隊列,也就是進行中的請求隊列。Emitter 是一個單獨的線程,負責發送消息(收到的異步回復)給下游。

圖中E5表示進入該算子的第五個元素(”Element-5”),在執行過程中首先會將其包裝成一個 “Promise” P5,然后將P5放入隊列。最后調用 AsyncFunction 的 ayncInvoke 方法,該方法會向外部服務發起一個異步的請求,並注冊回調。該回調會在異步請求成功返回時調用 AsyncCollector.collect 方法將返回的結果交給框架處理。實際上 AsyncCollector 是一個 Promise ,也就是 P5,在調用 collect 的時候會標記 Promise 為完成狀態,並通知 Emitter 線程有完成的消息可以發送了。Emitter 就會從隊列中拉取完成的 Promise ,並從 Promise 中取出消息發送給下游。

2.2消息的順序性

上文提到 Async I/O 提供了兩種輸出模式。其實細分有三種模式: 有序,ProcessingTime 無序,EventTime 無序。

Flink 使用隊列來實現不同的輸出模式,並抽象出一個隊列的接口(StreamElementQueue),這種分層設計使得AsyncWaitOperator和Emitter不用關心消息的順序問題。StreamElementQueue有兩種具體實現,分別是 OrderedStreamElementQueue 和 UnorderedStreamElementQueue。UnorderedStreamElementQueue 比較有意思,它使用了一套邏輯巧妙地實現完全無序和 EventTime 無序。

2.3有序

有序比較簡單,使用一個隊列就能實現。所有新進入該算子的元素(包括 watermark),都會包裝成 Promise 並按到達順序放入該隊列。如下圖所示,盡管P4的結果先返回,但並不會發送,只有 P1 (隊首)的結果返回了才會觸發 Emitter 拉取隊首元素進行發送。如下圖所示:

 

2.4ProcessingTime 無序

ProcessingTime 無序也比較簡單,因為沒有 watermark,不需要協調 watermark 與消息的順序性,所以使用兩個隊列就能實現,一個 uncompletedQueue 一個 completedQueue。所有新進入該算子的元素,同樣的包裝成 Promise 並放入 uncompletedQueue 隊列,當uncompletedQueue隊列中任意的Promise返回了數據,則將該 Promise 移到 completedQueue 隊列中,並通知 Emitter 消費。如下圖所示:

 

2.5EventTime 無序

EventTime 無序類似於有序與 ProcessingTime 無序的結合體。因為有 watermark,需要協調 watermark 與消息之間的順序性,所以uncompletedQueue中存放的元素從原先的 Promise 變成了 Promise 集合。如果進入算子的是消息元素,則會包裝成 Promise 放入隊尾的集合中。如果進入算子的是 watermark,也會包裝成 Promise 並放到一個獨立的集合中,再將該集合加入到 uncompletedQueue 隊尾,最后再創建一個空集合加到 uncompletedQueue 隊尾。這樣,watermark 就成了消息順序的邊界。只有處在隊首的集合中的 Promise 返回了數據,才能將該 Promise 移到 completedQueue 隊列中,由 Emitter 消費發往下游。只有隊首集合空了,才能處理第二個集合。這樣就保證了當且僅當某個 watermark 之前所有的消息都已經被發送了,該 watermark 才能被發送。過程如下圖所示:

 

 

2.6快照與恢復

分布式快照機制是為了保證狀態的一致性。我們需要分析哪些狀態是需要快照的,哪些是不需要的。首先,已經完成回調並且已經發往下游的元素是不需要快照的。否則,會導致重發,那就不是 exactly-once 了。而已經完成回調且未發往下游的元素,加上未完成回調的元素,就是上述隊列中的所有元素。

 

所以快照的邏輯也非常簡單。

(1)清空原有的狀態存儲,

(2)遍歷uncompletedQueue中的所有 Promise,從中取出 StreamElement(消息或 watermark)並放入狀態存儲中(3)執行快照操作。

 

恢復的時候,從快照中讀取所有的元素全部再處理一次,當然包括之前已完成回調的元素。所以在失敗恢復后,會有元素重復請求外部服務,但是每個回調的結果只會被發往下游一次。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM