簡介: 如何通過 Apache Pulsar 原生的存儲計算分離的架構提供批流融合的基礎,以及 Apache Pulsar 如何與 Flink 結合,實現批流一體的計算。
簡介:StreamNative 聯合創始人翟佳在本次演講中介紹了下一代雲原生消息流平台 Apache Pulsar,並講解如何通過 Apache Pulsar 原生的存儲計算分離的架構提供批流融合的基礎,以及 Apache Pulsar 如何與 Flink 結合,實現批流一體的計算。
Apache Pulsar 相對比較新,它於 2017 年加入 Apache 軟件基金會,2018 年才從 Apache 軟件基金會畢業並成為一個頂級項目。Pulsar 由於原生采用了存儲計算分離的架構,並且有專門為消息和流設計的存儲引擎 BookKeeper,結合 Pulsar 本身的企業級特性,得到了越來越多開發者的關注。今天的分享分為 3 個部分:
- Apache Pulsar 是什么;
- Pulsar 的數據視圖;
- Pulsar 與 Flink 的批流融合。
一、Apache Pulsar 是什么
下圖是屬於消息領域的開源工具,從事消息或者基礎設施的開發者對這些一定不會陌生。雖然 Pulsar 在 2012 年開始開發,直到 2016 年才開源,但它在跟大家見面之前已經在雅虎的線上運行了很長時間。這也是為什么它一開源就得到了很多開發者關注的原因,它已經是一個經過線上檢驗的系統。
Pulsar 跟其他消息系統最根本的不同在於兩個方面:
- 一方面,Pulsar 采用存儲計算分離的雲原生架構;
- 另一方面,Pulsar 有專門為消息而設計的存儲引擎,Apache BookKeeper。
架構
下圖展示了 Pulsar 存儲計算分離的架構:
- 首先在計算層,Pulsar Broker 不保存任何狀態數據、不做任何數據存儲,我們也稱之為服務層。
- 其次,Pulsar 擁有一個專門為消息和流設計的存儲引擎 BookKeeper,我們也稱之為數據層。
這個分層的架構對用戶的集群擴展十分方便:
- 如果想要支持更多的 Producer 和 Consumer,可以擴充上面無狀態的 Broker 層;
- 如果要做更多的數據存儲,可以單獨擴充底層存儲層。
這個雲原生的架構有兩個主要特點:
- 第一個是存儲計算的分離;
- 另外一個特點是每一層都是一個節點對等的架構。
從節點對等來說,Broker 層不存儲數據,所以很容易實現節點對等。但是 Pulsar 在底層的存儲也是節點對等狀態:在存儲層,BookKeeper 沒有采用 master/slave 這種主從同步的方式,而是通過 Quorum 的方式。
如果是要保持多個數據備份,用戶通過一個 broker 並發地寫三個存儲節點,每一份數據都是一個對等狀態,這樣在底層的節點也是一個對等的狀態,用戶要做底層節點的擴容和管理就會很容易。有這樣節點對等的基礎,會給用戶帶來很大的雲原生的便捷,方便用戶在每一層單獨擴容,也會提高用戶的線上系統的可用性和維護性。
同時,這種分層的架構為我們在 Flink 做批流融合打好了基礎。因為它原生分成了兩層,可以根據用戶的使用場景和批流的不同訪問模式,來提供兩套不同的 API。
- 如果是實時數據的訪問,可以通過上層 Broker 提供的 Consumer 接口;
- 如果是歷史數據的訪問,可以跳過 Broker,用存儲層的 reader 接口,直接訪問底層存儲層。
存儲 BookKeeper
Pulsar 另一個優勢是有專門為流和消息設計的存儲引擎 Apache BookKeeper。它是一個簡單的 write-ahead-log 抽象。Log 抽象和流的抽象類似,所有的數據都是源源不斷地從尾部直接追加。
它給用戶帶來的好處就是寫入模式比較簡單,可以帶來比較高的吞吐。在一致性方面,BookKeeper 結合了 PAXOS 和 ZooKeeper ZAB 這兩種協議。BookKeeper 暴露給大家的就是一個 log 抽象。你可以簡單認為它的一致性很高,可以實現類似 Raft 的 log 層存儲。BookKeeper 的誕生是為了服務我們在 HDFS naming node 的 HA,這種場景對一致性要求特別高。這也是為什么在很多關鍵性的場景里,大家會選擇 Pulsar 和 BookKeeper 做存儲的原因。
BookKeeper 的設計中,有專門的讀寫隔離,簡單理解就是,讀和寫是發生在不同的磁盤。這樣的好處是在批流融合的場景可以減少與歷史數據讀取的相互干擾,很多時候用戶讀最新的實時數據時,不可避免會讀到歷史數據,如果有一個專門為歷史數據而准備的單獨的磁盤,歷史數據和實時數據的讀寫不會有 IO 的爭搶,會對批流融合的 IO 服務帶來更好的體驗。
應用場景
Pulsar 場景應用廣泛。下面是 Pulsar 常見的幾種應用場景:
- 第一,因為 Pulsar 有 BookKeeper,數據一致性特別高,Pulsar 可以用在計費平台、支付平台和交易系統等,對數據服務質量,一致性和可用性要求很高的場景。
- 第二種應用場景是 Worker Queue / Push Notifications / Task Queue,主要是為了實現系統之間的相互解耦。
- 第三種場景,與 Pulsar 對消息和隊列兩種場景的支持比較相關。Pulsar 支持 Queue 消費模式,也支持 Kafka 高帶寬的消費模型。后面我會專門講解 Queue 消費模型與 Flink 結合的優勢。
- 第四個場景是 IoT 應用,因為 Pulsar 在服務端有 MQTT 協議的解析,以及輕量級的計算 Pulsar Functions。
- 第五個方面是 unified data processing,把 Pulsar 作為一個批流融合的存儲的基礎。
我們在 2020 年 11 月底的 Pulsar Summit 亞洲峰會,邀請 40 多位講師來分享他們的 Pulsar 落地案例。如果大家對 Pulsar 應用場景比較感興趣,可以關注 B 站上 StreamNative 的賬號,觀看相關視頻。
二、Pulsar 的數據視圖
在這些應用場景中,Unified Data Processing 尤為重要。關於批流融合,很多國內用戶的第一反應是選擇 Flink。我們來看 Pulsar 和 Flink 結合有什么樣的優勢?為什么用戶會選擇 Pulsar 和 Flink 做批流融合。
首先,我們先從 Pulsar 的數據視圖來展開。跟其他的消息系統一樣,Pulsar 也是以消息為主體,以 Topic 為中心。所有的數據都是 producer 交給 topic,然后 consumer 從 topic 訂閱消費消息。
Partition 分區
為了方便擴展,Pulsar 在 topic 內部也有分區的概念,這跟很多消息系統都類似。上面提到 Pulsar 是一個分層的架構,它采用分區把 topic 暴露給用戶,但是在內部,實際上每一個分區又可以按照用戶指定的時間或者大小切成一個分片。一個 Topic 最開始創建的時候只有一個 active 分片,隨着用戶指定的時間到達以后,會再切一個新的分片。在新開一個分片的過程中,存儲層可以根據各個節點的容量,選擇容量最多的節點來存儲這個新的分片。
這樣的好處是,topic 的每一個分片都會均勻地散布在存儲層的各個節點上,實現數據存儲的均衡。如果用戶願意,就可以用整個存儲集群來存儲分區,不再被單個節點容量所限制。如下圖所示,該 Topic 有 4 個分區,每一個分區被拆成多個分片,用戶可以按照時間(比如 10 分鍾或者一個小時),也可以按照大小(比如 1G 或者 2G)切一個分片。分片本身有順序性,按照 ID 逐漸遞增,分片內部所有消息按照 ID 單調遞增,這樣很容易保證順序性。
Stream 流存儲
我們再從單個分片來看一下,在常見流(stream)數據處理的概念。用戶所有的數據都是從流的尾部不斷追加,跟流的概念相似,Pulsar 中 Topic 的新數據不斷的添加在 Topic 的最尾部。不同的是,Pulsar 的 Topic 抽象提供了一些優勢:
- 首先,它采用了存儲和計算分離的架構。在計算層,它更多的是一個消息服務層,可以快速地通過 consumer 接口,把最新的數據返回給用戶,用戶可以實時的獲取最新的數據;
- 另外一個好處是,它分成多個分片,如果用戶指定時間,從元數據可以找到對應的分片,用戶可以繞過實時的流直接讀取存儲層的分片;
- 還有一個優勢是,Pulsar 可以提供無限的流存儲。
做基礎設施的同學,如果看到按照時間分片的架構,很容易想到把老的分片搬到二級存儲里面去,在 Pulsar 里也是這樣做的。用戶可以根據 topic 的消費熱度,設置把老的,或者超過時限或大小的數據自動搬到二級存儲中。用戶可以選擇使用 Google,微軟的 Azure 或者 AWS 來存儲老的分片,同時也支持 HDFS 存儲。
這樣的好處是:對最新的數據可以通過 BookKeeper 做快速返回,對於老的冷數據可以利用網絡存儲雲資源做一個無限的流存儲。這就是 Pulsar 可以支持無限流存儲的原因,也是批流融合的一個基礎。
總體來說,Pulsar 通過存儲計算分離,為大家提供了實時數據和歷史數據兩套不同的訪問接口。用戶可以依據內部不同的分片位置,根據 metadata 來選擇使用哪種接口來訪問數據。同時根據分片機制可以把老的分片放到二級存儲中,這樣可以支撐無限的流存儲。
Pulsar 的統一體現在對分片元數據管理的方面。每個分片可以按照時間存放成不同的存儲介質或格式,但 Pulsar 通過對每個分片的 metadata 管理,來對外提供一個分區的邏輯概念。在訪問分區中的一個分片的時候我可以拿到它的元數據,知道它的在分區中的順序,數據的存放位置和保存類型 Pulsar 對每一個分片的 metadata 的管理,提供了統一的 topic 的抽象。
三、Pulsar 和 Flink 的批流融合
在 Flink 中,流是一個基礎的概念,Pulsar 可以作為流的載體來存儲數據。如果用戶做一個批的計算,可以認為它是一個有界的流。對 Pulsar 來說,這就是一個 Topic 有界范圍內的分片。
在圖中我們可以看到,topic 有很多的分片,如果確定了起止的時間,用戶就可以根據這個時間來確定要讀取的分片范圍。對實時的數據,對應的是一個連續的查詢或訪問。對 Pulsar 的場景來說就是不停的去消費 Topic 的尾部數據。這樣,Pulsar 的 Topic 的模型就可以和 Flink 流的概念很好的結合,Pulsar 可以作為 Flink 流計算的載體。
- 有界的計算可以視為一個有界的流,對應 Pulsar 一些限定的分片;
- 實時的計算就是一個無界的流,對 Topic 中最新的數據做查詢和訪問。
對有界的流和無界的流,Pulsar 采取不同的響應模式:
-
第一種是對歷史數據的響應。如下圖所示,左下角是用戶的 query,給定起止的時間限定流的范圍。對 Pulsar 的響應分為幾步:
- 第一步,找到 Topic,根據我們統一管理的 metadata,可以獲取這個 topic 里面所有分片的 metadata 的列表;
- 第二步,根據時間限定在 metadata 列表中,通過兩分查找的方式來獲取起始分片和終止的分片,選擇需要掃的分片;
- 第三步,找到這些分片以后通過底層存儲層的接口訪問需要訪問的這些分片,完成一次歷史數據的查找。
- 對實時數據的查找,Pulsar 也提供和 Kafka 相同的接口,可以通過 consumer 的方式來讀取最尾端分片(也就是最新的數據),通過 consumer 接口對數據進行實時訪問。它不停地查找最新的數據,完成之后再進行下一次查找。這種情況下,使用 Pulsar Pub/Sub 接口是一種最直接最有效的方式。
簡單來說,Flink 提供了統一的視圖讓用戶可以用統一的 API 來處理 streaming 和歷史數據。以前,數據科學家可能需要編寫兩套應用分別用來處理實時數據和歷史數據,現在只需要一套模型就能夠解決這種問題。
Pulsar 主要提供一個數據的載體,通過基於分區分片的架構為上面的計算層提供流的存儲載體。因為 Pulsar 采用了分層分片的架構,它有針對流的最新數據訪問接口,也有針對批的對並發有更高要求的存儲層訪問接口。同時它提供無限的流存儲和統一的消費模型。
四、Pulsar 現有能力和進展
最后我們額外說一下 Pulsar 現在有怎樣的能力和最近的一些進展。
現有能力
schema
在大數據中,schema 是一個特別重要的抽象。在消息領域里面也是一樣,在 Pulsar 中,如果 producer 和 consumer 可以通過 schema 來簽訂一套協議,那就不需要生產端和消費端的用戶再線下溝通數據的發送和接收的格式。在計算引擎中我們也需要同樣的支持。
在 Pulsar-Flink connector 中,我們借用 Flink schema 的 interface,對接 Pulsar 自帶的 Schema,Flink 能夠直接解析存儲在Pulsar 數據的 schema。這個 schema 包括兩種:
- 第一種是我們常見的對每一個消息的元數據(meatdata)包括消息的 key、消息產生時間、或是其他元數據的信息。
- 另一種是對消息的內容的數據結構的描述,常見的是 Avro 格式,在用戶訪問的時候就可以通過Schema知道每個消息對應的數據結構。
同時我們結合 Flip-107,整合 Flink metadata schema 和 Avro 的 metadata,可以將兩種 Schema 結合在一起做更復雜的查詢。
source
有了這個 schema,用戶可以很容易地把它作為一個 source,因為它可以從 schema 的信息理解每個消息。
Pulsar Sink
我們也可以把在 Flink 中的計算結果返回給 Pulsar 把它做為 Sink。
Streaming Tables
有了 Sink 和 Source 的支持,我們就可以把 Flink table 直接暴露給用戶。用戶可以很簡單的把 Pulsar 作為 Flink 的一個 table,查找數據。
write to straming tables
下圖展示如何把計算結果或數據寫到 Pulsar 的 Topic 中去。
Pulsar Catalog
Pulsar 自帶了很多企業流的特性。Pulsar 的 topic(e.g. persistent://tenant_name/namespace_name/topic_name)不是一個平鋪的概念,而是分很多級別。有 tenant 級別,還有 namespace 級別。這樣可以很容易得與 Flink 常用的 Catalog 概念結合。
如下圖所示,定義了一個 Pulsar Catalog,database 是 tn/ns,這是一個路徑表達,先是 tenant,然后是 namespace,最后再掛一個 topic。這樣就可以把Pulsar 的 namespace 當作 Flink 的 Catalog,namespace 下面會有很多 topic,每個 topic 都可以是 Catalog 的 table。這就可以很容易地跟 Flink Cataglog 做很好的對應。在下圖中,上方的是 Catalog 的定義,下方則演示如何使用這個 Catalog。不過,這里還需要進一步完善,后邊也有計划做 partition 的支持。
FLIP-27
FLIP-27 是 Pulsar - Flink 批流融合的一個代表。前面介紹了 Pulsar 提供統一的視圖,管理所有 topic 的 metadata。在這個視圖中,根據 metadata 標記每個分片的信息,再依靠 FLIP-27 的 framework 達到批流融合的目的。FLIP-27 中有兩個概念:Splitter 和 reader。
它的工作原理是這樣的,首先會有一個 splitter 把數據源做切割,之后交給 reader 讀取數據。對 Pulsar 來說,splitter 處理的還是 Pulsar 的一個 topic。抓到 Pulsar topic 的 metadata 之后,根據每個分片的元數據來判斷這個分片存儲在什么位置,再選最合適的 reader 進行訪問。Pulsar 提供統一的存儲層,Flink 根據 splitter 對每個分區的不同位置和格式的信息,選擇不同的 reader 讀取 Pulsar 中的數據。
Source 高並發
另一個和 Pulsar 消費模式緊密相關的是。很多 Flink 用戶面臨的問題是如何讓 Flink 更快地執行任務。例如,用戶給了 10 個並發度,它會有 10 個 job 並發,但假如一個 Kafka 的 topic 只有 5 個分區,由於每個分區只能被一個 job 消費,就會有 5 個 Flink job 是空閑的。如果想要加快消費的並發度,只能跟業務方協調多開幾個分區。這樣的話,從消費端到生產端和后邊的運維方都會覺得特別復雜。並且它很難做到實時的按需更新。
而 Pulsar 不僅支持 Kafka 這種每個分區只能被一個 active 的 consumer 消費的情況,也支持 Key-Shared 的模式,多個 consumer 可以共同對一個分區進行消費,同時保證每個 key 的消息只發給一個 consumer,這樣就保證了 consumer 的並發,又同時保證了消息的有序。
對前面的場景,我們在 Pulsar Flink 里做了 Key-shared 消費模式的支持。同樣是 5 個分區,10 個並發 Flink job。但是我可以把 key 的范圍拆成 10 個。每一個 Flink 的子任務消費在 10 個 key 范圍中的一個。這樣從用戶消費端來說,就可以很好解耦分區的數量和 Flink 並發度之間的關系,也可以更好提供數據的並發。
自動 Reader 選擇
另外一個方向是上文提到的 Pulsar 已經有統一的存儲基礎。我們可以在這個基礎上根據用戶不同的 segment metadata 選擇不同的 reader。目前,我們已經實現該功能。
近期工作
最近,我們也在做和 Flink 1.12 整合相關的工作。Pulsar-Flink 項目也在不停地做迭代,比如我們增加了對 Pulsar 2.7 中事務的支持,並且把端到端的 Exactly-Once 整合到 Pulsar Flink repo 中;另外的工作是如何讀取 Parquet 格式的二級存儲的列數據;以及使用 Pulsar 存儲層做 Flink 的 state 存儲等。