Flink之state processor api原理


無論您是在生產環境中運行Apache Flink or還是在過去將Flink評估為計算框架,您都可能會問自己一個問題:如何在Flink保存點中訪問,寫入或更新狀態?不再詢問!Apache Flink 1.9.0引入了State Processor API,它是DataSet API的強大擴展,它允許讀取,寫入和修改Flink的保存點和檢查點中的狀態。

在這篇文章中,我們解釋了為什么此功能對Flink來說是重要的一步,它的用途以及使用方法。最后,我們將討論狀態處理器API的未來,以及它如何與我們將Flink發展成為用於統一批處理和流處理的系統的計划保持一致

很多流處理應用程序都是有狀態的,並且大多數設計運行數月或數年。隨着時間的流逝,它們中的許多會累積大量有價值的狀態,如果狀態由於故障而丟失,則可能非常昂貴,甚至無法重建。為了保證應用程序狀態的一致性和持久性,Flink從一開始就采用了復雜的檢查點和恢復機制。在每個版本中,Flink社區都添加了越來越多的與狀態相關的功能,以提高檢查點和恢復速度,應用程序的維護以及管理應用程序的實踐。

但是,Flink用戶通常要求的功能是“從外部”訪問應用程序狀態的能力。此請求的動機是需要驗證或調試應用程序的狀態,將應用程序的狀態遷移到另一個應用程序,將應用程序從堆狀態后端演進到RocksDB狀態后端或導入的初始狀態。來自外部系統(如關系數據庫)的應用程序。

盡管有所有令人信服的理由在外部公開應用程序狀態,但到目前為止,您的訪問選項一直受到限制。Flink的Queryable State功能僅支持鍵查找(點查詢),並且不能保證返回值的一致性(在應用程序從故障中恢復之前和之后,鍵的值可能不同)。而且,可查詢狀態不能用於添加或修改應用程序的狀態。另外,保存點是應用程序狀態的一致快照,因此無法訪問,因為應用程序狀態是使用自定義二進制格式編碼的。

使用狀態處理器API讀取和寫入應用程序狀態

Flink 1.9附帶的State Processor API確實改變了應用程序狀態的游戲規則!簡而言之,它使用Input和OutputFormats擴展了DataSet API以讀取和寫入保存點或檢查點數據。由於DataSet和Table API互操作性,您甚至可以使用關系Table API或SQL查詢來分析和處理狀態數據。

例如,您可以獲取正在運行的流處理應用程序的保存點,並使用DataSet批處理程序對其進行分析,以驗證該應用程序的行為是否正確。或者,您可以從任何存儲中讀取一批數據,對其進行預處理,然后將結果寫入保存點,以用於引導流應用程序的狀態。現在也可以修復不一致的狀態條目。最后,狀態處理器API開辟了許多方法來開發有狀態的應用程序,這些方法以前被參數和設計選擇所阻塞,這些參數和設計選擇在啟動后不會丟失應用程序的所有狀態的情況下無法更改。例如,您現在可以任意修改狀態的數據類型,調整運算符的最大並行度,拆分或合並運算符狀態,重新分配運算符UID等。

將應用程序狀態映射到數據集

狀態處理器API將流應用程序的狀態映射到一個或多個可以單獨處理的數據集。為了能夠使用API​​,您需要了解此映射的工作方式。

但是,讓我們首先看看有狀態的Flink作業是什么樣的。Flink作業由operator組成,通常是一個或多個source operator,一些實際處理的operator以及一個或多個sink operator。每個operator在一個或多個任務中並行運行,並且可以使用不同類型的狀態。operator可以具有零個,一個或多個“operator states”,這些狀態被組織為以operator任務為范圍的列表。如果將運算符應用於鍵控流,它還可以具有零個,一個或多個“keyed states”,它們的作用域范圍是從每個已處理記錄中提取的鍵。您可以將keyed states視為分布式鍵-值映射。

下圖顯示了應用程序“ MyApp”,該應用程序由稱為“ Src”,“ Proc”和“ Snk”的三個運算符組成。Src具有一個operator state(os1),Proc具有一個operator state(os2)和兩個keyed states(ks1,ks2),而Snk是無狀態的。

 

 MyApp的savepoint或checkpoint由所有狀態的數據組成,這些數據的組織方式可以恢復每個任務的狀態。在使用批處理作業處理savepoint(或checkpoint)的數據時,我們需要一個思維模型,將每個任務狀態的數據映射到數據集或表中。實際上,我們可以將savepoint視為數據庫。每個operator(由其UID標識)代表一個名稱空間。operator的每個operator state都通過一個列映射到名稱空間中的專用表,該列保存所有任務的狀態數據。operator的所有keyed states都映射到一個表,該表由用於key的列和用於每個key state的一列組成。下圖顯示了MyApp的保存點如何映射到數據庫

 

 該圖顯示了“Src”的operator state的值如何映射到具有一列和五行的表,一行數據代表對於Src的所有並行任務中的一個並行實例。“ Proc”的operator state——os2,類似地映射到單個表。keyed state,ks1和ks2被組合到具有三列的單個表中,一列用於鍵,一列用於ks1,一列用於ks2。keyed表為兩個keyed state的每個不同key保持一行。由於“ Snk”沒有任何狀態,因此其名稱空間為空。

狀態處理器API現在提供了創建,加載和編寫savepoint的方法。您可以從已加載的savepoint讀取dataSet,也可以將dataSet轉換為狀態並將其添加到savepoint。可以使用DataSet API的全部功能集來處理DataSet。使用這些構建塊,可以解決所有前面提到的用例(以及更多用例)。如果您想詳細了解如何使用State Processor API,請查看文檔

為什么要使用DataSet API?

如果您熟悉Flink的路線圖,您可能會對State Processor API基於DataSet API感到驚訝。Flink社區計划使用BoundedStreams的概念擴展DataStream API,並棄用DataSet API。在設計此功能時,我們還評估了DataStream API或Table API,但他們都不能提供相應的支持。由於我們不想在Flink API的開發過程中阻止此功能,因此我們決定在DataSet API上構建該功能,但將其對DataSet API的依賴性降到最低。因此,將其遷移到另一個API應該相當容易。

總結

Flink用戶很長時間以來一直要求一種功能來從外部訪問和修改流應用程序的狀態。使用state processor api,Flink 1.9.0最終將應用程序狀態公開為可以操縱的數據格式。此功能為用戶如何維護和管理Flink流應用程序打開了許多新可能性,包括流應用程序的任意演變以及應用程序狀態的導出和引導。簡而言之,state processor api使得savepoint不再是一個黑匣子。

本文翻譯自https://flink.apache.org/feature/2019/09/13/state-processor-api.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM