【Flink】flink 內部 Akka and Actors


使用Akka,所有遠程過程調用現在都實現為異步消息。 這主要影響JobManager,TaskManager和JobClient的組件。 將來,甚至可能會將更多的組件轉換為參與者,從而允許它們發送和處理異步消息。

Akka and the Actor Model

Akka是開發並發,容錯和可擴展應用程序的框架。 它是參與者模型的實現,因此類似於Erlang的並發模型。 在參與者模型的上下文中,所有代理實體都被視為獨立的參與者。 角色通過彼此發送異步消息與其他角色進行通信。 參與者模型的優勢在於這種異步性。 也可以顯式等待響應,以便您執行同步操作。 但是,強烈建議不要使用同步消息,因為它們會限制系統的可伸縮性。 每個參與者都有一個郵箱,其中存儲了收到的消息。 此外,每個參與者都保持自己的孤立狀態。 下面是幾個參與者的示例網絡。


角色具有單個處理線程,該線程輪詢角色的郵箱並連續處理接收到的消息。 作為已處理消息的結果,參與者可以更改其內部狀態,發送新消息或生成新參與者。 如果一個actor的內部狀態是從其處理線程內部專門操縱的,則無需使actor的狀態線程安全。 即使單個參與者本質上是順序的,由多個參與者組成的系統也是高度並發且可擴展的,因為處理線程在所有參與者之間共享。 這種共享也是為什么從不應該從參與者線程內部調用阻塞調用的原因。 這樣的調用將阻止該線程被其他參與者用來處理自己的消息。

Actor Systems

Actor系統是所有演員生活的容器。 它提供共享服務,例如計划,配置和日志記錄。 參與者系統還包含線程池,所有參與者線程都從該線程池中募集。

多角色系統可以共存於一台機器上。 如果actor系統以RemoteActorRefProvider啟動,則可以從可能駐留在遠程計算機上的另一個actor系統進行訪問。 演員系統自動識別演員消息是發給同一個演員系統中還是遠程演員系統中的演員的。 在本地通信的情況下,可以使用共享內存有效地傳輸消息。 在遠程通信的情況下,消息是通過網絡堆棧發送的。

所有參與者都按層次結構組織。 每個新創建的actor都會將其創建actor作為父級分配。 該層次結構用於監督。 每個父母都有對其子女的監護權。 如果其中一個子項發生錯誤,則會通知他。 如果演員可以解決問題,那么他可以繼續或重新開始孩子。 如果問題超出了他的處理范圍,他可以將錯誤上報給自己的父母。 逐步升級錯誤僅表示當前層之上的層次結構層現在負責解決問題。

系統創建的第一個參與者由系統提供的守護者參與者/用戶監督。 角色層次在此進行了詳細說明。

Actors本身就是狀態和行為的容器。 它是actor線程順序處理傳入的消息。 因為一個actor一次僅活動一個線程,所以它使用戶擺脫了易於出錯的鎖定和線程管理任務。 但是,必須確保僅從此參與者線程訪問參與者的內部狀態。 actor的行為由接收函數定義,該函數為每個消息包含在接收到此消息時執行的某些邏輯。

Flink系統由三個必須通信的分布式組件組成:JobClient,JobManager和TaskManager。 JobClient從用戶那里獲取Flink作業,並將其提交給JobManager。 然后JobManager負責編排作業執行。 首先,它分配所需的資源量。 這主要包括TaskManager上的執行插槽。

分配資源后,JobManager將作業的各個任務部署到相應的TaskManager中。一旦收到任務,TaskManager會生成一個執行任務的線程。 狀態更改(例如開始計算或完成計算)將發送回JobManager。 根據這些狀態更新,JobManager將引導作業執行直到完成。 作業完成后,其結果將發送回給JobClient,以告知用戶相關信息。 下圖描述了作業執行過程。

JobManager & TaskManager

JobManager是中央控制單元,負責執行Flink作業。 因此,它控制着資源分配,任務調度和狀態報告。

必須先啟動一個JobManager和一個或多個TaskManager,然后才能執行任何Flink作業。 然后TaskManager通過向JobManager發送RegisterTaskManager消息在JobManager上注冊。 JobManager通過``確認注冊''消息確認注冊成功。 如果TaskManager已在JobManager上注冊,則由於發送了多個RegisterTaskManager消息,則JobManager返回一個AlreadyRegistered消息。 如果注冊被拒絕,則JobManager將以RefuseRegistration消息作為響應。

通過向作業管理器發送帶有相應JobGraph的SubmitJob消息向作業管理器提交作業。 收到JobGraph后,JobManager將在JobGraph中創建一個ExecutionGraph,作為分布式執行的邏輯表示。 ExecutionGraph包含有關必須執行才能部署到TaskManager的任務的信息。

JobManager的調度程序負責在可用TaskManager上分配執行插槽。 在TaskManager上分配執行插槽后,帶有執行任務所需的所有必要信息的SubmitTask消息將發送到相應的TaskManager。 TaskOperationResult確認任務部署成功。 一旦部署並運行了提交作業的源,作業提交也被認為是成功的。 JobManager通過發送帶有相應作業ID的成功消息來通知JobClient此狀態。

在TaskManager上運行的單個任務的狀態更新通過UpdateTaskExecutionState消息發送回JobManager。 使用這些更新消息,可以更新ExecutionGraph以反映執行的當前狀態。

JobManager還充當數據源的輸入拆分分配器。 它負責在所有TaskManager之間分配工作,以便在可能的情況下保留數據局部性。 為了動態平衡負載,任務在完成對舊輸入的處理后,會請求新的輸入拆分。 該請求是通過將RequestNextInputSplit發送到JobManager來實現的。 JobManager用NextInputSplit消息響應。 如果沒有更多輸入拆分,則消息中包含的輸入拆分為null。

任務被延遲部署到任務管理器。 這意味着消耗數據的任務僅在其生產者之一完成生產某些數據之后才部署。 生產者執行此操作后,就會將ScheduleOrUpdateConsumers消息發送到JobManager。 此消息表明,消費者現在可以讀取新生成的數據。 如果使用任務尚未運行,它將被部署到TaskManager。

JobClient

JobClient代表分布式系統的面向用戶的組件。 它用於與JobManager進行通信,因此它負責提交Flink作業,查詢已提交作業的狀態並接收當前正在運行的作業的狀態消息。

JobClient還是您通過消息與之通信的參與者。 存在與作業提交有關的兩條消息:SubmitJobDetached和SubmitJobWait。 第一條消息提交作業,並從接收任何狀態消息和最終作業結果中注銷。 如果您想以丟臉的方式將作業提交到Flink群集,則分離模式非常有用。

SubmitJobWait消息將作業提交到JobManager並注冊以接收該作業的狀態消息。 在內部,這是通過生成輔助角色來完成的,該輔助角色用作狀態消息的接收者。 作業終止后,由JobManager將帶有持續時間和累加器結果的JobResultSuccess發送給產生的助手角色。 收到此消息后,輔助角色將消息轉發給客戶端,該客戶端最初發出了SubmitJobWait消息,然后終止。

Asynchronous vs. Synchronous Messages

Flink盡可能嘗試使用異步消息並將響應作為將來處理。 期貨和少數現有的阻塞調用都有一個超時,在此之后該操作將被視為失敗。 這樣可以防止消息丟失或分布式組件崩潰時系統陷入僵局。 但是,如果您碰巧擁有非常大的群集或緩慢的網絡,則可能會錯誤地觸發超時。 因此,可以通過配置中的“ akka.ask.timeout”指定這些操作的超時時間。

演員可以與其他演員交談之前,必須為其檢索ActorRef。 此操作的查找也需要超時。 為了使Actor未啟動時系統快速故障,將查找超時設置為比常規超時更小的值。 如果遇到查找超時的情況,可以通過配置中的“ akka.lookup.timeout”來增加查找時間。

Akka的另一個特點是它設置了可以發送的最大郵件大小的限制。 原因是它保留了相同大小的序列化緩沖區,並且不想浪費內存。 如果由於消息超出最大大小而遇到傳輸錯誤,則可以通過配置中的“ akka.framesize”來增加幀大小。

Failure Detection

分布式系統中的故障檢測對其魯棒性至關重要。 在商品集群上運行時,總是會發生某些組件發生故障或無法再訪問的情況。 此類故障的原因是多態的,從硬件故障到網絡中斷都可能造成故障。 一個強大的分布式系統應該能夠檢測出故障的組件並從中恢復。

Flink通過使用Akka的DeathWatch機制來檢測故障組件。 即使沒有受到該演員的監督,甚至不在另一個演員系統中,DeathWatch也可以讓演員觀看其他演員。 一旦被觀看的演員死亡或無法聯系,終止消息就會發送給觀看的演員。 因此,在接收到這樣的消息時,系統可以針對它采取步驟。 在內部,DeathWatch被實現為心跳和故障檢測器,它基於心跳間隔,聽音暫停和故障閾值來估計演員何時可能死亡。 可以通過在配置中設置“ akka.watch.heartbeat.interval”值來控制心跳間隔。 可以通過“ akka.watch.heartbeat.pause”指定可接受的心跳暫停。 心跳暫停應為心跳間隔的倍數,否則丟失的心跳將直接觸發DeathWatch。 可以通過“ akka.watch.threshold”指定故障閾值,它可以有效地控制故障檢測器的靈敏度。 您可以在此處找到有關DeathWatch機制和故障檢測器的更多詳細信息。

在Flink中,JobManager監視所有已注冊的TaskManager,而TaskManager監視JobManager。 這樣,兩個組件都知道何時不再可訪問另一個組件。 JobManager的反應是將各個TaskManager標記為已死,以防止將來的任務部署到該TaskManager。 此外,它將使當前正在此任務管理器上運行的所有任務失敗,並在其他TaskManager上重新安排其執行時間。 如果TaskManager僅因暫時的連接丟失而被標記為死,那么一旦重新建立連接,它就可以在JobManager中簡單地重新注冊自己。

TaskManager還監視JobManager。 此監視允許TaskManager在檢測到JobManager失敗時通過使所有當前正在運行的任務失敗來進入清除狀態。 此外,如果觸發的死亡僅由網絡擁塞或連接丟失引起,TaskManager將嘗試重新連接到JobManager。

原文地址


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM