本篇主要總結下Pulsar的整體架構以及各組件的工作原理。
1.概述
在最高級別,單個 Pulsar 實例由一個或多個 Pulsar 集群組成,同一實例中的集群之間可
以相互復制數據;單個 Pulsar 集群由以下三部分組成:
1)一個或者多個 Broker,負責處理和負載均衡生產者發出的消息,並將這些消息分派給消費者,它與配置存儲交互來處理相應的任務,並將消息存儲在 BookKeeper 實例中(又稱 bookies),Broker 依賴 ZooKeeper 集群處理特定的任務,等等;
2)由一個或多個 bookie 組成的 BookKeeper集群,負責消息的持久化存儲;
3)一個Zookeeper集群,用來處理集群級別的的配置和協調任務;
集群說明如下:
在更大的實例層面,有一個叫做配置存儲的ZooKeeper集群能訪問到全部實例,負責處理多集群間的協調任務,例如異地復制。
2.Broker
一個無狀態組件,主要運行兩個服務組件
1)http服務,監聽8080端口,暴露一個rest接口,供管理員管理集群和為producer和consumer提供topic查詢,可以使用命令行或url方式;
2)tcp服務,監聽6650端口,是一個調度器,用來處理所有數據的傳輸;
出於性能的考慮, 消息通常從 managed ledger 緩存中調度, 除非 backlog超過了緩存的大小,如果 backlog對於緩存來說太大了, 則Broker將開始從BookKeeper那里讀取數據條目;
最后,為了支持全局Topic異地復制,Broker會控制Replicators追蹤本地發布的條目,並通過Java 客戶端把這些條目重新發布到其他區域。
3.持久化存儲
Pulsar為應用提供有保證消息傳遞,即未確認送達的消息需要持久化存儲直到它們被確認送達,在Pulsar內部,所有消息都被保存並同步N份;
1)Apache BookKeeper
Pulsar用 Apache BookKeeper作為持久化消息存儲,BookKeeper是一個分布式的預寫日志(WAL)系統,有如下幾個特性特別適合Pulsar的應用場景:
- 允許Pulsar創建多個獨立的日志,這種獨立的日志就是ledgers,隨着時間的推移,Pulsar會為Topic創建多個ledgers;
- 為按條目復制的順序數據提供了非常高效的存儲;
- 保證了多系統掛掉時ledgers的讀取一致性;
- 提供了不同的Bookies之間IO均勻分布的特性;
- 容量和吞吐量都能水平擴展,並且容量可以通過在集群內添加更多的Bookies立刻得到提升;
- Bookies被設計成可以承載數千的並發讀寫的ledgers,通過使用多個磁盤設備,一個用於journal,另一個用於一般存儲,這樣Bookies可以將讀操作的影響和對於寫操作的延遲分隔開;
除了消息數據,cursors也會被持久化入BookKeeper,cursors是消費端訂閱消費的位置;
下圖展示了brokers和bookies是如何交互的:
2)Ledgers
Ledger是一個只追加的數據結構,並且只有一個寫入器,這個寫入器負責多個BookKeeper存儲節點(就是Bookies)的寫入,Ledger的條目會被復制到多個bookies;
Ledgers本身有着非常簡單的語義:
- Pulsar Broker可以創建ledeger,添加內容到ledger和關閉ledger;
- 當一個ledger被關閉后,無論是明確關閉或者是因為寫入器掛掉,這個ledger只會以只讀模式打開;
- 當ledger中的條目不再有用的時候,整個legder可以被刪除(跨所有Bookies);
Ledger讀一致性:
BookKeeper的主要優勢在於他能在有系統故障時保證讀的一致性, 由於Ledger只能被一個進程寫入(之前提的寫入器進程),在寫入時不需要考慮一致性,從而寫入會非常高效;在一次故障之后,ledger會啟動一個恢復進程來確定ledger的最終狀態並確認最后提交到日志的是哪一個條目;在這之后,能保證所有的ledger讀進程讀取到相同的內容;
Managed ledgers:
由於BookKeeper Ledgers提供了單一的日志抽象,在ledger的基礎上我們開發了一個叫managed ledger的庫,用以表示單個topic的存儲層;managed ledger即消息流的抽象,有一個寫入器進程不斷在流結尾添加消息,並且有多個cursors 消費這個流,每個cursor有自己的消費位置;
一個managed ledger在內部用多個BookKeeper ledgers保存數據,這么做有兩個原因:
- 在故障之后,原有的某個ledger不能再寫了,需要創建一個新的;
- ledger在所有cursors消費完它所保存的消息之后就可以被刪除,這樣可以實現ledgers的定期回滾;
說明:
Entry和Ledger都是BookKeeper中的概念,Entry相當於一條記錄,而Ledger相當於一種記錄的集合;一個topic擁有一個managed ledger,每個managed ledger下面可以有多個ledgers,一個ledger也是一個segment,即分片,在bookie中,數據是分片存儲的;
3)Journal storage
BookKeeper中的journal文件包含事務日志,在更新到 ledger之前,bookie需要確保描述這個更新的事務被寫到持久(非易失)存儲上面;在bookie啟動或舊的journal文件大小達到上限(由 journalMaxSizeMB 參數配置)的時候,新的journal文件會被創建;
4.元數據存儲
Pulsar 元數據存儲維護一個 Pulsar 集群的所有元數據,例如topic、schema、broker負載等,Pulsar 使用Apache ZooKeeper進行元數據存儲、集群配置和協調;Pulsar 元數據存儲可以部署在單獨的 ZooKeeper 集群上,也可以部署在現有的 ZooKeeper 集群上;您可以將一個 ZooKeeper 集群同時用於 Pulsar 元數據存儲和BookKeeper 元數據存儲,如果要部署連接到現有 BookKeeper 集群的 Pulsar broker,則需要分別為 Pulsar 元數據存儲和 BookKeeper 元數據存儲部署單獨的 ZooKeeper 集群;
在 Pulsar 實例中:
- 配置存儲保存了 tenants, namespaces以及其他需要全局一致的配置項;
- 每個集群有自己獨立的ZooKeeper集群內保存部配置和協調信息,例如topic歸屬信息,broker負載報告,BookKeeper ledger信息等;
5.配置存儲
配置存儲維護一個 Pulsar 實例的所有配置,例如集群、租戶、命名空間、分區主題相關配置等,一個 Pulsar 實例可以有一個本地集群、多個本地集群或多個跨區域集群;因此,配置存儲可以在 Pulsar 實例下的多個集群之間共享配置,配置存儲可以部署在單獨的 ZooKeeper 集群上,也可以部署在現有的 ZooKeeper 集群上;
6.Pulsar proxy
Pulsar客戶端和Pulsar集群交互的一種方式就是直連Pulsar brokers,然而,在某些情況下,這種直連既不可行也不可取,因為客戶端並不知道broker的地址,例如在雲環境或者 Kubernetes 以及其他類似的系統上面運行Pulsar,直連brokers就基本上不可能了;
Pulsar proxy提供了解決這個問題的方案,它可以作為集群中的所有brokers的統一網關,如果你選擇運行Pulsar Proxy(這是可選的),所有的客戶端連接將會通過這個代理而不是直接與brokers通信,出於性能和容錯的考慮,你可以運行任意個Pulsar proxy;
注意點:
- 連接客戶端不需要為使用Pulsar proxy提供任何特定配置;
- Pulsar proxy支持TLS 加密 和 認證;
7.服務發現
客戶端需要能夠使用單個 URL 與整個 Pulsar 集群進行通信,Pulsar內部提供了服務發現的機制,你也可以用你自己的服務發現系統,當客戶端發送一個HTTP請求時,例如發到http://pulsar.us-west.example.com:8080,客戶端需要被重定向到某些所需的集群中活躍的broker,或者通過DNS,或者通過HTTP和IP重定向,或者其他機制;
注意:
在Pulsar中,每個主題只由一個 broker 處理,客戶端發出的讀取,更新或刪除主題的初始請求可能發送給不是處理該主題的 broker,如果這個 broker 不能處理該主題的請求,broker 將會把該請求重定向到可以處理該主題請求的 broker上。
8.消息發送到落盤過程
幾個概念:
topic:用於將消息從生產者傳輸到消費者的通道,producer發布消息到topic, consumer訂閱topic並處理發布的消息
bundle:切分命名空間的一段哈希值范圍,被獨立的分配到不同的broker,每個topic會根據其名稱算出的哈希值來判斷需要分到哪一個特定的bundle,即分配到了不同的broker上,相當於所有topic的一個子集
broker:一個無狀態組件,主要運行兩個服務組件,http服務(8080)和tcp服務(6650),一個管理集群,一個傳輸數據
entry:存儲到bookkeeper中的一條記錄
ledger:用來存儲entry的,多個entry序列組成一個ledger
managed ledger:單個topic的存儲層,下面可以有多個ledgers,數據的寫入是通過managed ledger完成的
bookie:bookkeeper集群中的一個存儲節點,用於存儲ledger,因為存儲是分布式的,每個ledger會存儲在多個bookie上
entry log:存儲entry的文件
index file:ledger的索引文件,記錄每個ledger在entry log中的存儲位置以及數據在entry log文件中的長度
ledger cache:用於緩存數據和索引文件的,加快查找效率
journal:用於存儲bookkeeper的事務日志,在數據刷新到ledger之前,持久化存儲這個刷新的事務,防止緩存中的數據丟失,用於數據恢復,可定時刪除已刷盤數據,避免占用存儲空間
整個過程:
客戶端指定一個或多個broker地址,執行時會先和任意一個broker建立連接,發送一個http請求查詢topic所在broker,然后和該broker建立一個tcp連接並進行認證和鑒權,通過后,客戶端會為該broker創建一個生產者對象;
生產者發送消息到該broker,broker接收到消息后,會先緩存在自己的內存中,然后查找幾個合適的bookie節點,同時啟動幾個線程分別發送數據到不同的bookie節點,並等待bookie返回ack確認;
bookie在接收到broker發來的數據后,數據和索引文件會先在內存中緩存,不是立刻寫入磁盤,當內存達到一定值或者達到刷盤時間后,會觸發刷盤操作,將數據和索引持久化到磁盤,這樣的話,在刷盤之前可能會因為宕機或其他異常導致緩存中的數據丟失,所以在刷盤之前,還會把緩存中的數據持久化到journal文件中,並記錄journal文件的id和對應位置,這樣在數據丟失之后可以通過journal文件做數據恢復,而且在刷盤之后或者按周期時間,會刪除已刷盤數據之前的journal,避免占用存儲空間;
對於bookie來說,當數據持久化到journal文件后,即會給broker返回ack確認,然后broker給客戶端返回ack確認,一條數據的發送過程完成。
參考官方文檔 https://pulsar.apache.org/docs/zh-CN/next/concepts-architecture-overview/