Flink 原理(六)——異步I/O(asynchronous I/O)


1、前言

  本文是基於Flink官網上Asynchronous  I/O的介紹結合自己的理解寫成的,若有不正確的歡迎大伙留言交流,謝謝

2、Asynchronous  I/O簡介

  將Flink用於流計算時,若涉及到和外部系統進行交互,如利用Flink從數據庫中讀取數據,這種需要獲取I/O的場景時,我們需要考慮交互所帶來的時延問題。

  為分析如何減少時延,我們先來分析一下,Flink以同步的形式方法外部系統(以MapFunction中和數據庫交互為例)的過程,若圖1虛線左側所示,請求a發送到database后,MapFunction等待回復后才進行下發送下一個請求b,期間,I/O處於空閑狀態,請求b又開始重復此過程,這樣在兩個來回的時間內(發送請求-收到結果為一個來回),只處理兩個請求。如圖1虛線右側所示,同樣是在兩個來回的時間內,以異步的形式進行交互,請求a發出去后,在等待回復時,請求b,c,d依次發出,這樣既可以處理4個請求了。

 圖1 同/異訪問數據庫方式對比圖(Ref[1])

  在某些場景下,為了提高系統的吞吐能力,可以僅通過增大MapFunction的並發度以達目的,但是隨之而來是資源的大量消耗。

  【重要事項

  1)為了實現以異步I/O訪問數據庫或K/V存儲,數據庫等需要有能支持異步請求的client;若是沒有,可以通過創建多個同步的client並使用線程池處理同步call的方式實現類似並發的client,但是這方式沒有異步I/O的性能好。

  2)AsyncFunction不是以多線程方式調用的,一個AsyncFunction實例按順序為每個獨立消息發送請求;

  3)目前(Flink 1.9),使用AsyncWaitOperator時要打斷operator chain(默認也是不使用),原因見FLINK-13063

3、結果的順序

  由於請求響應的快慢可能不一樣,AsyncFunction的“並發”請求可能導致結果的亂序 。如圖1中虛線右側所示,若請求b發出之后,其結果在請求a的之前返回,這樣異步I/O算子前后的消息順序就不一致了。為了控制結果的返回順序,Flink提供了兩種模式:

  1)Unordered:當異步的請求完成時,其結果立馬返回,不考慮結果順序即亂序模式。當以processing time作為時間屬性時,該模式可以獲得最小的延時和最小的開銷,使用方式:AsyncDataStream.unorderedWait(...);

  2)Ordered:該模式下,消息在異步I/O算子前后的順序一致,先請求的先返回,即有序模式。為實現有序模式,算子將請求返回的結果放入緩存,直到該請求之前的結果全部返回或超時。該模式通常情況下回引入額外的時延以及在checkpoint過程中會帶來開銷,這是因為,和無序模式相比,消息和請求返回的結果都會在checkpoint的狀態中維持更長時間。使用方式:AsyncDataStream.orderedWait(...);

  在此,我們需要針對流任務和event time相結合的情況進行補充說明。為什么?是因為watermark和消息的整體相對位置是不會變的,什么意思了?發生在某個watermark之后的消息,只能在watermark被發出之后發出,其請求結果也是。換句話說,兩個watermark之間的消息整體與watermark的有序的。當然這個區間內消息之間是否有序這得根據使用的模式來分析。

  1)對Ordered模式,因為消息本身是有序的,所以watermark和消息之間也是有序的,和processing time相比,其不需要引入額外的開銷;

  2)對Unordered模式,其模式是先響應先返回,但在與event time結合的情況里,消息或結果都需在特定watermark發出之后才能發出,此時,就會引入延時和開銷,其開銷的大小取決於watermark的頻率,其原因參加下文原理部分。

4、原理

   4.1 Terms

  為更加詳細的說明異步I/O的實現過程,先說明幾個term,其中也會涉及其基本用法,若分析原理只看其含義即可。

  1)AsyncFunction:異步I/O的觸發接口

    AsyncFunction在AsyncWaitOperator中作為一個用戶函數,類似FlatMap,有open()/processElement(StreamRecord< in > record)/processWatermark(Watermark mark)方法。
 對於用戶自己實現的AsyncFunction,必須重寫asyncInvoke(IN input, AsyncCollector collector)來提供調用異步操作的代碼。

  2)AsyncWaitOperator:調用AsyncFunction的流算子,是個抽象的概念,具體算子是unorderedWait(...)或orderedWait(...)

  3)AsyncCollector:

    AsyncCollector由AsyncWaitOperator創建,並傳遞給AsyncFunction,在這里它應該被添加到用戶的回調函數中。它充當從用戶代碼中獲取結果或錯誤的角色,並通知AsyncCollectorBuffer發出結果。

  4)AsyncCollectorBuffer:AsyncCollectorBuffer保存所有的AsyncCollector,並將結果發送給下一個節點。

  上述概念是工作示意圖可參見Ref[2]

  4.2 架構圖

  在流式計算中,涉及異步I/O的整體過程圖如下:

 

圖2 異步I/O架構圖(Ref[2])

  1)消息達到AsyncWaitOperator后正常處理過程如下:

  AsyncWaitOperator調用AsyncFunction,並創建AsyncCollector傳遞給AsyncFunction。AsyncCollector等待獲取到返回結果(異常)之后將入到AsyncCollectorBuffer保存時,會將一條mark消息放入AsyncCollectorBuffer中,然后一個signal信息將會發送到Emitter 線程,若此時是將消息發送出去的signal,則會將消息發送出去並通知task thread加消息到collector buffer中。至於怎么發要依據代碼中設置的模式是有序還是無序,若是有序則發head,刪head。該過程的更詳細過程如下圖:

 

圖3 異步I/O正常處理消息圖(Ref[2])

  2)checkpoint過程

  AsyncWaitOperator先是對AsyncCollectorBuffer中所有的輸入流數據進行掃描,完成后就刪除state中老的數據,然后將AsyncCollectorBuffer中數據存入到state中,而不是在處理時對單個輸入流一個接一個的存入state,具體過程圖見圖2或圖4。

  3)故障恢復

  在恢復AsyncWaitOperator的狀態時,AsyncWaitOperator將scan狀態中的所有元素,獲取AsyncCollectors,調用AsyncFunction.asyncInvoke()並將它們插入AsyncCollectorBuffer中,具體的如下:

 

圖4 故障恢復和checkpoint流程圖(Ref[2])

 總結:

  關於具體使用的方法見后期的博客,建議大伙看看原文,一千個讀者就有一千個哈姆雷特!

Ref

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/asyncio.html

[2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

[3]https://blog.icocoro.me/2019/05/26/1905-apache-flinkv2-asyncio/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM