上一篇聊了聊批處理的缺點,對於無界數據來說,流處理會是更好的選擇,“流”指的是隨着時間的推移逐步增加的數據。消息隊列可以將這些流組織起來,快速的在應用程序中給予反饋。但是消息隊列與傳統的數據庫之間又存在着“剪不斷,理還亂”的“糾葛”,最后我們將探討通過消息隊列之中與時序有關的一些問題。
文件是批處理作業的輸入和輸出,而在流處理之中,作業的輸入輸出等價物是什么呢?
在流處理之中,當輸入是文件時,第一個處理步驟通常是將其解析為一連串的記錄。在流處理之中,記錄通常被稱為事件,每個事件都是一個小的、獨立的、不可變的對象,通常每個事件包含一個時間戳,表明事件產生的時間。 在流處理之中,事件由生產者產生,然后可能由多個對應消費者,相關的事件通常被分組到同一個主題之中。
可以由數據庫來串聯生產者與消費者:生產者可以將事件寫入數據庫,之后每一個消費者定期輪詢數據庫檢查新出現的事件。但是數據庫是不適合這種頻繁輪詢的操作的,因為輪詢的次數越多,返回新事件的百分比越低,由此產生額外的開銷也就越高。 (其實可以通過觸發器的方式實現,但是數據庫觸發器也是基於數據庫內部的關聯的表進行操作的),所以引入了消息系統來處理流處理的需求。
1.消息系統
消息系統的運行邏輯很簡單:由生產者發送包含事件的消息,然后將消息推送給消費者,可以由多個生產者節點發送消息到同一個主題,並允許多個消費節點在一個主題中接收消息。 但是消息系統會有這樣幾個問題:
-
- 如果生產者發送消息的速度比消費者處理的速度快,系統會怎么樣處理呢 ?
- 刪除消息
- 在隊列中緩存消息
- 負反饋(也稱為流量控制,阻止生產者發送更多消息)
- 如果生產者發送消息的速度比消費者處理的速度快,系統會怎么樣處理呢 ?
-
- 如果節點崩潰或暫時離線,會出現消息丟失嗎?消息系統與數據庫相似,需要實現消息持久化需要一些進行磁盤讀寫或消息復制,這顯然是有代價的。如果可以容忍消息丟失,那么可以在同一硬件上獲得更高的吞吐量和更低的延遲。
消息的傳遞機制
許多消息系統使用生產者和消費者之間的直接網絡通信,而無需通過中間節點,如ZeroMQ 采取了TCP/IP組播的形式。所以如果消費者在網絡上公開服務,生產者可以直接通過HTTP或RPC請求將消息推送給消費者。雖然直接消息傳遞的系統在通常情況下在協議檢測和消息重傳的機制下工作的很好,但是應用程序通常需要能夠容忍消息丟失的情況,因為有一個問題很明顯生產者和消費者不一定時刻在線。 而如果消費者離線,它可能錯過消息。有些協議允許生產者重試失敗的消息,但一旦生產者崩潰,這種方法可能失效,因為重試的消息的緩沖區會丟失。
而另一種廣泛使用方案是通過消息隊列來發送消息,它作為與生產者和消費者的中間連接而存在,生產者將消息寫入消息隊列,而消費者從消息隊列讀取需要接收的消息。 通過消息隊列傳輸的數據,系統容忍消費者和生產者的在線問題,消息持久性選擇被交給了消息隊列。這時我們可以更加靈活的處理消息,有些消息可以僅僅保存在內存中,而某些消息將寫入磁盤,以便在消息隊列崩潰時不會丟失這些消息。 面對處理速度緩慢的消費者,消息隊列通常允許無界的排隊規則,而不是丟棄消息或負反饋調整,這些機制都成為可以定制的選項。 但是消息隊列的消息傳遞是異步的:當生產者發送消息時,它通常只等待消息隊列的確認,而不會等到消費者處理消息。
與數據庫的區別與聯系
消息系統在許多性質上與數據庫非常相似,但是依然存在一些重要的差異:
-
數據庫會持久化的保存數據,直到數據被顯式刪除,而大多數消息系統將消息成功地傳遞給消費者時自動刪除它,所以消息系統不適合作為長期存儲。
-
數據庫通常通過索引來分類檢索數據,而消息系統通常通過主題配置的模式來分類檢索數據的。
-
數據庫的讀寫操作都是主動的,而消息系統不支持隨機查詢,當數據發生變化時,它會通知消費者。
消息的分發與確認
當多個消費者讀取消息時,消息系統存在兩種分發模型:
- 負載均衡
每個消息傳遞給所有消費者中的一個,由所有消費者共享處理主題中的消息的工作。消息隊列可以任意的向消費者分配消息,來實現負載均衡。
- 消息廣播
每條消息都傳遞給所有的消費者。消息廣播使所有消費者收到同樣的消息,而不影響彼此流,相當於有幾個不同的批處理作業讀取相同的輸入文件。

這兩種模式可以進行合並:例如,兩個獨立的消費者組可以各自訂閱一個主題,使得每個組集體接收所有消息,但在每個組中,只有一個節點接收每個消息。
消費者可能在任意時刻崩潰,所以向消費者傳遞的消息未必會被處理或者只是在崩潰前部分處理它。 為了保證消息不丟失,消息代理使用確認機制:消費者需要明確反饋給消息隊列,對應的消息得到了處理,消息隊列會在隊列之中移除對應的消息。 如果消費者的連接關閉或超時,而消息隊列沒有收到確認,則它假定消息沒有被處理,因此它將消息再次發送給另一個消費者。(注意,可能會出現消息完全被處理的情況,但是確認在網絡中丟失了,再次處理消息時需要確保消息的處理是冪等的。)所以如下圖所示,這種情況會導致消息的交付順序與生產者的發送的順序不一致:

通常來說如果消息是完全獨立的,那么消息的重新排序不會產生問題,但是如果消息之間有因果依賴關系,這回導致因果的不一致性,為了避免這個問題,可以為每個消費者使用單獨的隊列,但是這樣就失去了負載均衡的優勢。
日志與消息系統
對於有持久化需求的消息隊列,則考慮通過日志來實現持久化存儲,來滿足消息隊列低延遲的要求。在前文之中我們討論過日志的模式,同樣相同的日志模型可以用來實現消息隊列的持久化:生產者將消息追加到日志的末尾,而消費者通過依次讀取日志來接收消息。如下圖所示:為了比單個磁盤所能提供更高的吞吐量,可以對日志進行分區操作。在不同的代理節點上托管不同的分區,使每個分區保存獨立的日志:

在每個分區之中,每個消息都會有一個單調遞增的序列號,這樣能夠保證分區之中所有的消息是完全有序的,而不同分區之間的消息則沒有順序保證。通過這種方式可以很容易地分辨出哪些消息已被處理,比當前偏移量小的消息已經被處理,而后面的消息還沒有被處理。因此,消息隊列不需要追蹤每一個消息,它只需要定期記錄消費者偏移。這樣有助於提高基於日志系統的吞吐量。而一旦消費者節點失效,則消費者組中的另一個節點被分配到日志分區,並開始在最后記錄的偏移量上消費消息。 但如果之前的消息處理了偏移量之后的消息,但沒有記錄新的偏移量,則這些消息會被二次處理。
如果消費者無法跟上生產者發送消息的速率,則日志記錄消息可以作為一種緩沖機制 。 當一個消費者所需要的消息比比日志上保留的信息要老,它將丟失過舊消息。所以需要監視消費者的消費速率,如果它顯著落后,則發出警報。由於基於日志的磁盤緩沖區很大,有足夠的時間讓管理員介入。即使消費者落后太多,開始出現丟失消息的情況,也只有單個消費者受到影響,它不會破壞其他消費者的運行。 前文提到的消息確認是一種破壞性的操作,因為它會導致消息被消息隊列刪除。而在基於日志的消息隊列中,消息的讀取時只讀的操作,不會改變日志。這使得基於日志的消息隊列更像是前文提及的批處理過程。
2.與數據庫共同工作
上文已經提到過,沒有一個系統能夠滿足所有的數據存儲、查詢和處理需求。在實踐中,應用需要結合不同的技術來滿足要求,所以本節我們來看看消息隊列與數據庫是怎么樣並肩作戰的。
變化數據捕獲(CDC) 是常常被使用到的技術,通過觀察所有寫入數據庫的數據變化並將它們轉換成可復制到其他系統數據的過程。如下圖所示,通過捕獲到數據庫中的更改,並繼續對搜索索引等應用更改,通過以相同的順序應用更改日志,搜索索引中的數據與數據庫中的數據相匹配。

變化數據捕獲的實現
變化數據捕獲是一種機制,用於確保對記錄系統的所有更改也反映在派生數據系統中,以便派生系統具有准確的數據副本。 從本質上講,更改數據捕獲使一個數據庫成為Leader,並將其他數據系統變成Follower。基於日志的消息隊列很適合從源數據庫接受消息的變化,並且保留的消息的順序。 數據庫的觸發器同樣可用於實現變化數據捕獲,通過觀察數據表的所有變化並將變化添加到記錄表之中,但是觸發器會帶顯著的性能開銷。變化數據捕獲通常是異步的:記錄數據庫系統在提交之后不會等待更改應用於消費者。
快照與日志壓縮
如果擁有對數據庫所做的所有更改的日志,那么可以通過日志來重建數據庫的整個狀態。但是將所有更改保存在內存中,會耗費大量的磁盤空間,並且載入並應用日志將耗費太長的時間,因此需要截斷日志並配合快照來使用。構建一個新的全文索引需要整個數據庫的完整副本,這里可以通過快照開始,並且載入快照后生成的日志便可以將索引恢復到最新的狀態。所以數據庫快照必須與日志中的偏移量相對應,以便確定在處理完快照后,在哪一點開始應用日志更改。
因為只能保留有限的日志記錄,所以每次需要添加新的派生數據系統時,都需要經歷快照的過程。這增加了系統的復雜性,而日志壓縮提供了一個很好的替代方案,日志壓縮的原理很簡單:存儲引擎周期性地查找具有相同Key的日志記錄,丟棄重復的記錄,並且只保存每個Key的最新值。 日志的壓縮和合並過程在后台運行,如果需要重建派生數據系統(如:搜索索引)時,可以從壓縮日志中啟動一個新的用戶,並依次掃描日志中的所有消息,就可以獲取數據庫內容的完整副本,而不必通過額外的快照。
3.流處理的時間依賴
流處理與數據庫相比最核心的差別是:查詢和數據之間的關系是相反的。通常,數據庫會持久地存儲數據,而查詢是一個臨時的操作。而流處理反轉兩者的角色:查詢是長期存儲的,輸入流的事件不斷地流過,並尋找查詢模式匹配的數據。所以,二者的應用場景也差距很大,流處理擅長監控變化的數據並且給予反饋。一旦涉及到變化,則是一個時間敏感問題,數據是隨着時間的推移而變化的,流處理通常需要處理時間,特別是用於分析的數據變化時,需要使用時間窗口。例如 “過去五分鍾的平均時間”。許多流處理框架使用了本地系統時鍾來確定時間窗口。如果事件的發生和事件的處理之間的延遲很小,這個模型就十分簡單易行。然而,前文我們提到了,事件很有可能會產生延遲,事件的處理可能明顯晚於事件的發生。
事件時間與處理時間
有許多原因會導致處理的延遲如:排隊、網絡故障,消息隊列處理緩慢,代碼的bug等。消息延遲會導致事件的不可預知排序。例如,假設用戶首先創建一個Web請求(由Web服務器A處理),然后再進行第二個請求(由服務器B處理)。a和b發出描述它們所處理請求的事件,但b事件在事件發生前到達消息代理。現在流處理器將首先看到b事件,然后才是a事件,盡管它們實際上是以相反的順序發生的。
事件發生的時間和事件的處理時間是兩個完全不同的概念,混淆他們會導致數據的損壞。如下圖所示,Web服務器上事件發生的頻率是穩定的,但是流處理器需要定期處理事件,可能這時會停下來一分鍾,處理擠壓的事件,如果這時以事件的處理事件來測量數據,會導致異常的波動結果。

如何確定時間戳
確定事件的時間戳是一件很困難的事,按理來說,事件上的時間戳應該是與用戶交互發生的時間,但是,用戶控制的設備上的時鍾通常不能被信任,因為它可能是偶然或故意設置到錯誤的時間。服務器接收到事件的時間(根據服務器的時鍾)更可能是准確的,但在描述用戶交互方面沒有什么意義。所以這里有三個時間戳的法則:
-
1 .事件發生的時間 (設備時鍾)
-
2 設備將事件發送到服務器的時間 (傳輸計時)
-
3 服務器接收事件的時間 (服務器時鍾)
由第三個時間戳減去第二個時間戳,可以估計設備時鍾和服務器時鍾之間的偏移量,通過這樣的方式來估計事件實際發生的真實時間。
小結:
通過流處理與批處理,我們可以完成一個分布式系統需要的絕大多數計算任務。我們用了16篇的時間走完了對這本書絕大多數內容的梳理,最后一章是一篇大雜燴,作者帶領我們展望自己對於未來數據系統發展的看法,也對之前的內容做了總結。
