非常榮幸有機會和大家分享一下 Apache Pulsar 怎樣為批流處理提供融合的存儲。希望今天的分享對做大數據處理的同學能有幫助和啟發。
這次分享,主要分為四個部分:
- 介紹與其他消息系統相比, Apache Pulsar 的獨特優勢
- 分析批流處理中的存儲需求
- 講述 Apache Pulsar 如何完美匹配批流處理中的存儲需求
- 介紹怎樣使用 Apache Pulsar 提供批流融合的存儲
Apache Pulsar 簡介

Apache Pulsar 是新近開源的一個大規模分布式消息系統,是 Apache 的頂級項目,在 Yahoo 全球數十個機房大規模部署並線上穩定使用了 4 年多。Apache Pulsar 設計中學習和借鑒了其他優秀的分布式系統,在保證一致性和高吞吐的同時,也提供了其他優秀特性,比如支持上百萬的 Topic、無縫的多中心互備、靈活的擴展性等。
這里我們簡單介紹一下,與其他消息系統相比, Apache Pulsar 擁有的獨特優勢,大致有以下3點:
- 獨特的軟件架構(存儲和計算分離,分層分片的存儲)
- 靈活的消費模型( Exclusive、Failover、Shared 和 KeyShared)
- 豐富的企業特性(多租戶)
在介紹 Apache Pulsar 時,通常會用這樣一句話,“Flexible Pub-Sub Messaging backed by durable log Storage”。這句話表明了 Pulsar 和其他消息系統的根本不同,它采用了存儲和計算分離的架構。
Pulsar 的服務層使用 Broker,存儲層使用 BookKeeper,來提供高效和一致的存儲。

從架構上來說,Apache Pulsar 采用了分層和分片的架構。這是 Pulsar 滿足批流處理中存儲需求的基礎。
在 Apache Pulsar 的分層架構中,服務層 Broker 和存儲層 BookKeeper 的每個節點都是對等的。Broker 僅僅負責消息的服務支持,不存儲數據。這為服務層和存儲層提供了瞬時的節點擴展和無縫的失效恢復。
存儲層 BookKeeper 為 WAL(Write Ahead Log)提供了存儲,是一個分布式的 Log 存儲系統。
WAL 和數據處理中的流有很多相似性,都是數據源源不斷地追加,都對順序和一致性有嚴格要求。
BookKeeper 通過 Quorum Vote 的方式來實現數據的一致性,跟 Master/Slave 模式不同,BookKeeper 中每個節點也是對等的,對一份數據會並發地同時寫入指定數目的存儲節點。對等的存儲節點,保證了多個備份可以被並發訪問;也保證了存儲中即使只有一份數據可用,也可以對外提供服務。
Apache Pulsar 通過分層分片的架構,將邏輯的分區轉化為分片來作為存儲單元。這為數據的並發訪問提供了基礎。

除了架構的不同,從用戶接口來說,Apache Pulsar 通過訂閱的抽象,提供了靈活的消費模型。每一個訂閱類似一個 Consumer Group,接收一個 topic 的所有的消息。用戶可以使用不同的訂閱類型、以不同的模式來共同消費同一個 Topic 中的消息。
如果對順序性有要求,可以使用 Exclusive 和 Failover 的訂閱模式,這樣同一個 Topic 只有一個 Consumer 在消費,可以保證順序性。
如果使用 Shared 訂閱模式,多個 Consumer 可以並發消費同一個 Topic。通過動態增加 Consumer 的數量,可以加速 Topic 的消費,減少消息在服務端的堆積。
Pulsar 即將發布的 2.4.0 版本添加了一種新的訂閱模式: KeyShared。KeyShared 模式保證在 Shared 模式下同一個 Key 的消息也會發送到同一個 Consumer,在並發的同時也保證了順序性。
Apache Pulsar 靈活的消費模型,避免了因為不同的消費場景需要部署多套消息系統的場景,消除了數據生產端的數據分離。

此外,Apache Pulsar 是以多租戶為基礎的豐富的企業級特性。企業內部可以搭建一套 Pulsar 集群,在集群中給各個部門分配不同的租戶,並設置租戶的管理權限。租戶的管理員再根據部門的不同業務和場景需求,創建不同的 Namespace。在 Namespace 中可以設置管理策略,比如流控,Quota,互備的集群,數據副本數等。這樣為 Topic 的管理提供了一個層級的可控的視圖。
Apache Pulsar 的企業級特性,為企業搭建統一大集群提供了基礎,方便了集群的管理和數據的共享。
以上是關於 Apache Pulsar 的簡單介紹,歡迎參閱 Apache Pulsar 的官網和微信公眾號了解更多內容。
批流處理中的存儲現狀

在大數據處理剛剛興起的時候,一般用戶會采用 λ 架構,維護批流兩套系統:批系統主要處理歷史數據; 流系統處理實時的數據,對批系統的結果進行補充來提高時效。兩套系統造成數據冗余,增加維護成本。
在存儲層,批處理常使用 HDFS 和網絡對象存儲等;流處理常使用 Kafka 或其他的消息系統。
為了解決 λ 架構的問題,逐漸演化出 κ 架構,使用一套系統來滿足實時數據處理和歷史數據處理的需求。
在 κ 架構中,數據的“可重復處理”是關鍵。一方面要求實時數據能及時獲取最新數據,處理完立即導出給其他系統使用;另一方面要滿足處理歷史數據的需求,需要具備讀大量歷史數據的能力。實時數據的處理決定了必須使用消息系統,但是消息系統並不能完全滿足批處理的並發需求。
在前面的分享中,百度和阿里的專家分享了計算層的批流融合。我們認為批流融合存儲層的需求是一個融合的存儲表征: 消息系統 + 並發的存儲訪問。
為什么 Apache Pulsar 能滿足批流處理中的存儲需求
下面我們從 “Apache Pulsar 提供的存儲抽象”、“批流處理中的 IO 模式”和 “Apache Pulsar 提供的無限流存儲” 這三個方面來解釋為什么 Apache Pulsar 能滿足批流融合的存儲需求。
Segmented Stream 存儲表征

前面我們介紹了 Apache Pulsar 首先是一個消息系統,它和其他消息系統類似,提供了簡潔的以 Topic,Producer,Consumer 為基礎的 Pub/Sub 模型。
Pulsar 靈活的訂閱模式和高帶寬、低延遲特性,能夠很好的滿足流處理的需求。
Apache Pulsar 的 Topic 可以分為不同的分區。和其他消息系統不同的是 Apache Pulsar 利用分片的架構,每個邏輯分區又進行了分片。
在分層分片的架構中,分片是存儲的單元,可以類比 HDFS 中的一個文件塊,分片被均勻地分布在存儲層的 BookKeeper 節點中。

我們再從批流處理的角度來看 Apache Pulsar 的這種分片(Segment)的架構:
- 對於流處理來說,Apache Pulsar 的每個 Partition 就是流處理的一個流,它通過 Pub/Sub 的接口來給流處理提供數據交互。
- 對於批處理來說,Apache Pulsar 以分片為粒度,可以為批處理提供數據的並發訪問。
一方面,Apache Pulsar 中每個 Partition 都可以看做是源源不斷流入數據的載體,借助於分片和二級存儲,Apache Pulsar 有能力將 Partition 所有流入的數據都保存下來。這樣每個 Partition 都可以看作是 Stream 的存儲抽象。
另一方面, Apache Pulsar 的 Partition 是邏輯分區的概念,分區內部又被分成分片,作為存儲和 IO 訪問的單元。
結合這兩個概念,我們把 Apache Pulsar 對每個 Partiton 的存儲表征稱為 Segmented Stream。
通過 Pulsar 的 Segmented Stream 抽象,為批流處理提供了一個統一的存儲表征。
匹配批流處理中的 IO 模式
介紹了 Apache Pulsar 的 Segmented Stream 的存儲表征后,下面我們結合批流處理中數據的三種常用的訪問模式: Write,Tailing Read 和 Catchup Read,來看看 Apache Pulsar 這種架構的合理性。這里主要會討論延遲、IO 的並發和隔離,並用大家比較熟悉的 Kafka 系統來對比說明。
- Write:往 Stream 中添加新的數據。
- Tailing Read:讀最新的數據。
- Catchup Read:讀歷史老數據。

對於 Write 這種模式,所有的寫都直接追加在 Stream 的尾部。對於和 Kafka 類似的 Master/Slave 架構系統來說,數據會先寫入 Leader Broker,再發送給其他 Follower Broker。
Apache Pulsar 的寫先發送到 broker,然后 broker 作為存儲代理,並發將數據發送給存儲層的多個 Bookie 節點。兩種架構都會有兩次網絡跳躍。
對於 Write 模式,延遲差別不大。

Tailing Read 是流處理中的常用模式。它從 Stream 的尾部讀取最新寫入的數據。
對於和 Kafka 類似的系統,Tailing Read 會從 Leader Broker 直接讀取。對於 Apache Pulsar,在 Broker 中有一段自維護的 Cache 來緩存剛剛寫入的最新數據,Tailing Read 直接從 Broker 獲取數據並返回。
兩種架構都只有 1 次網絡跳躍。對 Tailing Read 模式,延遲差別不大。

Catchup Read 是批處理中常用的讀取模式。它從 Stream 的指定位置,讀取一定量的歷史數據。這種場景一般對數據的讀取量比較大,注重讀取的帶寬。
對於 Kafka 類似的系統,Catchup Read 一般還是會使用 Pub/Sub 的接口,從 Leader Broker 直接讀取。對於 Apache Pulsar,我們可以從 Broker 中讀取元數據,獲取 partition 中分片的起始位置和分片在 BookKeeper 中的存儲信息,繞過 Pub/Sub 接口,利用 BookKeeper 的 Read 接口,直接從存儲層並發訪問多個分片。BookKeeper 提供了多副本的高可用,提升了讀取歷史數據的並發能力。

如果我們把這三種 IO 模式放在一起看就更有意思了。 這可以類比用戶在某時間段,對 Stream 既有最新數據讀寫,也有歷史數據讀寫的情形。這是在批流融合中經常遇到的場景。
對和 Kafka 類似的系統,這三種 IO 模式都會發生在 Leader Broker。在 Leader Broker 中,系統的數據都需要通過文件系統的 Pagecache,歷史數據和最新的數據會爭用 Pagecache 資源,造成讀寫響應不及時。
如果這時再遇到 Broker 磁盤空間寫滿,需要擴容的情況,那就需要等待數據的搬移和 rebalance 的操作。這時,IO 的延遲和服務質量很難得到保障。
Apache Pulsar Segmented Stream 的存儲表征,結合分層分片的架構,為新數據和歷史數據做了天然的隔離。最新的數據 IO 發生在 Broker 層。
對歷史數據的並發讀寫,直接發生在存儲節點。冷熱數據被天然隔離,用戶完全不用擔心 IO 的沖突和爭用。Apache Pulsar 在節點擴容和錯誤恢復的過程中,也不會有數據大量拷貝和 rebalance,因此提升了系統的高可用性。
通過這三種 IO 模式的說明和對比,我們發現 Pulsar Segmented Stream 的存儲表征,再結合分層分片的架構,可以很好地滿足批流處理中對存儲系統的需求。
無限的流存儲支持

Pulsar Segmented Stream 的存儲表征,很好地模擬了現實中 Stream 數據。對於流存儲的另一個需求是理論上無限的存儲空間。這樣可以滿足對歷史數據的存儲和訪問需求。Apache Pulsar 從兩個方面解決了這個問題。
一方面 Pulsar 的存儲層中,分片會均衡地分布到所有的存儲節點中,這避免了其他系統中單一broker 存儲容量的限制,進而可以利用整個集群的存儲空間。
另一方面,Pulsar 的分片架構,為數據的二級存儲擴展提供了很好的基礎。對於Segmented Stream,用戶可以設置 Segment 在 BookKeeper 中保留的時間或大小。如果超過設定的值,將舊的 Segment 遷移到廉價的二級存儲,比如 Aws S3,Google Cloud Storage,或者HDFS 中。二級存儲的帶寬一般有保障,可以滿足歷史數據的批處理模式。 通過二級存儲可以減輕無限存儲的成本。
小結
Pulsar 利用自身的分層分片的架構,提供了 Segmented Stream 的存儲表征,滿足了批流融合的存儲需求。
- 通過 Pulsar Pub/Sub 接口訪問 Segmented Stream,可以滿足流處理的存儲需求;
- 通過 Pulsar 存儲層對 segment 的訪問接口(Segment Reader),可以滿足批處理的並發訪問需求。
從批流處理的 IO 模式分析中可以發現,Pulsar 的架構可以很好地處理批流處理中的 IO 並發和隔離。並且 Pulsar 提供了理論上無限流存儲的能力,能夠滿足批處理中,對海量歷史數據的存儲需求。
怎樣使用 Pulsar 提供批流融合的存儲
前面我們介紹了為什么 Pulsar 的架構能滿足批流融合的存儲需求。接着我們會介紹 Pulsar是如何在工程上實現的。

基於 Segmented Stream 存儲的表征,我們很容易區分和支持批處理和流處理。批處理所請求的數據可以看做是一個有邊界的流(Bounded Stream)。流處理所請求的數據可以看做是一個沒有邊界的流(UnBounded Stream)。
下面我們看在 Pulsar 內部,批處理和流處理會怎樣訪問 Segmented Stream。

這里的代碼是一個計算廣告點擊率的 SQL 語句。如果用戶想要查詢某個時間段內的點擊率,會提供點擊事件的起止時間。起止時間可以確定一個流的起止邊界,進而確定一個 Bounded Stream。這是一個典型的批處理場景。
對 Pulsar 的處理來說,首先根據起止時間來確定和獲取所需要的 Segments 列表;然后選擇這些Segments,繞過 pub/sub 接口,直接通過 Pulsar 的 Segment Reader 接口,來訪問 Pulsar 的存儲層。

流處理是一系列不會停止的 Windows 訪問和查詢。與批處理相比,流處理它沒有截止的時間點,即使查詢到當前時刻,它仍然繼續對當前的 window 不斷地查詢,一個 window 處理結束,接着處理下一個 window。它的 SQL 查詢語句不會變化,但是查詢 window 中的數據會不斷實時更新,它是一個源源不斷的、不停處理最新數據的方式。
對於這種訪問模式,直接使用 Pulsar 的 pub/sub 接口就可以直接獲取最新的消息,滿足流處理的需求。

對批流融合,在計算層,更多關注的是批流融合的計算模型、API 和運行時的統一。在存儲層,通過 Segmented Stream 的存儲表征,為批流數據提供了統一的數據存儲和組織方式。
針對批流處理的不同訪問模式,Pulsar 提供了兩套 API 接口。流處理使用 Pub/Sub 的接口;批處理使用 Parallel Segment Read(PSegment)的接口。
對於批處理的接口,我們在 Pulsar SQL 里面做了一個嘗試,Pulsar SQL 借助 Presto,對寫入Pulsar 中的數據進行交互式的查詢。
如果你想體驗 Pulsar SQL,可以查看 Pulsar 的 SQL手冊。
Pub/Sub 的接口已經比較完善,我們最近在豐富和完善 PSegment 接口。

在 PSegment 中,我們的主要工作是集成Pulsar 和 Flink、Spark、Hive 及 Presto 。這些工作主要集中在 API 的實現和 Schema 的整合。這些工作完成之后,我們會開源這部分的代碼。
總結
Pulsar 是下一代雲原生的消息和流存儲的平台。我們認為消息和流是一份數據的兩種不同表征方式。Pulsar 采用了存儲計算分離的分層架構和分區內再分片的存儲架構,這種架構能夠提供基於Segmented Stream 的存儲表征,能為批和流處理提供融合的存儲基礎。
作者翟佳,StreamNative 聯合創始人兼 CTO,本文為其 InfoQ 技術大會演講的內容整理。
