本文來自官網翻譯: Asynchronous I/O for External Data Access
本頁介紹了Flink API與外部數據存儲的異步I / O的使用。對於不熟悉異步或事件驅動編程的用戶,有關Futures和事件驅動編程可能是有用的准備。
注:有關異步I / O實用程序的設計和實現的詳細信息,請參閱提議和設計文檔 FLIP-12:異步I / O設計和實現。
需要異步I / O操作
當與外部系統交互時(例如,當使用存儲在數據庫中的數據來豐富流事件時),需要注意與外部系統的通信延遲不會主導流應用程序的工作。
訪問外部數據庫中的數據,例如在 MapFunction中
,通常意味着同步交互:將請求發送到數據庫並MapFunction
等待直到收到響應。在許多情況下,這種等待構成了功能的絕大部分時間。
與數據庫的異步交互意味着單個並行函數實例可以同時處理許多請求並同時接收響應。這樣,等待時間可以覆蓋發送其他請求和接收響應。至少,等待時間是在多個請求上分攤的。可以使大多數情況下流量吞吐量更高。
注意:MapFunction
在某些情況下,僅通過擴展到非常高的並行度來提高吞吐量,但通常會產生非常高的資源成本:擁有更多並行MapFunction實例意味着更多任務,線程,Flink內部網絡連接,網絡與數據庫,緩沖區和一般內部數據開銷。
先決條件
如上一節所示,對數據庫(或鍵/值存儲)實現適當的異步I / O需要客戶端訪問支持異步請求的數據庫。許多流行的數據庫提供這樣的客戶端
在沒有這樣的客戶端的情況下,可以通過創建多個客戶端並使用線程池處理同步調用來嘗試將同步客戶端轉變為有限的並發客戶端。但是,這種方法通常比適當的異步客戶端效率低。
異步I / O API
Flink的Async I / O API允許用戶將異步請求客戶端與數據流一起使用。API處理與數據流的集成,以及處理順序,事件時間,容錯等。
假設有一個目標數據庫的異步客戶端,需要三個部分來實現對數據庫的異步I / O流轉換:
- 實現
AsyncFunction分派
請求 - 一個callback,它接收操作的結果並將其交給
ResultFuture
- 在DataStream上應用異步I / O操作作為轉換
以下代碼示例說明了基本模式:
Java
// This example implements the asynchronous request and callback with Futures that have the // interface of Java 8's futures (which is the same one followed by Flink's Future) /** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** The database specific client that can issue concurrent requests with callbacks */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // issue the asynchronous request, receive a future for result final Future<String> result = client.query(key); // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // Normally handled explicitly. return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // create the original stream DataStream<String> stream = ...; // apply the async I/O transformation DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
scala
/** * An implementation of the 'AsyncFunction' that sends requests and sets the callback. */ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] { /** The database specific client that can issue concurrent requests with callbacks */ lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials) /** The context used for the future callbacks */ implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor()) override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = { // issue the asynchronous request, receive a future for the result val resultFutureRequested: Future[String] = client.query(str) // set the callback to be executed once the request by the client is complete // the callback simply forwards the result to the result future resultFutureRequested.onSuccess { case result: String => resultFuture.complete(Iterable((str, result))) } } } // create the original stream val stream: DataStream[String] = ... // apply the async I/O transformation val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
重要提示:ResultFuture
在第一次通話時完成ResultFuture.complete
。所有后續complete
調用都將被忽略。
以下兩個參數控制異步操作:
-
Timeout:超時定義異步請求在被視為失敗之前可能需要多長時間。此參數可防止死亡/失敗請求。
-
Capacity:此參數定義可以同時進行的異步請求數。盡管異步I / O方法通常會帶來更好的吞吐量,但異步I / O 的操作仍然可能成為流應用程序的瓶頸。限制並發請求的數量可確保操作不會不斷累積,積壓增加的待處理請求,一旦容量耗盡,它將觸發反壓。
超時處理
當異步I / O請求超時時,默認情況下會引發異常並重新啟動作業。如果要處理超時,可以覆蓋該AsyncFunction#timeout
方法。
結果順序
由AsyncFunction
一些未定義的順序經常完成的並發請求,基於哪個請求首先完成。為了控制發出結果記錄的順序,Flink提供了兩種模式:
-
無序:異步請求完成后立即發出結果記錄。在異步I / O運算符之后,流中記錄的順序與以前不同。當使用處理時間作為基本時間特性時,此模式具有最低延遲和最低開銷。使用
AsyncDataStream.unorderedWait(...)
此模式。 -
有序:在這種情況下,保留流順序。結果記錄的發出順序與觸發異步請求的順序相同(運算符輸入記錄的順序)。為此,運算符緩沖結果記錄,直到其所有先前記錄被發出(或超時)。這通常會在檢查點中引入一些額外的延遲和一些開銷,因為與無序模式相比,記錄或結果在檢查點狀態下保持更長的時間。使用
AsyncDataStream.orderedWait(...)
此模式。
活動時間
當流應用程序與事件時間一起工作時,異步I / O操作符將正確處理水印。這意味着兩種訂單模式具體如下:
-
Unordered:水印不會超過記錄,反之亦然,這意味着水印建立了一個順序邊界。記錄僅在水印之間無序發出。只有在發出水印后才會發出某個水印后發生的記錄。反過來,只有在發出水印之前輸入的所有結果記錄之后才會發出水印。
這意味着,在水印的存在,將無序的方式介紹了一些相同的延遲和管理開銷的順序模式一樣。開銷量取決於水印頻率。
-
Ordered:保留記錄的水印順序,就像保留記錄之間的順序一樣。與處理時間相比,開銷沒有顯着變化。
請記住,攝取時間是事件時間的特殊情況,其中自動生成的水印基於源處理時間。
容錯保證
異步I / O運算符提供精確一次的容錯保證。它在檢查點中存儲正在進行的異步請求的記錄,並在從故障中恢復時恢復/重新觸發請求。
實施技巧
對於使用Executor(或Scala中的ExecutionContext)進行回調的Futures實現,我們建議使用DirectExecutor,因為回調通常只做最小的工作,而DirectExecutor避免了額外的線程到線程的切換開銷。 回調通常只將結果傳遞給ResultFuture,后者將其添加到輸出緩沖區。 從那里開始,包括記錄發射和與檢查點簿記交互的重要邏輯無論如何都發生在專用線程池中。
警告
AsyncFunction不是為多線程
我們想在這里明確指出的常見混淆AsyncFunction
是以多線程方式調用。只存在一個實例,AsyncFunction
並且對於流的相應分區中的每個記錄順序調用它。除非該asyncInvoke(...)
方法快速返回並依賴於回調(由客戶端),否則它將不會導致適當的異步I / O.
例如,以下模式會導致阻塞asyncInvoke(...)
函數,從而使異步行為無效:
-
使用其查找/查詢方法調用阻塞的數據庫客戶端,直到收到結果為止
-
阻止/等待
asyncInvoke(...)
方法內異步客戶端返回的future-type對象