Flink基礎(三):DS簡介(3) 流處理基礎


1 數據流編程簡介

在我們深入研究流處理的基礎知識之前,讓我們來看看在數據流程編程的背景和使用的術語。

1.1 數據流圖

  顧名思義,數據流程序描述了數據如何在算子之間流動。數據流程序通常表示為有向圖,其中節點稱為算子用來表示計算,邊表示數據之間的依賴性。算子是數據流程序的基本功能單元。他們從輸入消耗數據,對它們執行計算,並生成數據輸出用於進一步處理。一個數據流圖必須至少有一個數據源和一個數據接收器。

 

 

   像圖2-1中的數據流圖被稱為邏輯流圖,因為它們表示了計算邏輯的高級視圖。為了執行一個數據流程序,Flink會將邏輯流圖轉換為物理數據流圖,詳細說明程序的執行方式。例如,如果我們使用分布式處理引擎,每個算子在不同的物理機器可能有幾個並行的任務運行。圖2-2顯示了圖2-1邏輯圖的物理數據流圖。而在邏輯數據流圖中節點表示算子,在物理數據流圖中,節點是任務。“Extract hashtags”和“Count”算子有兩個並行算子任務,每個算子任務對輸入數據的子集執行計算

1.2 數據並行和任務並行

  我們可以以不同方式利用數據流圖中的並行性。第一,我們可以對輸入數據進行分區,並在數據的子集上並行執行具有相同算子的任務並行。這種類型的並行性被稱為數據並行性。數據並行是有用的,因為它允許處理大量數據,並將計算分散到不同的計算節點上。第二,我們可以將不同的算子在相同或不同的數據上並行執行。這種並行性稱為任務並行性。使用任務並行性,我們可以更好地利用計算資源。

1.3 數據交換策略

  數據交換策略定義了在物理執行流圖中如何將數據分配給任務。數據交換策略可以由執行引擎自動選擇,具體取決於算子的語義或我們明確指定的語義。在這里,我們簡要回顧一些常見的數據交換策略,如圖2-3所示。

 

 

 

  • 前向策略將數據從一個任務發送到接收任務。如果兩個任務都位於同一台物理計算機上(這通常由任務調度器確保),這種交換策略可以避免網絡通信。
  • 廣播策略將所有數據發送到算子的所有的並行任務上面去。因為這種策略會復制數據和涉及網絡通信,所以代價相當昂貴。
  • 基於鍵控的策略通過Key值(鍵)對數據進行分區保證具有相同Key的數據將由同一任務處理。在圖2-2中,輸出“Extract hashtags”算子使用鍵來分區(hashtag),以便count算子的任務可以正確計算每個#標簽的出現次數。
  • 隨機策略統一將數據分配到算子的任務中去,以便均勻地將負載分配到不同的計算任務。

2 並行處理流數據

既然我們熟悉了數據流編程的基礎知識,現在是時候看看這些概念如何應用於並行的處理數據流了。但首先,讓我們定義術語數據流:數據流是一個可能無限的事件序列。

數據流中的事件可以表示監控數據,傳感器測量數據,信用卡交易數據,氣象站觀測數據,在線用戶交互數據,網絡搜索數據等。在本節中,我們將學習如何並行處理無限流,使用數據流編程范式。

2.1 延遲和吞吐量

  流處理程序不同與批處理程序。在評估性能時,要求也有所不同。對於批處理程序,我們通常關心一個作業的總的執行時間,或我們的處理引擎讀取輸入所需的時間,執行計算,並回寫結果。由於流處理程序是連續運行的,輸入可能是無界的,所以數據流處理中沒有總執行時間的概念。 相反,流處理程序必須盡可能快的提供輸入數據的計算結果。我們使用延遲和吞吐量來表征流處理的性能要求。

2.2 延遲

  延遲表示處理事件所需的時間。它是接收事件和看到在輸出中處理此事件的效果之間的時間間隔。要直觀的理解延遲,考慮去咖啡店買咖啡。當你進入咖啡店時,可能還有其他顧客在里面。因此,你排隊等候直到輪到你下訂單。收銀員收到你的付款並通知准備飲料的咖啡師。一旦你的咖啡准備好了,咖啡師會叫你的名字,你可以到櫃台拿你的咖啡。服務延遲是從你進入咖啡店的那一刻起,直到你喝上第一口咖啡之間的時間間隔。

  在數據流中,延遲是以時間為單位測量的,例如毫秒。根據應用程序,我們可能會關心平均延遲,最大延遲或百分位延遲。例如,平均延遲值為10ms意味着處理事件的平均時間在10毫秒內。或者,延遲值為95%,10ms表示95%的事件在10ms內處理完畢。平均值隱藏了處理延遲的真實分布,可能會讓人難以發現問題。如果咖啡師在准備卡布奇諾之前用完了牛奶,你必須等到他們從供應室帶來一些。雖然你可能會因為這么長時間的延遲而生氣,但大多數其他客戶仍然會感到高興。

  確保低延遲對於許多流應用程序來說至關重要,例如欺詐檢測,系統警報,網絡監控和提供具有嚴格服務水平協議的服務。低延遲是流處理的關鍵特性,它實現了我們所謂的實時應用程序。像Apache Flink這樣的現代流處理器可以提供低至幾毫秒的延遲。相比之下,傳統批處理程序延遲通常從幾分鍾到幾個小時不等。在批處理中,首先需要收集事件批次,然后才能處理它們。因此,延遲是受每個批次中最后一個事件的到達時間的限制。所以自然而然取決於批的大小。真正的流處理不會引入這樣的人為延遲,因此可以實現真正的低延遲。真的流模型,事件一進入系統就可以得到處理。延遲更密切地反映了在每個事件上必須進行的實際工作。

2.3 吞吐量

  吞吐量是衡量系統處理能力的指標,也就是處理速率。也就是說,吞吐量告訴我們每個時間單位系統可以處理多少事件。重溫咖啡店的例子,如果商店營業時間為早上7點至晚上7點。當天為600個客戶提供了服務,它的平均吞吐量將是每小時50個客戶。雖然我們希望延遲盡可能低,但我們通常也需要吞吐量盡可能高。

  吞吐量以每個時間單位系統所能處理的事件數量或操作數量來衡量。值得注意的是,事件處理速率取決於事件到達的速率,低吞吐量並不一定表示性能不佳。 在流式系統中,我們通常希望確保我們的系統可以處理最大的預期事件到達的速率。也就是說,我們主要的關注點在於確定的峰值吞吐量是多少,當系統處於最大負載時性能怎么樣。為了更好地理解峰值吞吐量的概念,讓我們考慮一個流處理 程序沒有收到任何輸入的數據,因此沒有消耗任何系統資源。當第一個事件進來時,它會盡可能以最小延遲立即處理。例如,如果你是第一個出現在咖啡店的顧客,在早上開門后,你將立即獲得服務。理想情況下,您希望此延遲保持不變 ,並且獨立於傳入事件的速率。但是,一旦我們達到使系統資源被完全使用的事件傳入速率,我們將不得不開始緩沖事件。在咖啡店里 ,午餐后會看到這種情況發生。許多人出現在同一時間,必須排隊等候。在此刻,咖啡店系統已達到其峰值吞吐量,進一步增加 事件傳入的速率只會導致更糟糕的延遲。如果系統繼續以可以處理的速率接收數據,緩沖區可能變為不可用,數據可能會丟失。這種情況是眾所周知的 作為背壓,有不同的策略來處理它。

 

2.4 延遲與吞吐量的對比

  此時,應該清楚延遲和吞吐量不是獨立指標。如果事件需要在處理流水線中待上很長時間,我們不能輕易確保高吞吐量。同樣,如果系統容量很小,事件將被緩沖,而且必須等待才能得到處理。

  讓我們重溫一下咖啡店的例子來闡明一下延遲和吞吐量如何相互影響。首先,應該清楚存在沒有負載時的最佳延遲。也就是說,如果你是咖啡店的唯一客戶,會很快得到咖啡。然而,在繁忙時期,客戶將不得不排隊等待,並且會有延遲增加。另一個影響延遲和吞吐量的因素是處理事件所花費的時間或為每個客戶提供服務所花費的時間。想象一下,期間聖誕節假期,咖啡師不得不為每杯咖啡畫聖誕老人。這意味着准備一杯咖啡需要的時間會增加,導致每個人花費 更多的時間在等待咖啡師畫聖誕老人,從而降低整體吞吐量。

  那么,你可以同時獲得低延遲和高吞吐量嗎?或者這是一個無望的努力?我們可以降低得到咖啡的延遲 ,方法是:聘請一位更熟練的咖啡師來准備咖啡。在高負載時,這種變化也會增加吞吐量,因為會在相同的時間內為更多的客戶提供服務。 實現相同結果的另一種方法是雇用第二個咖啡師來利用並行性。這里的主要想法是降低延遲來增加吞吐量。當然,如果系統可以更快的執行操作,它可以在相同的時間內執行更多操作。 事實上,在流中利用並行性時也會發生這種情況。通過並行處理多個流,在同時處理更多事件的同時降低延遲。

3 數據流上的操作

  流處理引擎通常提供一組內置操作:攝取(ingest),轉換(transform)和輸出流(output)。這些操作可以 結合到數據流圖中來實現邏輯流處理程序。在本節中,我們描述最常見的流處理操作。

  操作可以是無狀態的或有狀態的。無狀態操作不保持任何內部狀態。也就是說,事件的處理不依賴於過去看到的任何事件,也沒有保留歷史。 無狀態操作很容易並行化,因為事件可以彼此獨立地處理,也獨立於事件到達的順序(和事件到達順序沒有關系)。 而且,在失敗的情況下,無狀態操作可以是簡單的重新啟動並從中斷處繼續處理。相反, 有狀態操作可能會維護之前收到的事件的信息。此狀態可以通過傳入事件更新,也可以用於未來事件的處理邏輯。有狀態的流 處理應用程序更難以並行化和以容錯的方式來運行,因為狀態需要有效的進行分區和在發生故障的情況下可靠地恢復。

3.1 數據攝入和數據吞吐量

  數據攝取和數據出口操作允許流處理程序與外部系統通信。數據攝取是操作從外部源獲取原始數據並將其轉換為其他格式(ETL)。實現數據提取邏輯的運算符被稱為數據源。數據源可以從TCP Socket,文件,Kafka Topic或傳感器數據接口中提取數據。數據出口是以適合消費的形式產出到外部系統。執行數據出口的運算符稱為數據接收器,包括文件,數據庫,消息隊列和監控接口。

3.2 轉換算子

  轉換算子是單遍處理算子,碰到一個事件處理一個事件。這些操作在使用后會消費一個事件,然后對事件數據做一些轉換,產生一個新的輸出流。轉換邏輯可以集成在 操作符中或由UDF函數提供,如圖所示圖2-4。程序員編寫實現自定義計算邏輯。

  操作符可以接受多個輸入流並產生多個輸出流。他們還可以通過修改數據流圖的結構要么將流分成多個流,要么將流合並為一條流。

3.3 滾動聚合

  滾動聚合是一種聚合,例如sum,minimum和maximum,為每個輸入事件不斷更新。 聚合操作是有狀態的,並將當前狀態與傳入事件一起計算以產生更新的聚合值。請注意能夠有效地將當前狀態與事件相結合 產生單個值,聚合函數必須是關聯的和可交換的。否則,操作符必須存儲完整的流數據歷史。圖2-5顯示了最小滾動 聚合。操作符保持當前的最小值和相應地為每個傳入的事件來更新最小值。

3.4 窗口操作符

  轉換和滾動聚合一次處理一個事件產生輸出事件並可能更新狀態。但是,有些操作必須收集並緩沖數據以計算其結果。 例如,考慮不同流之間的連接或整體聚合這樣的操作,例如中值函數。為了在無界流上高效運行這些操作符,我們需要限制 這些操作維護的數據量。在本節中,我們將討論窗口操作,提供此服務。

  窗口還可以在語義上實現關於流的比較復雜的查詢。我們已經看到了滾動聚合的方式,以聚合值編碼整個流的歷史數據來為每個事件提供低延遲的結果。 但如果我們只對最近的數據感興趣的話會怎樣?考慮給司機提供實時交通信息的應用程序。這個程序可以使他們避免擁擠的路線。在這種場景下,你想知道某個位置在最近幾分鍾內是否有事故發生。 另一方面,了解所有發生過的事故在這個應用場景下並沒有什么卵用。更重要的是,通過將流歷史縮減為單一聚合值,我們將丟失這段時間內數據的變化。例如,我們可能想知道每5分鍾有多少車輛穿過 某個路口。

  窗口操作不斷從無限事件流中創建有限的事件集,好讓我們執行有限集的計算。通常會基於數據屬性或基於時間的窗口來分配事件。 要正確定義窗口運算符語義,我們需要確定如何給窗口分配事件以及對窗口中的元素進行求值的頻率是什么樣的。 窗口的行為由一組策略定義。窗口策略決定何時創建新的窗口以及要分配的事件屬於哪個窗口,以及何時對窗口中的元素進行求值。 而窗口的求值基於觸發條件。一旦觸發條件得到滿足,窗口的內容將會被發送到求值函數,求值函數會將計算邏輯應用於窗口中的元素。 求值函數可以是sum或minimal或自定義的聚合函數。 求值策略可以根據時間或者數據屬性計算(例如,在過去五秒內收到的事件或者最近的一百個事件等等)。 接下來,我們描述常見窗口類型的語義。

  • 滾動窗口是將事件分配到固定大小的不重疊的窗口中。當通過窗口的結尾時,全部事件被發送到求值函數進行處理。基於計數的滾動窗口定義了在觸發求值之前需要收集多少事件。圖2-6顯示了一個基於計數的翻滾窗口,每四個元素一個窗口。基於時間的滾動窗口定義一個時間間隔,包含在此時間間隔內的事件。圖2-7顯示了基於時間的滾動窗口,將事件收集到窗口中每10分鍾觸發一次計算。

  • 滑動窗口將事件分配到固定大小的重疊的窗口中去。因此,事件可能屬於多個桶。我們通過提供窗口的長度和滑動距離來定義滑動窗口。滑動距離定義了創建新窗口的間隔。基於滑動計數的窗口,圖2-8的長度為四個事件,三個為滑動距離。

  • 會話窗口在常見的真實場景中很有用,一些場景既不能使用滾動窗口也不能使用滑動窗口。考慮一個分析在線用戶行為的應用程序。在應用程序里,我們想把源自同一時期的用戶活動或會話事件分組在一起。會話由一系列相鄰時間發生的事件組成,接下來有一段時間沒有活動。例如,用戶在App上瀏覽一系列的新聞,然后關掉App,那么瀏覽新聞這段時間的瀏覽事件就是一個會話。會話窗口事先沒有定義窗口的長度,而是取決於數據的實際情況,滾動窗口和滑動窗口無法應用於這個場景。相反,我們需要將同一會話中的事件分配到同一個窗口中去,而不同的會話可能窗口長度不一樣。會話窗口會定義一個間隙值來區分不同的會話。間隙值的意思是:用戶一段時間內不活動,就認為用戶的會話結束了。圖2-9顯示了一個會話窗口。

到目前為止,所有窗口類型都是在整條流上去做窗口操作。但實際上你可能想要將一條流分流成多個邏輯流並定義並行窗口。 例如,如果我們正在接收來自不同傳感器的測量結果,那么可能想要在做窗口計算之前按傳感器ID對流進行分流操作。 在並行窗口中,每條流都獨立於其他流,然后應用了窗口邏輯。圖2-10顯示了一個基於計數的長度為2的並行滾動窗口,根據事件顏色分流。

  在流處理中,窗口操作與兩個主要概念密切相關:時間語義和狀態管理。時間也許是流處理最重要的方面。即使低延遲是流處理的一個有吸引力的特性,它的真正價值不僅僅是快速分析。真實世界的系統,網絡和通信渠道遠非完美,流數據經常被推遲或無序(亂序)到達。理解如何在這種條件下提供准確和確定的結果是至關重要的。 更重要的是,流處理程序可以按原樣處理事件制作的也應該能夠處理相同的歷史事件方式,從而實現離線分析甚至時間旅行分析。 當然,前提是我們的系統可以保存狀態,因為可能有故障發生。到目前為止,我們看到的所有窗口類型在產生結果前都需要保存之前的數據。實際上,如果我們想計算任何指標,即使是簡單的計數,我們也需要保存狀態。考慮到流處理程序可能會運行幾天,幾個月甚至幾年,我們需要確保狀態可以在發生故障的情況下可靠地恢復。 並且即使程序崩潰,我們的系統也能保證計算出准確的結果。本章,我們將在流處理應用可能發生故障的語境下,深入探討時間和狀態的概念。

4 時間語義

在本節中,我們將介紹時間語義,並描述流中不同的時間概念。我們將討論流處理器在亂序事件流的情況下如何提供准確的計算結果,以及我們如何處理歷史事件流,如何在流中進行時間旅行。

4.1 在流處理中一分鍾代表什么?

  在處理可能是無限的事件流(包含了連續到達的事件),時間成為流處理程序的核心方面。假設我們想要連續的計算結果,可能每分鍾就要計算一次。在我們的流處理程序上下文中,一分鍾的意思是什么?

  考慮一個程序需要分析一款移動端的在線游戲的用戶所產生的事件流。游戲中的用戶分了組,而應用程序將收集每個小組的活動數據,基於小組中的成員多快達到了游戲設定的目標,然后在游戲中提供獎勵。例如額外的生命和用戶升級。例如,如果一個小組中的所有用戶在一分鍾之內都彈出了500個泡泡,他們將升一級。Alice是一個勤奮的玩家,她在每天早晨的通勤時間玩游戲。問題在於Alice住在柏林,並且乘地鐵去上班。而柏林的地鐵手機信號很差。我們設想一個這樣的場景,Alice當她的手機連上網時,開始彈泡泡,然后游戲會將數據發送到我們編寫的應用程序中,這時地鐵突然進入了隧道,她的手機也斷網了。Alice還在玩這個游戲,而產生的事件將會緩存在手機中。當地鐵離開隧道,Alice的手機又在線了,而手機中緩存的游戲事件將發送到應用程序。我們的應用程序應該如何處理這些數據?在這個場景中一分鍾的意思是什么?這個一分鍾應該包含Alice離線的那段時間嗎?下圖展示了這個問題。

 

  在線手游是一個簡單的場景,展示了應用程序的運算應該取決於事件實際發生的時間,而不是應用程序收到事件的時間。如果我們按照應用程序收到事件的時間來進行處理的話,最糟糕的后果就是,Alice和她的朋友們再也不玩這個游戲了。但是還有很多時間語義非常關鍵的應用程序,我們需要保證時間語義的正確性。如果我們只考慮我們在一分鍾之內收到了多少數據,我們的結果會變化,因為結果取決於網絡連接的速度或處理的速度。相反,定義一分鍾之內的事件數量,這個一分鍾應該是數據本身的時間。

在Alice的這個例子中,流處理程序可能會碰到兩個不同的時間概念:處理時間和事件時間。我們將在接下來的部分,討論這兩個概念。

4.2 處理時間

處理時間是處理流的應用程序的機器的本地時鍾的時間(牆上時鍾)。處理時間的窗口包含了一個時間段內來到機器的所有事件。這個時間段指的是機器的牆上時鍾。如下圖所示,在Alice的這個例子中,處理時間窗口在Alice的手機離線的情況下,時間將會繼續行走。但這個處理時間窗口將不會收集Alice的手機離線時產生的事件。

 

4.3 事件時間

  事件時間是流中的事件實際發生的時間。事件時間基於流中的事件所包含的時間戳。通常情況下,在事件進入流處理程序前,事件數據就已經包含了時間戳。下圖展示了事件時間窗口將會正確的將事件分發到窗口中去。可以如實反應事情是怎么發生的。即使事件可能存在延遲。

  事件時間使得計算結果的過程不需要依賴處理數據的速度。基於事件時間的操作是可以預測的,而計算結果也是確定的。無論流處理程序處理流數據的速度快或是慢,無論事件到達流處理程序的速度快或是慢,事件時間窗口的計算結果都是一樣的。

  可以處理遲到的事件只是我們使用事件時間所克服的一個挑戰而已。普遍存在的事件亂序問題可以使用事件時間得到解決。考慮和Alice玩同樣游戲的Bob,他恰好和Alice在同一趟地鐵上。Alice和Bob雖然玩的游戲一樣,但他們的手機信號是不同的運營商提供的。當Alice的手機沒信號時,Bob的手機依然有信號,游戲數據可以正常發送出去。

  如果使用事件時間,即使碰到了事件亂序到達的情況,我們也可以保證結果的正確性。還有,當我們在處理可以重播的流數據時,由於時間戳的確定性,我們可以快進過去。也就是說,我們可以重播一條流,然后分析歷史數據,就好像流中的事件是實時發生一樣。另外,我們可以快進歷史數據來使我們的應用程序追上現在的事件,然后應用程序仍然是一個實時處理程序,而且業務邏輯不需要改變。

4.4 水位線

  在我們對事件時間窗口的討論中,我們忽略了一個很重要的方面:我們應該怎樣去決定何時觸發事件時間窗口的計算?也就是說,在我們可以確定一個時間點之前的所有事件都已經到達之前,我們需要等待多久?我們如何知道事件是遲到的?在分布式系統無法准確預測行為的現實條件下,以及外部組件所引發的事件的延遲,以上問題並沒有准確的答案。在本小節中,我們將會看到如何使用水位線來設置事件時間窗口的行為。

  水位線是全局進度的度量標准。系統可以確信在一個時間點之后,不會有早於這個時間點發生的事件到來了。本質上,水位線提供了一個邏輯時鍾,這個邏輯時鍾告訴系統當前的事件時間。當一個運算符接收到含有時間T的水位線時,這個運算符會認為早於時間T的發生的事件已經全部都到達了。對於事件時間窗口和亂序事件的處理,水位線非常重要。運算符一旦接收到水位線,運算符會認為一段時間內發生的所有事件都已經觀察到,可以觸發針對這段時間內所有事件的計算了。

  水位線提供了一種結果可信度和延時之間的妥協。激進的水位線設置可以保證低延遲,但結果的准確性不夠。在這種情況下,遲到的事件有可能晚於水位線到達,我們需要編寫一些代碼來處理遲到事件。另一方面,如果水位線設置的過於寬松,計算的結果准確性會很高,但可能會增加流處理程序不必要的延時。

  在很多真實世界的場景里面,系統無法獲得足夠的知識來完美的確定水位線。在手游這個場景中,我們無法得知一個用戶離線時間會有多長,他們可能正在穿越一條隧道,可能正在乘飛機,可能永遠不會再玩兒了。水位線無論是用戶自定義的或者是自動生成的,在一個分布式系統中追蹤全局的時間進度都不是很容易。所以僅僅依靠水位線可能並不是一個很好的主意。流處理系統還需要提供一些機制來處理遲到的元素(在水位線之后到達的事件)。根據應用場景,我們可能需要把遲到事件丟棄掉,或者寫到日志里,或者使用遲到事件來更新之前已經計算好的結果。

4.5 處理時間和事件時間

  大家可能會有疑問,既然事件時間已經可以解決我們的所有問題,為什么我們還要對比這兩個時間概念?真相是,處理時間在很多情況下依然很有用。處理時間窗口將會帶來理論上最低的延遲。因為我們不需要考慮遲到事件以及亂序事件,所以一個窗口只需要簡單的緩存窗口內的數據即可,一旦機器時間超過指定的處理時間窗口的結束時間,就會觸發窗口的計算。所以對於一些處理速度比結果准確性更重要的流處理程序,處理時間就派上用場了。另一個應用場景是,當我們需要在真實的時間場景下,周期性的報告結果時,同時不考慮結果的准確性。一個例子就是一個實時監控的儀表盤,負責顯示當事件到達時立即聚合的結果。最后,處理時間窗口可以提供流本身數據的忠實表達,對於一些案例可能是很必要的特性。例如我們可能對觀察流和對每分鍾事件的計數(檢測可能存在的停電狀況)很感興趣。簡單的說,處理時間提供了低延遲,同時結果也取決於處理速度,並且也不能保證確定性。另一方面,事件時間保證了結果的確定性,同時還可以使我們能夠處理遲到的或者亂序的事件流。

5 狀態和持久化模型

  我們現在轉向另一個對於流處理程序非常重要的話題:狀態。在數據處理中,狀態是普遍存在的。任何稍微復雜一點的計算,都涉及到狀態。為了產生計算結果,一個函數在一段時間內的一定數量的事件上來累加狀態(例如,聚合計算或者模式匹配)。有狀態的運算符使用輸入的事件以及內部保存的狀態來計算得到輸出。例如,一個滾動聚合運算符需要輸出這個運算符所觀察到的所有事件的累加和。這個運算符將會在內部保存當前觀察到的所有事件的累加和,同時每輸入一個事件就更新一次累加和的計算結果。相似的,當一個運算符檢測到一個“高溫”事件緊接着十分鍾以內檢測到一個“煙霧”事件時,將會報警。直到運算符觀察到一個“煙霧”事件或者十分鍾的時間段已經過去,這個運算符需要在內部狀態中一直保存着“高溫”事件。

  當我們考慮一下使用批處理系統來分析一個無界數據集時,會發現狀態的重要性顯而易見。在現代流處理器興起之前,處理無界數據集的一個通常做法是將輸入的事件攢成微批,然后交由批處理器來處理。當一個任務結束時,計算結果將被持久化,而所有的運算符狀態就丟失了。一旦一個任務在計算下一個微批次的數據時,這個任務是無法訪問上一個任務的狀態的(都丟掉了)。這個問題通常使用將狀態代理到外部系統(例如數據庫)的方法來解決。相反,在一個連續不間斷運行的流處理任務中,事件的狀態是一直存在的,我們可以將狀態暴露出來作為編程模型中的一等公民。當然,我們的確可以使用外部系統來管理流的狀態,即使這個解決方案會帶來額外的延遲。

  由於流處理運算符默認處理的是無界數據流。所以我們必須要注意不要讓內部狀態無限的增長。為了限制狀態的大小,運算符通常情況下會保存一些之前所觀察到的事件流的總結或者概要。這個總結可能是一個計數值,一個累加和,或者事件流的采樣,窗口的緩存操作,或者是一個自定義的數據結構,這個數據結構用來保存數據流中感興趣的一些特性。

我們可以想象的到,支持有狀態的運算符可能會碰到一些實現上的挑戰:

狀態管理

  系統需要高效的管理狀態,並保證針對狀態的並發更新,不會產生競爭條件(race condition)。

狀態分區

  並行會帶來復雜性。因為計算結果同時取決於已經保存的狀態和輸入的事件流。幸運的是,大多數情況下,我們可以使用Key來對狀態進行分區,然后獨立的管理每一個分區。例如,當我們處理一組傳感器的測量事件流時,我們可以使用分區的運算符狀態來針對不同的傳感器獨立的保存狀態。

狀態恢復

  第三個挑戰是有狀態的運算符如何保證狀態可以恢復,即使出現任務失敗的情況,計算也是正確的。

下一節,我們將討論任務失敗和計算結果的保證。

5.1 任務失敗

  流任務中的運算符狀態是很寶貴的,也需要抵御任務失敗帶來的問題。如果在任務失敗的情況下,狀態丟失的話,在任務恢復以后計算的結果將是不正確的。流任務會連續不斷的運行很長時間,而狀態可能已經收集了幾天甚至幾個月。在失敗的情況下,重新處理所有的輸入並重新生成一個丟失的狀態,將會很浪費時間,開銷也很大。

  在本章開始時,我們看到如何將流的編程建模成數據流模型。在執行之前,流程序將會被翻譯成物理層數據流圖,物理層數據流圖由連接的並行任務組成,而一個並行任務運行一些運算符邏輯,消費輸入流數據,並為其他任務產生輸出流數據。真實場景下,可能有數百個這樣的任務並行運行在很多的物理機器上。在長時間的運行中,流任務中的任意一個任務在任意時間點都有可能失敗。我們如何保證任務的失敗能被正確的處理,以使任務能繼續的運行下去呢?事實上,我們可能希望我們的流處理器不僅能在任務失敗的情況下繼續處理數據,還能保證計算結果的正確性以及運算符狀態的安全。我們在本小節來討論這些問題。

什么是任務失敗?

  對於流中的每一個事件,一個處理任務分為以下步驟:

  (1)接收事件,並將事件存儲在本地的緩存中;

  (2)可能會更新內部狀態;

  (3)產生輸出記錄。這些步驟都能失敗,而系統必須對於在失敗的場景下如何處理有清晰的定義。如果任務在第一步就失敗了,事件會丟失嗎?如果當更新內部狀態的時候任務失敗,那么內部狀態會在任務恢復以后更新嗎?在以上這些場景中,輸出是確定性的嗎?

  在批處理場景下,所有的問題都不是問題。因為我們可以很方便的重新計算。所以不會有事件丟失,狀態也可以得到完全恢復。在流的世界里,處理失敗不是一個小問題。流系統在失敗的情況下需要保證結果的准確性。接下來,我們需要看一下現代流處理系統所提供的一些保障,以及實現這些保障的機制。

結果的保證

  當我們討論保證計算的結果時,我們的意思是流處理器的內部狀態需要保證一致性。也就是說我們關心的是應用程序的代碼在故障恢復以后看到的狀態值是什么。要注意保證應用程序狀態的一致性並不是保證應用程序的輸出結果的一致性。一旦輸出結果被持久化,結果的准確性就很難保證了。除非持久化系統支持事務。

AT-MOST-ONCE

  當任務故障時,最簡單的做法是什么都不干,既不恢復丟失的狀態,也不重播丟失的事件。At-most-once語義的含義是最多處理一次事件。換句話說,事件可以被丟棄掉,也沒有任何操作來保證結果的准確性。這種類型的保證也叫“沒有保證”,因為一個丟棄掉所有事件的系統其實也提供了這樣的保障。沒有保障聽起來是一個糟糕的主意,但如果我們能接受近似的結果,並且希望盡可能低的延遲,那么這樣也挺好。

AT-LEAST-ONCE

  在大多數的真實應用場景,我們希望不丟失事件。這種類型的保障成為at-least-once,意思是所有的事件都得到了處理,而且一些事件還可能被處理多次。如果結果的正確性僅僅依賴於數據的完整性,那么重復處理是可以接受的。例如,判斷一個事件是否在流中出現過,at-least-once這樣的保證完全可以正確的實現。在最壞的情況下,我們多次遇到了這個事件。而如果我們要對一個特定的事件進行計數,計算結果就可能是錯誤的了。

  為了保證在at-least-once語義的保證下,計算結果也能正確。我們還需要另一套系統來從數據源或者緩存中重新播放數據。持久化的事件日志系統將會把所有的事件寫入到持久化存儲中。所以如果任務發生故障,這些數據可以重新播放。還有一種方法可以獲得同等的效果,就是使用結果承認機制。這種方法將會把每一條數據都保存在緩存中,直到數據的處理等到所有的任務的承認。一旦得到所有任務的承認,數據將被丟棄。

EXACTLY-ONCE

  恰好處理一次是最嚴格的保證,也是最難實現的。恰好處理一次語義不僅僅意味着沒有事件丟失,還意味着針對每一個數據,內部狀態僅僅更新一次。本質上,恰好處理一次語義意味着我們的應用程序可以提供准確的結果,就好像從未發生過故障。

  提供恰好處理一次語義的保證必須有至少處理一次語義的保證才行,同時還需要數據重放機制。另外,流處理器還需要保證內部狀態的一致性。也就是說,在故障恢復以后,流處理器應該知道一個事件有沒有在狀態中更新。事務更新是達到這個目標的一種方法,但可能引入很大的性能問題。Flink使用了一種輕量級快照機制來保證恰好處理一次語義。

端到端恰好處理一次

  目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在Flink流處理器內部保證的。而在真實世界中,流處理應用除了流處理器以外還包含了數據源(例如Kafka)和持久化系統。端到端的一致性保證意味着結果的正確性貫穿了整個流處理應用的始終。每一個組件都保證了它自己的一致性。而整個端到端的一致性級別取決於所有組件中一致性最弱的組件。要注意的是,我們可以通過弱一致性來實現更強的一致性語義。例如,當任務的操作具有冪等性時,比如流的最大值或者最小值的計算。在這種場景下,我們可以通過最少處理一次這樣的一致性來實現恰好處理一次這樣的最高級別的一致性。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM