Pulsar 學習記錄


一、簡介

Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo! and now a top-level Apache Software Foundation project。

Apache Pulsar 是 Apache 軟件基金會頂級項目,是下一代雲原生分布式消息流平台,集消息、存儲、輕量化函數式計算為一體。

采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區域數據復制,具有強一致性、高吞吐、低延時及高可擴展性等流數據存儲特性。

在消息領域,Pulsar 是第一個將存儲計算分離雲原生架構落地的開源項目。

該系統源於 Yahoo,最初在 Yahoo 內部開發和部署,支持 Yahoo 應用服務平台 140 萬個主題,日處理超過 1000 億條消息。

Pulsar 於 2016 年由 Yahoo 開源並捐贈給 Apache 軟件基金會進行孵化,2018 年成為 Apache 軟件基金會頂級項目。

 

Apache Pulsar 提供了統一的消費模型,支持 Stream(如 Kafka)和 Queue(如 RabbitMQ)兩種消費模型, 支持 exclusive、failover 和 shared 三種消費模式。

同時,Pulsar 提供和 Kafka 兼容的 API,以及 Kafka-On-Pulsar(KoP) 組件來兼容 Kafka 的應用程序,

KoP 在 Pulsar Broker 中解析 Kafka 協議,用戶不用改動客戶端的任何 Kafka 代碼就能直接使用 Pulsar。

目前,Apache Pulsar 已經應用部署在國內外眾多大型互聯網公司和傳統行業公司,

案例分布在人工智能、金融、電信運營商、直播與短視頻、物聯網、零售與電子商務、在線教育等多個行業,

如美國有線電視網絡巨頭Comcast、Yahoo!、騰訊、中國電信、中國移動、BIGO、VIPKID 等。

目前 Apache Pulsar 項目原生核心貢獻者已組成創業公司 StreamNative,進一步為 Apache Pulsar 提供更好的企業級服務支持與生態建設。

 

 

二、核心概念

instance:實例

cluster:集群

tenant:租戶

namespace:命名空間

topic:主題

partitioned topic:可分區主題

broker:Pulsar集群計算節點

bookie:BookKeeper存儲節點

 

pub-sub:發布-訂閱模型

producer:生產者

subscription:訂閱(相當於 consumer group),一個Topic可以添加多個訂閱。訂閱不包含消息的數據,只包含元數據和游標。

consumer:消費者

 

 

Pulsar鏈接:

Pulsar 2.0版本之前:{persistent|non-persistent}://property/cluster/namespace/topic
Pulsar 2.0版本之后:{persistent|non-persistent}://tenant/namespace/topic

 

 

與kafka中概念對應關系:

 

三、核心組件

broker:計算層,整個消息層的生產和消費,無存儲狀態。

Apache BookKeeper:存儲層,數據持久化保存的節點,有存儲狀態。

RocksDB:內嵌在 BookKeeper 中的數據庫,存儲每個條目的位置索引。它只是將(LedgerId,EntryId) 映射到 (EntryLogId,文件中的偏移量)。

  <(LedgerID, EntryID), EntryLogID> 

  LedgerID 即 segmentID,EntryID 即是 Log Message 的邏輯 ID,EntryLogId 就是 Log消息在 Pulsar Fragment文件的物理 Offset。

Apache ZooKeeper:元數據管理,存儲 Pulsar 和 BookKeeper 元數據 和 節點狀態,以及服務發現(發現 broker ,發現bookie)。在 Pulsar 里的作用是

存儲 Pulsar 系統里元數據的存儲和集群的管理以及節點的發現等,節點發現是指發現集群里有多少個 broker,有多少 bookie。

 

一個Pulsar實例由一個或多個Pulsar集群組成。實例中的集群可以在它們之間復制數據。集群間可以通過 跨地域復制(Geo-Replication) 進行消息同步。

單個 Pulsar 集群由以下三部分組成:

  • 一個或者多個 broker 負責處理和負載均衡(loadmananger) producer 發出的消息,並將這些消息分派給 consumer;Broker 與 Pulsar 配置存儲交互來處理相應的任務,並將消息存儲在 BookKeeper 實例中(又稱 bookies);Broker 依賴 ZooKeeper 集群處理特定的任務,等等。
  • 包含一個或多個 bookie 的 BookKeeper 集群負責消息的持久化存儲。
  • 一個Zookeeper集群,用來處理多個Pulsar集群之間的協調任務。

 

 

brokers 和 bookies交互示意圖

 

 

 

 

集群示意圖

 

1、Broker:負責負載均衡和消息的讀取、寫入等,負責與業務系統進行通信,承擔協議轉換,序列化和反序列化、選主等功能。

Broker 主要由四個模塊組成。

  • Dispatcher:調度分發模塊,承擔協議轉換、序列化反序列化等。
  • Load balancer:負載均衡模塊,對訪問流量進行控制管理。
  • Global replicators:跨集群復制模塊,承擔異步的跨集群消息同步功能。
  • Service discovery:服務發現模塊,為每個 topic 選擇無狀態的主節點。

2、Bookie:存儲設備

3、ZooKeeper:存儲元數據,集群配置,協調

  • local zk負責Pulsar Cluster內部的配置等
  • global zk則用於Pulsar Cluster之間的數據復制等。

    Configuration Store(老版本中稱為 Global ZooKeeper),它存儲的是集群復制信息,讓集群之間互相了解各自的地址。同時還包括一些 clients 或 namespace 的相關配置信息。這樣做的目的可以簡化操作,只需一步即可將各個集群信息一同更新。

三、架構

計算與存儲分離架構

 

 

 

 

 

 

 

分層分片存儲架構

分層:

  • 計算層、存儲層。
  • 層級化存儲。

分片:segment。

  • Pulsar 將 topic 分區划分為分片,然后將這些分片存儲在 Apache BookKeeper 的存儲節點上,以提高性能、可伸縮性和可用性。
  • 分片架構將消息流數據的存儲粒度從分區拉低到了分片,以及相應的層級化存儲。

 

  • Pulsar 的無限分布式日志以分片為中心,借助擴展日志存儲(通過 Apache BookKeeper)實現,內置分層存儲支持,因此分片可以均勻地分布在存儲節點上。

由於與任一給定 topic 相關的數據都不會與特定存儲節點進行捆綁,因此很容易替換存儲節點或縮擴容。另外,集群中最小或最慢的節點也不會成為存儲或帶寬的短板。

 

  • Pulsar 的架構無分區,也沒有重平衡,保證了及時可伸縮性和高可用性。

這兩個重要特性使 Pulsar 尤其適用於構建與關鍵任務相關的服務,如金融用例的計費平台,電子商務和零售商的交易處理系統,金融機構的實時風險控制系統等。

 

  • 通過利用性能強大的 Netty 架構,數據從 producers 到 broker,再到 bookie 的轉移都是零拷貝,都不會生成副本。

這一特性對所有流用例都非常友好,因為數據直接通過網絡或磁盤進行傳輸,沒有任何性能損失。

 

 

 

 

 

隔離架構

保證了 Pulsar 的優良性能,主要體現在以下幾個方面:

  • IO 隔離:寫入、追尾讀和追趕讀隔離。
  • 利用網絡流入帶寬和磁盤順序寫入的特性實現高吞吐寫:傳統磁盤在順序寫入時,帶寬很高,零散讀寫導致磁盤帶寬降低,采取順序寫入方式可以提升性能。
  • 利用網絡流出帶寬和多個磁盤共同提供的 IOPS 處理能力實現高吞吐讀:收到數據后,寫到性能較好的 SSD 盤里,進行一級緩存,然后再使用異步線程,將數據寫入到傳統的 HDD 硬盤中,降低存儲成本。
  • 利用各級緩存機制實現低延遲投遞:生產者發送消息時,將消息寫入 broker 緩存中;實時消費時(追尾讀),首先從 broker 緩存中讀取數據,避免從持久層 bookie 中讀取,從而降低投遞延遲。讀取歷史消息(追趕讀)場景中,bookie 會將磁盤消息讀入 bookie 讀緩存中,從而避免每次都讀取磁盤數據,降低讀取延時。

 

 

消息模型

 

 

 

 

消息分發模型

  • 設置了 msg key,消息會基於 key 做 hash,將消息分發到不同的 partitions 中。
  • 未設置 msg key,消息會以 round robin 的形式,分發到不同的 partitions 中。

 

訂閱模型

Exclusive:獨占型,一個訂閱只能有一個消息者消費消息。

Failover:災備型,一個訂閱同時只有一個消費者,可以有多個備份消費者。一旦主消費者故障則備份消費者接管。不會出現同時有兩個活躍的消費者。

Shared:共享型,一個訂閱中同時可以有多個消費者,多個消費者共享Topic中的消息。

Key_Shared:鍵共享型。

 

Ack機制

單條 Ack(AckIndividual):是指 Consumer 可以根據消息的 messageID 來針對某一個特定的消息進行 Ack 操作。

批量 Ack(AckCumulative):是指一次 Ack 多條消息。

 

存儲模型

topic(partition):主題(分區),Pulsar 中的分區也是Topic。Pulsar中的概念。

ledger:賬簿。Ledger是一個只追加的數據結構,並且只有一個寫入器,這個寫入器負責多個BookKeeper存儲節點(就是Bookies)的寫入。 Ledger的條目會被復制到多個bookies。

            可以將 ledger 理解為 一個具體的 segment, (entry log 和 index 文件)

entry(record):記錄,存儲一條消息,或者一批消息

msg:消息

batch msg:批量消息

 

========================================

backlog:積壓,用來表述 Topic 中沒有被消費的數據。

  • Topic Backlog: 最慢的那個訂閱的 Backlog 的集合
  • Subscription Backlog: 指針對單個訂閱級別的沒有消費的數據的集合
  • backlogSize:記錄的是所有沒有被 Ack 的消息的大小
  • msgBacklog: 記錄的是所有未被 Ack 的 entries 的集合

cursor:

是用來存儲一個訂閱下 Consumer 的消費狀態(位置)。等價於 offset(kafka)+ individualDeletes。

每個Subscription都存儲一個Cursor。Cursor是日志中的當前偏移量。Subscription將其Cursor存儲至BookKeeper的Ledger中。這使Cursor跟蹤可以像Topic一樣進行擴展。

LAC(Last Add Confirmed ID) :記下最后一條寫成功的消息的 ID。

LAP(Last-Add-Pushed):已經發送給 Bookie 但是尚未收到 Ack 的日志條目,其原理類似於 TCP 發送端的滑動窗口機制。

strorage size:表述當前消息占用的存儲空間的大小。

 

 

分層存儲(Tiered Storage)

  在一些流數據用例場景中,用戶希望將數據長時間存儲在流中。

雖然 Apache Pulsar 對 topic backlog 的大小沒有限制,但將所有數據存儲在 Pulsar 中較長時間,存儲成本比較大。

Apache Pulsar 的分層存儲特性(在 2.1 及之后的版本可用),分層存儲支持在不影響終端用戶的條件下,將較舊的數據移動到長期存儲中。

  Pulsar 允許用戶存儲任意大小的 topic backlog。當集群將要耗盡空間時,用戶只需添加新的存儲節點,系統將會自動重新平衡數據。

但是,這樣的操作運行一段時間后,運維成本十分昂貴。

  Pulsar 通過提供分層存儲(Apache Pulsar 2.1 起新增的特性)減少了成本/大小的損失。分層存儲為用戶提供大小不受限制的 backlog,且無需添加存儲節點;

卸載較舊的 topic 數據到長期存儲中,長期存儲的成本比在 Pulsar 集群中存儲的成本低一個數量級。

對於終端用戶來說,消費存儲在 Pulsar 集群或分層存儲中的 topic 數據沒有明顯差別。位於 Pulsar 集群和分層存儲中的 topic 生產和消費消息的方式也完全相同。

  Pulsar 通過分片架構實現了分層存儲。Pulsar topic 的消息日志由一系列分片組成。序列中的最后一個分片是 Pulsar 當前寫入的分片。

當前序列之前的所有分片都已封裝,也就是說,這些分片中的數據不可變。由於數據不可變,因此可以輕易地將數據復制到另一個存儲系統,而不必擔心一致性的問題。

復制完成后,可以立即更新消息日志元數據中的數據指針,並且可以刪除 Pulsar 在 Apache BookKeeper 中存儲的數據副本。

 

Pulsar 支持通過 Amazon S3、GCS(Google Cloud Storage)、Filesystem 進行長期存儲

 

 

 

主題分配 Topic  Assignment 

namespace -> topic -> namespace hash ring -> bundle -> broker

topic、bundle、broker 的映射關系存儲在zookeeper中。

由 broker 中的 loadmanager 進行分配 並 負載均衡

注意:這里談論的 Topic 是無分區的 Topic 或者 Partition Topic 中的一個 Partition。 

 

主題查找 Topic Discovery

producer | consumer -> topic | topic partition -> broker ->  topic | topic partition owner(broker)。

注:因為任意一個broker 都可以通過 zookeeper 拿到 topic 和 broker 的對應關系。

所以,broker 前端可以配置 proxy (loadbalance、dns、iplist)。

 

消息讀寫流程

首先進行 Topic Discovery,定位broker;

然后,發送讀取:producer | consumer -> topic partition owner(broker)。

 

消息生命周期

Retention 機制

BookKeeper 消息保留機制。

默認情況下,持久化的機制是關閉的。即消息被 Ack 之后,就會進入刪除的邏輯。

配置 Retention 策略時,有如下兩個參數可以指定:

  • size:指持久化大小的閾值。0 代表不配置 Retention 大小策略,-1 代表設置的大小無限大
  • time:指持久化時間的閾值。0 代表不配置 Retention 時間策略,-1 代表時間無限大

 

TTL機制

沒有消費者消費訂閱的消息的處理。

 

跨機房復制 (geo replication)(full mesh replication)

  • 雙向復制
  • broker 區域去重機制
  • 異步復制

 

 

Bookie異常恢復

AutoRecovery 進程,自動處理,將掛掉的Bookie中的數據重新存儲到其他 Bookie 中。

AutoRecovery 任務是由若干個 worker 線程構成的線程池執行的,每個 worker 線程從由自己負責的 zookeeper path 上找到要恢復數據的 Ledger 進行數據復制。

如果集群發生擴容,則由 Auditor 線程負責 Segment 數據的遷移復制。

 

 

Pulsar 負載均衡的幾個問題


1、broker 是否是對等的?
【答】是對等的。

2、每一個 broker 實例中包含 loadbalance 模塊?
【答】是
那做負載均衡的時候由哪個 broker 的 loadbalance 來做呢?
【答】每一個broker都可能來發起這個操作。
在運行過程中,當某一個broker發現負載達到閾值時,此時就由該broker發起 loadbalance 。

3、負載均衡后,重新給topic 分配 了 ownership (broker),保存在 zk 中?
下次如何 根據hash環來定位到 這個新的 ownership 呢?

topic discovery 由 service discovery模塊來做嗎?
【答】由 broker 內部的特定邏輯來處理。


4、負載均衡
broker 負載均衡 :由 loadbalance 來做。
bookie 負載均衡 :由 bookie 自身的負載均衡機制來做。【條帶化寫】

 

5、broker如何選定bookie?

【答】隨機,因為 bookie 是條帶化寫入,大部分時間是出於比較均衡的狀態,所以可以隨機給定。

 

 

 

疑問

1、資源池化

雲計算和Web數據中心業務要求計算中心的CPU,內存,存儲和網絡資源能作為一個全局的資源池進行動態,靈活的調配和綁定。

服務器設備資源池化是對硬件的重構,而服務器的硬件重構關乎大規模數據中心的效率和成本問題,

目標是降低服務器硬件購置成本、提高服務器硬件資源的利用率、降低服務器運維成本,降低綜合TCO。

服務器資源池化的幾個資源為:CPU池、內存池、存儲池、I/O池

 

2、消息消費

Pulsar 的消費模型采用了流拉取的方式。流拉取是長輪詢的改進版,不僅實現了單個調用和請求之間的零等待,還可以提供雙向消息流。

通過流拉取模型,Pulsar 實現了比所有現有長輪詢消息系統(如 Kafka)都低的端到端延遲。

 

3、自動負載均衡功能

Pulsar 的自動負載均衡功能可以自動並立即使用集群中新加的計算和存儲能力。

這使得 broker 之間可以遷移 topic 來平衡負載,新 bookie 節點可以立即接受新數據分片的寫入流量,而無需手動重新平衡或管理 broker。

 

4、消息路由

通過 Pulsar IO、Pulsar Functions、Pulsar Protocol Handler,Pulsar 具有全面路由的功能。Pulsar 的路由功能包括基於內容的路由、消息轉換,和消息擴充。

 

5、BookKeeper API

【低階】Ledger API: 底層API,可以直接操作ledger,比較靈活。

【高階】Stream API: higher-level面向流的API,通過Apache DistributedLog提供,直接操作流,不用關心與ledger交互的復雜性。

 

7、消息刪除

在 BookKeeper 中,允許操作的最小的單元是一個 segment,所以在具體的 msg(entry)級別,是沒辦法針對一條消息進行刪除的,刪除操作需要針對一個 segment 來進行操作。

 

8、消息順序

邏輯全局有序(客戶端保持順序),物理存儲無序。

 

9、broker是對等的嗎

broker是對等的,負載均衡由 loadmanager 執行,進行topic卸載操作,以便平衡broker流量負載。 

 

相關

AMQP:高級消息隊列協議

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。

基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同的開發語言等條件的限制。Erlang中的實現有RabbitMQ等。

 

MQTT:輕量級物聯網消息推送協議

MQTT是機器對機器(M2M)/物聯網(IoT)連接協議。它被設計為一個極其輕量級的發布/訂閱消息傳輸協議。

對於需要較小代碼占用空間和/或網絡帶寬非常寶貴的遠程連接非常有用,是專為受限設備和低帶寬、高延遲或不可靠的網絡而設計。

這些原則也使該協議成為新興的“機器到機器”(M2M)或物聯網(IoT)世界的連接設備,以及帶寬和電池功率非常高的移動應用的理想選擇。

例如,它已被用於通過衛星鏈路與代理通信的傳感器、與醫療服務提供者的撥號連接,以及一系列家庭自動化和小型設備場景。

它也是移動應用的理想選擇,因為它體積小,功耗低,數據包最小,並且可以有效地將信息分配給一個或多個接收器。

 

MQTT on Pulsar (MoP)

MQTT-on-Pulsar (aka MoP) is developed to support MQTT protocol natively on Apache Pulsar.

 

KoP(Kakfa on Pulsar)

 

AoP(AMQP on Pulsar)

 

ANSI標准

ANSI:美國國家標准學會(AMERICAN NATIONAL STANDARDS INSTITUTE: ANSI)成立於1918年。

 

什么是條帶化(striping)

  當多個進程同時訪問一個磁盤時,可能會出現磁盤沖突。大多數磁盤系統都對訪問次數(每秒的I/O操作,IOPS)和數據傳輸率(每秒傳輸的數據量,TPS)有限制。

當達到這些限制時,后面需要訪問磁盤的進程就需要等待,這時就是所謂的磁盤沖突。

  避免磁盤沖突是優化I/O 性能的一個重要目標,而 I/O 性能的優化與其他資源(如CPU和內存)的優化有着很大的區別 ,I/O 優化最有效的手段是將I/O 最大限度的進行平衡。

  條帶化技術就是一種自動的將I/O 的負載均衡到多個物理磁盤上的技術,條帶化技術就是將一塊連續的數據分成很多小部分並把他們分別存儲到不同磁盤上去。

這就能使多個進程同時訪問數據的多個不同部分而不會造成磁盤沖突,而且在需要對這種數據進行順序訪問的時候可以獲得最大程度上的I/O 並行能力,從而獲得非常好的性能。

很多操作系統、磁盤設備供應商、各種第三方軟件都能做到條帶化。

  圖1 描述的是一個未經條帶化處理的連續數據的分布,圖 2 描述的是一個已經被條帶化處理的連續數據的分布,

從中比較,我們可以發現圖 2中對連續數據的讀寫都有最大的並發能力。

 

圖 1. 未經條帶化處理的連續數據

 

 

圖 2. 已經被條帶化處理的連續數據

由於條帶化在I/O 性能問題上的優越表現,以致於在應用系統所在的計算環境中的多個層次或平台都涉及到了條帶化的技術,如操作系統和存儲系統這兩個層次中都可能使用條帶化技術。

 

其他:

Kafka 采用單片架構模型,將服務與存儲相結合。即 一個Kafka broker節點。

 

參考資料

官網

github

tgip-cn

StreamNative video

年貨禮包 | 資料大全: Apache Pulsar 從入門到實踐

假期充電包 | Apache Pulsar 從入門到實踐

Pulsar負載均衡

 

思否開源項目推介丨Apache Pulsar:下一代雲原生分布式消息流平台

Pulsar 與 Kafka 全方位對比(上篇):功能、性能、用例

博文推薦|多圖詳解 Apache Pulsar 消息存儲模型

博文推薦 | 一文帶你看懂 Pulsar 的消息保留和過期策略

譯文|簡明指南:Apache Pulsar 的分層存儲

理解Apache Pulsar工作原理

Apache Pulsar指北

Pulsar筆記(好)

Pulsar學習筆記之 編譯Jar包、構建鏡像、部署集群

 

Apache Pulsar 源碼走讀(一)啟動

Apache Pulsar 源碼走讀(二)二進制協議

Apache Pulsar 源碼走讀(三)TopicLookup 請求處理(一)

Apache Pulsar 源碼走讀(四)TopicLookup請求處理(二)

 

理解磁盤條帶化

 

分布式系統之Quorum機制


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM