Pulsar雲原生分布式消息和流平台v2.8.0


Pulsar雲原生分布式消息和流平台

**本人博客網站 **IT小神 www.itxiaoshen.com

Pulsar官方網站

Apache Pulsar是一個雲原生的分布式消息和流媒體平台,最初創建於雅虎!現在是Apache軟件基金會的頂級項目

官網首頁列舉一些關鍵特性和目前使用公司包括國內深度合作騰訊,目前最新版本為2.8.0,背后的開源流數據公司 StreamNative,2019年創立一家公司,作為雲原生時代專注技術細分領域的佼佼者

什么是Pulsar

Pulsar即可以支持queue模式的消息中間件比如RabbitMQ和RocketMQ,也可以支持stream流模式的Kafka,幾乎涵蓋消息應用的領域,加上豐富企業特性如多租戶隔離、百萬級Topics、跨地域復制、鑒權認證,是雲原生時代其他消息中間件的演化或者說是替代品也不為過

image-20210822122753839

部署模式

支持多種部署模式,比如本地開發測試環境下單機運行環境,生產使用集群部署或多集群部署,還有基於容器化的Docker和K8s部署等

image-20210822114211181

概覽

Pulsar 是一個用於服務器到服務器的消息系統,具有多租戶、高性能等優勢。 Pulsar 最初由 Yahoo 開發,目前由 Apache 軟件基金會管理。

Pulsar 的關鍵特性如下:

Pulsar需要解決問題

  • 企業需求和數據規模
    • 多租戶-百萬Topics-低延遲-持久化-跨地域復制
    • 解除存儲計算耦合
      • 運維痛點:替換機器、服務擴容、數據rebalance
    • 減少文件系統依賴
      • 性能難保障:持久化、一致性、多Topic
      • IO不隔離:消費者度Backlog的時候會影響其他生產者和消費者,Kakfa采用順序寫的機制提升性能,當Topic和分區數大量增大后便會退化為隨機寫而極大減低起IO性能

架構

Pulsar底層最為關鍵技術是采用存儲和計算分離以及分層+分片的架構,節點是對等的可以獨立擴展並支持靈活擴容和快速容錯機制,這也是為什么說Pulsar是雲原生架構的主要原因;

image-20210822123703298

pulsar-system-architecture

Pulsar企業級存儲層采用的是Apache BookKeeper持久化存儲。 BookKeeper是一個分布式的預寫日志(WAL)系統,滿足低延遲、高吞吐、持久化、強一致性、高可用、I/O隔離,元數據服務基於rocksdb legder存儲,而基於New Sql新一代分布式關系數據庫TiDb的k-v存儲節點底層也是采用 性能非常強大單機存儲引擎rocksdb,關於 BookKeeper我們本篇不做延展介紹,后續有時間再單獨闡述。

Broker作為計算層依靠Zookeeper作為生產者和消費者的橋梁,天然屬於無狀態的服務,擴容通過服務發現自動動態感知;另一方面底層是分布式存儲,因此擴容直接添加存儲節點即可,原來在Kafka擴容節點后,如果沒有屬於該節點的分區數據則擴容節點是無法起作用的,需要做分區管理或rebalance,而在Pulsar中新增加節點則會實時增加數據進來,這個得益於Pulsar的架構設計,采用分層+分片的邏輯存儲概念,每一塊存儲是可以存儲不同Topic不同分區的數據,然后依賴於索引系統原理實現檢索;存儲節點出現故障后由於是對等架構,分布式存儲有多副本機制,所以可繼續提供正常服務且也不需要立即進行故障轉移,可以在合適時機再做副本遷移,所以對於應用來說是無感知的

Pulsar穩定的IO質量底層機制

image-20210822132544822

Ledger是一個只追加的數據結構,並且只有一個寫入器,這個寫入器負責多個BookKeeper存儲節點(就是Bookies)的寫入。 Ledger的條目會被復制到多個bookies。 Ledgers本身有着非常簡單的語義:

  • Pulsar Broker可以創建ledeger,添加內容到ledger和關閉ledger。
  • 當一個ledger被關閉后,除非明確的要寫數據或者是因為寫入器掛掉導致ledger關閉,這個ledger只會以只讀模式打開。
  • 最后,當ledger中的條目不再有用的時候,整個legder可以被刪除(ledger分布是跨Bookies的)。

Ledger讀一致性

BookKeeper的主要優勢在於他能在有系統故障時保證讀的一致性。 由於Ledger只能被一個進程寫入(之前提的寫入器進程),這樣這個進程在寫入時不會有沖突,從而寫入會非常高效。 在一次故障之后,ledger會啟動一個恢復進程來確定ledger的最終狀態並確認最后提交到日志的是哪一個條目。 在這之后,能保證所有的ledger讀進程讀取到相同的內容。

Managed ledgers

Given that Bookkeeper ledgers provide a single log abstraction, a library was developed on top of the ledger called the managed ledger that represents the storage layer for a single topic. managed ledger即消息流的抽象,有一個寫入器進程不斷在流結尾添加消息,並且有多個cursors 消費這個流,每個cursor有自己的消費位置。

Internally, a single managed ledger uses multiple BookKeeper ledgers to store the data. There are two reasons to have multiple ledgers:

  1. 在故障之后,原有的某個ledger不能再寫了,需要創建一個新的。
  2. A ledger can be deleted when all cursors have consumed the messages it contains. This allows for periodic rollover of ledgers.

日志存儲

In BookKeeper, journal files contain BookKeeper transaction logs. 在更新到 ledger之前,bookie需要確保描述這個更新的事務被寫到持久(非易失)存儲上面。 在bookie啟動和舊的日志文件大小達到上限(由 journalMaxSizeMB 參數配置)的時候,新的日志文件會被創建。

Palsar Schema

啟用 schema 后,Pulsar 會解析數據,即接收字節作為輸入並發送字節作為輸出。 雖然數據不僅是字節,但的確需要解析這些數據,解析時還可能發生解析異常,解析異常主要出現在以下幾種情況中:

  • 字段不存在
  • 字段類型已更改(例如,將 string 更改為 int

簡單來說,當我們使用 schema 去創建 producer 生產者則不再需要將消息序列化為字節,因為Pulsar schema 會在后台幫我們執行序列化操作。

Producer<User> producer = client.newProducer(JSONSchema.of(User.class))
        .topic(topic)
        .create();
User user = new User("Tom", 28);
producer.send(user);

Parsar Functions

Pulsar Functions 是輕量級計算流程,具有以下特點:

  • 從一個或多個 Pulsar topic 中消費消息;
  • 將用戶提供的處理邏輯應用於每條消息;
  • 將運行結果發布到另一個 topic。
  • Pulsar Functions可以看做是一種編程模型,背后的核心目標是使您能夠輕松創建各種級別的復雜的的處理邏輯,而無需部署單獨的類似系統(例如 Apache Storm, Apache Heron, Apache Flink, 等等) Pulsar Functions are computing infrastructure of Pulsar messaging system. The core goal is tied to a series of other goals:
    • 提高開發者的生產力(用開發者熟悉的語言和Pulsar Function 的函數SDK)
    • 簡單的故障排查
    • 操作簡單(不需要外部處理系統)

Function 抽象,計算對象是消息,Function 將收到消息進行計算執行業務邏輯並寫進 Output topic,Function 為開發者提供了很多便利,簡單的計算都可以通過 Function 完成。可以將 Function 結合起來,由此提出一個新概念 Function Mesh,其主要基於 K8s 開發。

package org.example.functions;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

import java.util.Arrays;

public class WordCountFunction implements Function<String, Void> {
    // This function is invoked every time a message is published to the input topic
    @Override
    public Void process(String input, Context context) throws Exception {
        Arrays.asList(input.split(" ")).forEach(word -> {
            String counterKey = word.toLowerCase();
            context.incrCounter(counterKey, 1);
        });
        return null;
    }
}

將上面的代碼編譯成可部署的 JAR 文件,可以使用如下命令行將 JAR 包部署到 Pulsar 集群中。

$ bin/pulsar-admin functions create \
  --jar target/my-jar-with-dependencies.jar \
  --classname org.example.functions.WordCountFunction \
  --tenant public \
  --namespace default \
  --name word-count \
  --inputs persistent://public/default/sentences \
  --output persistent://public/default/count

Pulsar IO

Pulsar IO連接器使您能夠輕松創建、部署和管理與外部系統交互的連接器,如Apache Cassandra、Aerospike等。

可以通過 Connector Admin CLI並結合 sourcessinks 子命令來管理 Pulsar 連接器(例如,創建、更新、啟動、停止、重啟、重載、刪除以及其他操作)。

連接器(sources 和 sinks)和 Functions 是實例的組成部分,都在 Functions workers 上運行。 通過 Connector Admin CLIFunctions Admin CLI 管理 source、sink 或者 function 時,在 worker 上就啟動了一個實例。 了解更多信息,參閱 Functions worker

image-20210822135353477

image-20210822164207858

Pulsar SQL

Apache Pulsar 用於存儲事件數據流,事件數據結構由預定義字段組成。 借助 Schema Registry 的實現,你可以在 Pulsar 中存儲結構化數據,並通過使用Trino(原先叫 Presto SQL)查詢這些數據。

作為 Pulsar SQL 的核心,Presto Pulsar 連接器支持 Presto 集群中的 Presto worker 查詢 Pulsar 數據。

Pulsar的消費者和讀取器接口

查詢性能高效且高度可擴展,這得益於 Pulsar 的 分層分片架構

Pulsar 中的主題以分片形式存儲在 Apache BookKeeper 中。 每個主題分片會被復制到多個 BookKeeper 節點,可以支撐並發讀和高吞吐。 你可以配置 BookKeeper 節點的數量,默認節點數是 3。 在 Presto Pulsar 連接器中,數據直接從 BookKeeper 讀取,所以 Presto worker 能從水平擴展的 BookKeeper 節點中並發讀取數據。

Pulsar的消費者和讀取器接口

Tiered Storage

Pulsar 的分層存儲 功能允許將歷史 backlog 數據從 BookKeeper 中轉移到更加低廉的存儲介質中並且允許客戶端訪問無變化的 backlog 數據。

Transactions

Pulsar事務 (txn) 使事件流應用程序能夠在一個原子操作中消費、處理和生成消息。

img

Pulsar事務支持端到端的恰好一次流處理,這意味着消息不會從源算子(source operator)丟失,並且消息不會重復發給接收算子(sink operator)。

隨着Pulsar 2.8.0中引入的事務,Pulsar Flink接收器連接器可以通過實現指定的 TwoPhaseCommitSinkFunction 並使用Pulsar事務 API 連接 Flink 接收器消息生命周期來支持exactly-once語義

Pulsar 周邊和生態

Pulsar 作為一個流原生消息平台,主要包括存儲(Stream Storage)、消息(Messaging)、計算(Processing)三個方面的工作。

Messaging 是 Pulsar 誕生之初的一個主要方向。通過 Pulsar IO 和外部系統打。

下圖藍色 Processing 方面 Queries 的引擎比如 Presto 和 HIVE 進行深度整合,讓 Presto 和 HIVE 能夠直接讀取 Pulsar 的 topic ,再結合 Pulsar 本身自帶的 Schema,將 Pulsar topic 作為的一個表直接查找 Pulsar topic 中的數據。

在 Streaming & Batch Processing 方面,與大數據處理引擎包括 Storm、Flink、Spark 進行深度整合。

在 Processing 方向的思路是與現有的大數據生態做深度的融合,讓大數據生態能夠更好地訪問 Pulsar,把 Pulsar 當作數據的存儲引擎。

除此之外,Pulsar 推出了 Pulsar Function — 一個輕量級的計算框架。Pulsar Function 可以減輕很多數據的傳輸,可以靠近數據端完成計算,目前很多 IoT 場景的用戶如塗鴉智能、EMQ、中國電信及一些車聯網公司都在使用 Pulsar Function。

除了 Messaging 和 Processing ,Pulsar 擁有一個很堅實的基礎,就是擁有專門為消息、流存儲而設計的存儲引擎 Apache BookKeeper。結合 Pulsar 對分區再分片的存儲特性,我們很自然地把老的分片遷移到二級存儲中,所以 Pulsar 的架構很容支持二級存儲。二級存儲的介質包括雲上的各種資源:S3、HDFS

image-20210822133954918

消息領域

很多用戶在使用 Pulsar 的過程中,會發現客戶端應用的改造和遷移會很難落地。比如 Kafka 往 Pulsar 遷移過程中,客戶很可能也會有大量基於 Kafka Clients 的應用需要更改,由於需要更改協議導致遷移很困難。

由於短時間不可能完全從 Kafka 遷移到 Pulsar,導致對后台的運維甚至整個業務的切換帶來很大的不便捷性。

Pulsar 和 Kafka 一樣是以 topic 作為基礎,以 log 作為抽象,Pulsar 的一致性、延遲、吞吐會更優,在這個基礎上要復用 Pulsar 的存儲層,在 Broker 端實現協議的解析,用戶的切換成本更低。Pulsar Broker 端提供 Protocol Handler 插件(現在已經實現 Kafka、AMQP、MQTT 協議的支持)的方式來支持多種協議。這種在 Broker 端做協議解析的方法,可以更方便地支持多種協議。其次還利用 Pulsar 在存儲層擁有存儲、計算分離的優勢,服務上層多種協議。

KoP(Kafka on Pulsar)

目前 StreamNative 聯合合作伙伴已推出了 KoP 項目,主要滿足想要從 Kafka 應用程序切換到 Pulsar 的用戶的強烈需求。

KoP 將 Kafka 協議處理插件引入 Pulsar broker,從而實現 Apache Pulsar 對原生 Apache Kafka 協議的支持。將 KoP 協議處理插件添加到現有 Pulsar 集群后,用戶不用修改代碼就可以將現有的 Kafka 應用程序和服務遷移到 Pulsar,從而使用 Pulsar 的強大功能。

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-9i0cziCO-1629624254691)(http://www.itxiaoshen.com:3001/assets/1629618989303RWFe8zct.png)]

KoP 相關特性:

  • Broker 的插件,Client 不需要做任何的改動;
  • 共享訪問;
  • 支持 Kafka 0.10-2.x 版本;
  • 連續 Offset:增加對連續 ID 的支持。
  • 性能改進:實現與 Kafka broker 類似的機制,無需 KoP 針對 Kafka 發送的 batch 消息進行拆包解包,將 Kafka 發送過來的消息直接以 Kafka 格式進行存儲,並在 Pulsar Client 增加對 kafka 協議的解析器。
  • 支持 Envoy,並實現 Pulsar Schema 與 Kafka Schema 的兼容。

AoP(AMQP on Pulsar)

AoP(AMQP on Pulsar)是 StreamNative 聯合中國移動共同開發推進的項目,類似 KoP,主要解決 AMQP 應用程序遷移到 Pulsar 的需求。當前 AoP 實現了對 AMQP 協議 0.9.1 版本的支持,2021 年計划對 AMQP 1.0 協議進行整合支持。目前除了中國移動正在大規模應用 AoP 外,國外也有越來越多的用戶正在使用 AoP,希望更多小伙伴加入到 AoP 使用中來,共同豐富 AoP 場景,協作增強 AoP 功能。

MoP(MQTT on Pulsar)

MQTT 協議在物聯網應用十分廣泛,類似 KoP、AoP,當前 Pulsar 也通過 MoP 項目提供了對 MQTT 協議的支持。當前 MoP 支持 QoS level 0、QoS level 1 協議,2021 年計划實現對 QoS level 2 協議的支持。

Apache Pulsar在日志系統中的應用

常見日志架構ELK

image-20210822161521359

消息隊列在日志場景中主要作用:削峰解耦、數據分發

日志系統常見挑戰:

image-20210822161907197

更多功能性要求:

image-20210822161943662

Kafka和Pulsar對比

image-20210822132134188

  • Kafka僅支持user/client-id級別、broker設置;而Pulsar則支持namespace/topic級別,粒度較小

  • Kafka增加新節點需要reassign partition才能使用;而Pulsar存儲和計算分離,可以按需增加計算或存儲節點,增加即生效,不需要reassign

  • Kafka消費能力受限Topic設定的partition數量;而Pulsar消費能力不受限Topic設定的partition數量,可以通過增加消費者數量增大消費能力

  • Kafka隨着partition增多,請求下降嚴重,追加寫模式退化為隨機些;而Pulsar topic/partition僅是邏輯概念,保證追加寫模式

Pulsar引入架構V1

image-20210822163841889

Pulsar引入架構V2

image-20210822163915632

Pulsar引入架構V3

image-20210822164606097

將ETL邏輯從LogStash遷移到Apache Pulsar Functions/IO中,從而起到降本提效,將數據offload到二級緩存中,滿足等保要求

Pulsar引入架構V4

image-20210822165122131

image-20210822164023510

可以通過Pulsar SQL實現千萬級別的精確數據查詢,注意是不支持模擬查詢,模擬還是需要在ES中進行

應用

  • Apache Pulsar在騰訊大數據場景落地實踐如TDBank-大數據實時接入平台騰訊慧聚
  • Apache Pulsar在華為物聯網(AMQP)之旅
  • Apache Pulsar在電信計費系統的應用
  • Apache Pulsar 在拉卡拉的技術實踐
  • KoP(Kafka on Pulsar)在 BIGO 的性能優化實踐


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM