雲原生消息系統 Pulsar


1.pulsar概述

  Apache Pulsar 是靈活的發布-訂閱消息系統(Flexible Pub/Sub messaging),采用分層分片架構(backed by durable log/stream storage)。

  Apache Pulsar 是一個開源的分布式pub-sub消息系統,最初是在雅虎創建的,現在是Apache Software Foundation的一部分,是下一代雲原生分布式流數據平台。

  Apache Pulsar 是服務和存儲分離的消息系統,主要分為BrokerBookKeeper兩大模塊,Broker提供服務能力,BookKeeper提供存儲能力。

  

 

 

2.pulsar之於kafka

  

1. 流式處理和隊列的合體

  Pulsar 就像是一個合二為一的產品,不僅可以像 Kafka 那樣處理高速率的實時場景,還能支持標准的消息隊列模式,比如多消費者、失效備援訂閱和消息扇出,等等。Pulsar 會自動跟蹤客戶端的讀取位置,並把這些信息保存在高性能的分布式 ledger(BookKeeper)當中。

  與 Kafka 不一樣的是,Pulsar 具備傳統消息隊列(如 RabbitMQ)那樣的功能,因此,只需要運行一個 Pulsar 系統就可以同時處理實時流和消息隊列。

2. 支持分區,但不是必需的

  如果你用過 Kafka,就一定知道分區是怎么回事。Kafka 中的所有主題都是分區的,這樣可以增加吞吐量。通過分區,單個主題的處理速率可以得到大幅提升。但如果某些主題不需要太高的處理速率,那該怎么辦?對於這類情況,就不需要考慮分區了,以避免復雜的 API 和管理方面的工作,這樣不是更好嗎?

  Pulsar 就可以做到。如果你只需要一個主題,而不需要分區,那使用一個主題就好了。如果你需要使用多個消費者實例來提升處理速率,其實也不需要使用分區,因為 Pulsar 的共享訂閱可以達到你的目的。

  如果你確實需要分區來進一步提升性能,你也可以使用分區。

3. 日志固然不錯,但 ledger 更勝一籌

  Kafka 開發團隊預見了日志對於一個實時數據交換系統的重要性。因為日志是通過追加的方式寫入系統的,所以數據寫入速度很快。又因為日志中的數據是串行的,所以可以按照寫入的順序快速讀取數據。相比隨機讀取和寫入,串行讀取和寫入速度更快。對於任何一個提供數據保證的系統來說,持久化存儲方面的交互都是一個瓶頸,而日志抽象最大限度地提升了這方面的效率。

  日志固然是好,但當它們的量增長到很大的時候,也會給我們帶來一些麻煩。在單台服務器上保存所有日志已經成為一個挑戰。在服務器存儲被日志填滿之后該怎么辦?如何進行伸縮?或者保存日志的服務器宕機,需要重新從副本創建新的服務器,該怎么辦?將日志從一台服務器拷貝到另一台服務器是很耗費時間的,特別是如果你想要在保持系統實時數據的情況下完成這個操作就更難了。

  Pulsar 對日志進行分段,從而避免了拷貝大塊的日志。它通過 BookKeeper 將日志分段分散到多台不同的服務器上。也就是說,日志並不是保存在單台服務器上,所以任何一台服務器都不會成為整個系統的瓶頸。這樣就可以更容易地處理故障,要進行伸縮也很容易,只需要加入新的服務器,不需要進行再均衡。

4. 無狀態

  對於雲原生應用程序開發人員來說,他們最喜歡的東西就是無狀態。無狀態組件啟動速度快,可替換,還可以實現無縫的伸縮。如果消息中間件也是無狀態的,那豈不是更好?

  Kafka 不是無狀態的,因為每個 broker 都包含了分區的所有日志,如果一個 broker 宕機,並非任意一 broker 都可以接替它的工作。如果工作負載太高,也不能隨意添加新的 broker 來分擔。broker 之間必須進行狀態同步。

  在 Pulsar 架構中,broker 是無狀態的。但是完全無狀態的系統是無法用來持久化消息的,所以 Pulsar 其實是有維護在狀態的,只是不是在 broker 上。在 Pulsar 架構中,數據的分發和保存是相互獨立的。broker 從生產者接收數據,然后將數據發送給消費者,但數據是保存在 BookKeeper 中的。

  因為 Pulsar 的 broker 是無狀態的,所以如果工作負載很高,就可以直接添加新的 broker。

5. 簡單的跨域復制

  跨域復制是 Pulsar 的拿手好戲。Pulsar 在設計之初就考慮到了這個特性,配置也很容易。要搭建一個全球化的分布式 Pulsar 集群,並不需要你擁有博士學位。

6. 穩定的表現

  一些基准測試(http://openmessaging.cloud/docs/benchmarks/pulsar/)表明,Pulsar 可以在提供較高吞吐量的同時保持較低的延遲。

7. 完全開源

  Pulsar 提供了很多與 Kafka 相似的特性,比如跨域復制、流式消息處理(Pulsar Function)、連接器(Pulsar IO)、基於 SQL 的主題查詢(Pulsar SQL)、schema registry,還有一些 Kafka 沒有的特性,比如分層存儲和多租戶,所有這些特性都是開源的。

3.pulsar特點

1.Pulsar函數

  使用友好的API輕松部署輕量級計算邏輯,無需其它的流處理引擎

2. 水平擴展

  將容量無縫擴展到成百上千個節點

3.高吞吐

  已經在Yahoo的生產環境中經受了考驗,每秒數百萬消息

4.低延遲

  設計用於大規模的低延遲(<5ms)場景,具有強大的耐用性保證

5.Geo-replication地域復制

  專為跨多個地理區域的數據中心之間的可配置復制而設計

6.多租戶

  構建為多租戶系統。支持隔離,身份驗證,授權和配額

7.持久存儲

  基於Apache BookKeeper的持久消息存儲。提供寫入和讀取操作之間的IO級隔離

8.客戶端庫

  靈活的消息傳遞模型,包含用於JavaC ++PythonGO的高級API

9.可操作性

  REST Admin API,用於配置,管理,工具和監視。部署在裸機或Kubernetes上。

4.架構概述 

  

  在最層來看一個Pulsar實例由一個或多個Pulsar本地集群組成。實例中的本地集群之間可以相互復制數據。

  一個Pulsar本地的集群由下面三部分組成:

    一個或者多個brokers負責處理和負載均衡從生產者源源不斷發送出的消息,並將他們發送給消費者。它與配置存儲交互來處理相應的任務,它將消息存儲在BookKeeper實例中(aka bookies)。它依賴ZooKeeper集群處理特定的任務,等等。

    一個包含一個或者多個BookiesBookKeeper,這個BookKeeper主要負責消息的 持久化存儲。

    一個Pulsar全局的集群(實例)包含Global-Zookeeper服務。

5. 組件角色描述

1.Brokers

   Pulsarbroker是一個無狀態組件, 主要負責運行另外的兩個組件:

    一個 HTTP 服務器, 它暴露了 REST 系統管理接口以及在生產者和消費者之間進行 Topic查找的API

    一個調度分發器, 它是異步的TCP服務器,通過自定義 二進制協議應用於所有相關的數據傳輸。

2.Zookeeper

  Pulsar利用Zookeeper進行元數據存儲,集群配置和協調。在一個Pulsar實例中:

  配置與仲裁存儲: 存儲租戶,命名域和其他需要全局一致的配置項

  每個本地集群有自己獨立的ZooKeeper保存集群內部配置和協調信息,例如歸屬信息,broker負載報告,BookKeeper ledger信息(這個是BookKeeper本身所依賴的),等等。

3.Global-Zookeeper

  Pulsar利用Global -Zookeeper進行全局集群的元數據存儲,集群配置和協調。使用ZookeeperObserver角色,提高伸縮性,部署跨地區的ZooKeeper數據中心。

4.BookKeeper

   Pulsar BookKeeper作為持久化存儲。BookKeeper是一個分布式的預寫日志(WAL系統,有如下幾個特性特別適合Pulsar的應用場景:

  能讓Pulsar創建多個獨立的日志,這種獨立的日志就是ledgers. 隨着時間的推移,Pulsar會為Topic創建多個ledgers

  為按條目復制的順序數據提供了非常高效的存儲。

  保證了多系統掛掉時ledgers的讀取一致性。

  提供不同的Bookies之間均勻的IO分布的特性。

  容量和吞吐量都能水平擴展。並且容量可以通過在集群內添加更多的Bookies立刻提升。

  Bookies被設計成可以承載數千的並發讀寫的ledgers。使用多個磁盤設備,一個用於日志,另一個用於一般存儲,這樣Bookies可以將讀操作的影響和對於寫操作的延遲分隔開。 

5.Pulsar proxy

  Pulsar客戶端和Pulsar集群交互的一種方式就是直連Pulsar brokers 。 然而,在某些情況下,這種直連既不可行也不可取,因為客戶端並不知道broker的地址。例如在雲環境或者 Kubernetes 以及其他類似的系統上面運行Pulsar,直連brokers就基本上不可能了。

   Pulsar proxy提供了解決這個問題的方案,它可以作為集群中的所有brokers的統一網關。 如果你選擇運行Pulsar Proxy(這是可選的),所有的客戶端連接將會通過這個代理而不是直接與brokers通信。

6.Service discovery

  連接到Pulsar Broker的客戶端需要能夠使用單個URL與整個Pulsar實例進行通信。Pulsar提供了一個內置的服務發現機制。

6.訂閱模型

  訂閱是命名好的配置規則,指導消息如何投遞給消費者。 Pulsar有三種訂閱模式:exclusivesharedfailover。下圖展示了這三種模式:

 

Exclusive(獨占)

  獨占模式,只能有一個消費者綁定到訂閱上。 如果多於一個消費者嘗試以同樣方式去訂閱主題,消費者將會收到錯誤。

 

 

 

Failover(災備)

  Failover模式中,多個consumer可以綁定到同一個subscriptionconsumer將會按字典順序排序,第一個consumer被初始化為唯一接受消息的消費者。這個consumer被稱為master consumer

  當master consumer斷開時,所有的消息(未被確認和后續進入的)將會被分發給隊列中的下一個consumer

 

Shared(共享)

  share或者round robin模式中,多個消費者可以綁定到同一個訂閱上。消息通過round robin輪詢機制分發給不同的消費者,並且每個消息僅會被分發給一個消費者。當消費者斷開連接,所有被發送給他,但沒有被確認的消息將被重新安排,分發給其它存活的消費者。

 

 

  注意:

  Shared模式的限制,使用shared模式時,需要重點注意以下兩點: * 消息的順序無法保證。* 你不可以使用累積確認。

  累積消息確認不能用於共享訂閱模式,因為共享模式中,一個訂閱會涉及到多個消費者。

Key_shared

  在Key_shared共享模式下,多個消費者可以附加到同一訂閱。消息在跨消費者的分發中傳遞,具有相同key或相同訂購key的消息僅傳遞給一個消費者。無論消息被重新傳遞多少次,它都會被傳遞給同一個消費者。當使用者連接或斷開連接時,將導致服務的消費者更改某些消息的key

多主題訂閱

  當consumer訂閱pulsar的主題時,它默認指定訂閱了一個主題,例如:persistent://public/default/my-topic

  Pulsar消費者可以同時訂閱多個topic。可以用以下兩種方式定義topic的列表:

  通過最基礎的 正則表達式(regex),例如 persistent://public/default/finance-.*

  通過明確指定的topic列表

  注意:

    通過正則訂閱多主題時,所有的主題必須在同一個namespace

    當訂閱多主題時,Pulsar客戶端會自動調用PulsarAPI來發現匹配表達式或者列表的所有topic,然后全部訂閱。如果此時有暫不存在的topic,那么一旦這些topic被創建,conusmer會自動訂閱。

    不能保證順序性當消費者訂閱多主題時,Pulsa所提供對單一主題訂閱的順序保證,就hold不住了。如果你在使用Pulsar的時候,遇到必須保證順序的需求,我們強烈建議不要使用此特性。

 

7. 其他核心問題

1.死信主題

  死信主題能夠在某些消息無法由consumer成功消費時消費新消息。在這種機制中,無法消費的消息存儲在一個單獨的主題中,稱為死信主題。開發人員可以決定如何處理死信主題中的消息。

  死信主題取決於消息的重新傳遞。由於確認超時或否定確認,消息將重新傳遞。如果要對消息使用否定確認,請確保在確認超時之前對其進行否定確認。

  注意:

    目前,死信主題僅在共享訂閱模式下啟用。

    如果不指定死信主題,默認會創建一個,格式如下:

    <topicname>-<subscriptionname>-DLQ

2.消息保留和到期

  Pulsar broker默認如下:

    立即刪除所有已經被消費者確認過的的消息

    以消息backlog的形式,持久保存所有的未被確認消息

  Pulsar有兩個特性,讓你可以覆蓋上面的默認行為。

    消息存留可以保存消費確認過的消息

    消息過期可以給未被確認的消息設置存活時長(TTL)

3.消息去重

  當消息被Pulsar持久化多於一次的時候,消息就會重復。消息去重是Pulsar可選的特性,阻止不必要的消息重復,每條消息僅處理一次,即使消息被接收多次。

  broker是通過sequence_id判斷消息是否重復,出現重復的原因可能是網絡超時,發送或ack失敗導致。

  通過修改配置文件決定是否開啟去重的功能,默認是不去重。

  如果在Pulsar broker中啟用消息重復數據刪除,則無需對Pulsar客戶端進行任何重大更改。但是,需要為客戶端生產者提供兩種設置:

    生產者必須設置一個名字

    消息發送超時需要設置為無窮大(即沒有超時)

8.pulsar應用

  騰訊計費(米大師)(https://cloud.tencent.com/product/midas) 是孵化於支撐騰訊內部業務千億級營收的互聯網計費平台,匯集國內外主流支付渠道,提供賬戶管理、精准營銷、安全風控、稽核分賬、計費分析等多維度服務。平台承載了公司每天數億收入大盤,為 180+ 個國家(地區)、萬級業務代碼、100W+ 結算商戶提供服務,托管賬戶總量300 多億,是一個全方位的一站式計費平台。

9.pulsar搭建試用

1.系統要求

  Pulsar目前適用於MacOSLinux

  默認情況下,Pulsar會分配2G JVM堆內存來啟動。它可以在conf/pulsar_env.sh文件下更改PULSAR_MEM

2. 集群組成說明

  搭建Pulsar集群至少需要3個組件:ZooKeeper集群、BookKeeper集群和broker集群(BrokerPulsar的自身實例)3個集群組件如下:

    本地ZooKeeper集群(3ZooKeeper節點組成)

    bookie集群(也稱為BookKeeper集群,3BookKeeper節點組成)

    broker集群(3Pulsar節點組成)

  Pulsar的安裝包已包含了搭建集群所需的各個組件庫。無需單獨下載ZooKeeper安裝包和BookKeeper安裝包。

  下載Pulsar安裝包apache-pulsar-2.5.1-bin.tar.gz

3.准備資源

主機

系統版本

JDK版本

Pulsar版本

192.168.10.10

CentOS 7.2

1.8

2.5.1

192.168.10.11

CentOS 7.2

1.8

2.5.1

192.168.10.12

CentOS 7.2

1.8

2.5.1

4.創建集群環境

  把下載的 Pulsar 安裝包上傳到 Linux 服務器,解壓安裝包。

  # 解壓安裝包

  tar -zxvf apache-pulsar-2.5.1-bin.tar.gz -C /opt

5.配置部署本地 ZooKeeper 集群

1.新建文件夾,並寫入配置內容。

  #修改zookeeper的端口

  clientPort=12181

  # 在/opt/apache-pulsar-2.5.1目錄中新建文件目錄

  mkdir -p data/zookeeper

 

  # 新建文件 myid,寫入 1

  echo 1 > data/zookeeper/myid

    注意:

    另外兩台服務器的 myid 文件內容分別寫入 2 和 3。

  # 服務器 2

  mkdir -p data/zookeeperecho 2 > data/zookeeper/myid

  # 服務器 3

  mkdir -p data/zookeeperecho 3 > data/zookeeper/myid

2.配置 zookeeper.conf 文件。

  # 指定dataDir目錄dataDir=/opt/apache-pulsar-2.5.1/data/zookeeper

  # zookeeper 節點地址

  server.1=192.168.10.10:12888:13888

  server.2=192.168.10.11:12888:13888

  server.3=192.168.10.12:12888:13888

  注意:在另外兩台服務器上,對 zookeeper.conf 文件進行完全相同的配置。

3. /opt/apache-pulsar-2.5.1目錄中,執行啟動命令。

  # 進入 zookeepers 目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 執行后台運行命令

  bin/pulsar-daemon start zookeeper

4. 驗證 ZooKeeper 節點是否啟動成功。

  # 進入 zookeepers 目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 執行 zookeeper 客戶端連接命令

  bin/pulsar zookeeper-shell -server 192.168.10.10:12181

 

  注意:

  Enter鍵進入命令行界面后,可完全使用ZooKeeper的各種命令,如 lsget等命令。使用 quit 命令退出命令行界面。

  1. 按照以上步驟,在另外兩台服務器上部署 ZooKeeper 節點。

5. 在任一個Pulsar節點,初始化集群元數據。

  # 進入 zookeepers 目錄

  cd /opt/apache-pulsar-2.5.1

  # 執行命令初始化集群元數據

  bin/pulsar initialize-cluster-metadata \

  --cluster pulsar-cluster \

  --zookeeper 192.168.10.10:12181 \

  --configuration-store 192.168.10.10:12181 \

  --web-service-url http://192.168.10.10:8083/ \

  --web-service-url-tls https://192.168.10.10:8443/ \

  --broker-service-url pulsar://192.168.10.10:6650/ \

  --broker-service-url-tls pulsar+ssl://192.168.10.10:6651/

6.配置部署 BookKeeper 集群

1. 修改配置文件 bookkeeper.conf。

  # 進入bookie 配置文件目錄

  cd /opt/apache-pulsar-2.5.1/conf

 

  # 編輯 bookkeeper.conf 文件

  vim bookkeeper.conf

  # advertisedAddress 修改為服務器對應的ip,在另外兩台服務器也做對應的修改advertisedAddress=192.168.10.10

  # 修改以下兩個文件目錄地址journalDirectories=/mnt/disk01/data/bookkeeper/journal
  ledgerDirectories=/mnt/disk01/data/bookkeeper/ledger

  # 修改zk地址和端口信息zkServers=192.168.10.10:12181, 192.168.10.11:12181, 192.168.10.12:12181

  # 修改Prometheus指標采集端口,該端口不修改會沖突

  prometheusStatsHttpPort=8100

  # 修改httpServerEnabled

  httpServerEnabled=true

 

  # 修改httpServerPort

  httpServerPort=8100

2. 初始化元數據,並啟動 bookie 集群。

  # 先執行初始化元數據命令;再執行啟動命令

 

  # 進入 bookies 目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 執行初始化元數據命令;若出現提示,輸入 Y,繼續(只需在一個bookie節點執行一次)

  bin/bookkeeper shell metaformat

 

  # 以后台進程啟動bookie

  bin/pulsar-daemon start bookie

3. 按照以上步驟,啟動另外兩個 bookie 節點。

4. 驗證 bookie 是否啟動成功。

  # 進入 bookies 目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 驗證是否啟動成功

  bin/bookkeeper shell bookiesanity

 

  # 出現如下顯示,表示啟動成功Bookie 

  sanity test succeeded.

7.部署配置 Broker 集群

1. 修改配置文件 broker.conf。

  # 進入配置文件目錄

  cd /opt/apache-pulsar-2.5.1/conf

 

  # 編輯 broker.conf 文件

 

  # 修改集群名,和ZooKeeper里初始化元數據時指定的集群名(--cluster pulsar-cluster)相同

  clusterName=pulsar-cluster

 

  # 修改如下兩個配置,指定的都是ZooKeeper集群地址和端口號zookeeperServers=192.168.10.10:12181, 192.168.10.11:12181, 192.168.10.12:12181

  configurationStoreServers=192.168.10.10:12181, 192.168.10.11:12181, 192.168.10.12:12181

  # 修改如下參數為本服務器ip地址,另外兩個broker節點配置文件也做對應修改advertisedAddress=192.168.10.10

 

  # 修改brokerServicePortTls端口

  brokerServicePortTls=6651

 

  # 修改webServicePortTls端口

  webServicePortTls=8443

 

  # 修改brokerDeleteInactiveTopicsEnabled,默認非活動的topic會被刪除

  brokerDeleteInactiveTopicsEnabled=false

2. 啟動 broker 節點。

  # 進入 brokers 目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 以后台進程啟動 broker

  bin/pulsar-daemon start broker

3. 按照以上步驟,對另外兩個 broker 節點做對應配置,並啟動 broker 節點。

4. 查看集群中 brokers 節點信息,驗證 broker 是否都啟動成功。

  # 進入任一個 broker 目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 查看集群 brokers 節點情況

  bin/pulsar-admin brokers list pulsar-cluster

  至此,集群 ZooKeeper,Broker,Bookie 節點啟動完畢,集群部署成功!接下來可以進行 HelloWorld 測試!

8.測試Pulsar

1. 依次創建集群、租戶、命名空間、分區 topic,並為命名空間指定集群名。

  # 進入 brokers 目錄,選取任一個 broker 節點執行命令即可

  cd /opt/apache-pulsar-2.5.1

 

  # 創建集群(集群名:pulsar-cluster)

  bin/pulsar-admin clusters create pulsar-cluster1

  # 創建租戶(租戶名:my-tenant)

  bin/pulsar-admin tenants create my-tenant

 

  # 創建命名空間(命名空間名:my-tenant/my-namespace,它指定了租戶 my-tenant)

  bin/pulsar-admin namespaces create my-tenant/my-namespace

 

  # 創建持久性分區topic(topic全名:persistent://my-tenant/my-namespace/my-topic;分區數為 3)

  bin/pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/my-topic -p 3

 

  # 更新命名空間為其指定集群名

  bin/pulsar-admin namespaces set-clusters my-tenant/my-namespace --clusters pulsar-cluster

2. 設置 maven 依賴。

  <dependency>

      <groupId>org.apache.pulsar</groupId>

      <artifactId>pulsar-client</artifactId>

      <version>2.5.1</version>

  </dependency>

3. 創建生產者

public class PulsarProducerDemo3 {
    // 連接集群 broker
    private static String localClusterUrl = "pulsar://192.168.10.10:6650";

    public static void main(String[] args) {
        try {
            Producer<byte[]> producer = getProducer();
            String msg = "hello world pulsar!";

            Long start = System.currentTimeMillis();
            MessageId msgId = producer.send(msg.getBytes());
            System.out.println("spend=" + (System.currentTimeMillis() - start) + ";send a message msgId = " + msgId.toString());
        } catch (Exception e) {
            System.err.println(e);
        }
    }

    public static Producer<byte[]> getProducer() throws Exception {
        PulsarClient client;
        client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
        Producer<byte[]> producer = client.newProducer().topic("persistent://my-tenant/my-namespace/my-topic").producerName("producerName").create();
        return producer;
    }
} 

4. 創建消費者。

public class PulsarConsumerDemo3 {
    private static String localClusterUrl = "pulsar:// 192.168.10.10:6650";

    public static void main(String[] args) {
        try {
            //將訂閱消費者指定的主題消息
            Consumer<byte[]> consumer = getClient().newConsumer()
                    .topic("persistent://my-tenant/my-namespace/my-topic")
                    .subscriptionName("my-subscription")
                    .subscribe();
            while (true) {
                Message msg = consumer.receive();
                System.out.printf("consumer-Message received: %s. \n", new String(msg.getData()));
                // 確認消息,以便broker刪除消息
                consumer.acknowledge(msg);
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }

    public static PulsarClient getClient() throws Exception {
        PulsarClient client;
        client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
        return client;
    }
}

  

10.Pulsar函數

1.簡述

  開發人員使用友好的API輕松部署輕量級計算邏輯,無需運行其他流處理引擎。

  Pulsar函數是輕量級的計算過程:

    使用來自一個或多個Pulsar主題的消息,

    將開發人員提供的處理邏輯應用於每條消息,

    將計算結果發布到另一個主題。

  Pulsar函數非常適合簡單的數據(交易流水)清洗清洗場景。

2.Java的函數說明

  在Java中編寫Pulsar函數涉及實現兩個接口之一:

    java.util.function.Function接口。

    org.apache.pulsar.functions.api接口,此接口的工作方式與java.util.function.Function接口非常相似,但要的區別在於它提供了一個Context 對象,發人員可以通過各種方式使用它

    函數分為functionwindow functionwindow function通過記錄數和時間限制窗口的大小。

3.資源准備

1. 配置functions-worker

  在Pulsar集群所有節點啟動functions-worker服務。

  在/opt/apache-pulsar-2.5.1目錄中,修改配置文件functions_worker.yml

  # 進入Pulsar目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 編輯functions_worker.yml

  vim functions_worker.yml

 

  # 修改workerId

  workerId: worker79

 

  # 修改workerHostname

  workerHostname: 192.168.10.10

 

  # 修改workerPort

  workerPort: 6750

 

  # 修改workerPortTls

  workerPortTls: 6751

 

  # 修改configurationStoreServers

  configurationStoreServers: 192.168.10.10:12181

 

  # 修改pulsarServiceUrl

  pulsarServiceUrl: pulsar://192.168.10.10:6650

 

  # 修改pulsarWebServiceUrl

  pulsarWebServiceUrl: http://192.168.10.10:8083

 

  # 修改pulsarFunctionsCluster,Pulsar集群名稱

  pulsarFunctionsCluster: pulsar-cluster

  注意:在另外兩台服務器上,對 functions_worker.yml文件進行相應的配置。

2. 啟動functions-worker

  在Pulsar集群的所有節點上執行以下命令

  # 進入 Pulsar文件目錄

  cd /opt/apache-pulsar-2.5.1

 

  # 啟動function服務

  bin/pulsar-daemon start functions-worker

4. 測試函數

1. 設置maven依賴

<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-functions-api</artifactId> <version>2.5.1 </version> </dependency>

2. 編寫代碼

public class ExclamationFunction implements Function<String, String> { @Override public String process(String input, Context context) { Logger LOG = context.getLogger(); LOG.info("======process input======:{}", input); return String.format("%s!", input); } }

3. 編譯生成api-examples.jar

4. 上傳api-examples.jar到Pulsar目錄/opt/apache-pulsar-2.5.1/examples

5. 提交函數

bin/pulsar-admin-function functions create \

  --processing-guarantees ATMOST_ONCE \

  -- max-message-retries -1 \

  --jar examples/api-examples.jar \

  --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \

  --inputs persistent://public/default/exclamation-input \

  --output persistent://public/default/exclamation-output \

  --name exclamation

6. 查看函數信息

bin/pulsar-admin-function functions get \

  --tenant public \

  --namespace default \

  --name exclamation

7. 查看函數運行狀態

bin/pulsar-admin-function functions getstatus \

  --tenant public \

  --namespace default \

  --name exclamation

8. 查看函數日志

  在函數運行狀態上可以看到function運行時所屬的節點,到運行的節點上,查看日志

  cd /opt/apache-pulsar-2.5.1/logs/functions/public/default

  # 選擇exclamation函數日志目錄

  cd exclamation

 

  # 打開日志文件

  vim exclamation-0.log

 

5. 分配function運行時的資源

資源

描述.

運行

CPU

核數

Docker (即將推出)

RAM

內存字節數

Process, Docker

Disk

磁盤字節數

Docker

1. 通過以下命令指定資源

bin/pulsar-admin functions create \

  --jar target/my-functions.jar \

  --classname org.example.functions.MyFunction \

  --cpu 8 \

  --ram 8589934592 \

  --disk 10737418240

2. Logging,日志定位

$ bin/pulsar-admin functions create \

  --name my-func-1 \

  --log-topic persistent://public/default/my-func-1-log \

  # Other configs

6. 管理函數

  管理pulsar函數時,需要指定有關這些函數的各種信息,包括租戶、名稱空間、輸入和輸出主題等。但是,有一些參數具有默認值,如果忽略,將提供這些默認值。下表列出了默認值:

  默認參數示例:

參數

默認

函數名

為函數指定名稱,例如:標記--classname org.example.myFunction會給函數指定一個myFunction的名稱。

租戶

從輸入主題的名稱派生。如果輸入主題位於marketing租戶下的asia名稱空間下,即主題名稱的格式為persistent://marketing/asia/{topicName},則名稱空間將為marketing。

名稱空間

從輸入主題的名稱派生。如果輸入主題位於marketing租戶下的asia名稱空間下,即主題名稱的格式為persistent://marketing/asia/{topicName},則名稱空間將為asia。

輸出主題

{input topic}-{function name}-output。如何沒有提供輸出主題,函數默認會提供主題:incoming-exclamation-output。

訂閱類型

對於至少一次且最多一次的處理保證,默認情況下應用共享;對於有效的一次保證,應用故障轉移

處理保證

ATLEAST_ONCE(可用選項包括ATMOST_ONCE/ ATLEAST_ONCE/ EFFECTIVELY_ONCE)

Pulsar服務URL

pulsar://localhost:6650

7. 處理保證機制

1. 語義類型

  Pulsar函數提供三種不同的消息傳遞語義,可以將其應用於任何函數。

傳遞語義

描述

At-most-once

發送到函數的每個消息都可能被處理,或者不被處理(因此“最多”)。

At-least-once

發送到函數的每個消息都可以處理多次(因此“至少”)。

Effectively-once

發送到函數的每條消息都將有一條與其相關聯的輸出。

2. 語義測試

  提交函數后查看function日志,處理保證的類型與訂閱類型關聯,如下:

1.EFFECTIVELY_ONCE
  ##############################EFFECTIVELY_ONCE#################################

PulsarSourceConfig(

processingGuarantees=EFFECTIVELY_ONCE,

subscriptionType=Failover,

subscriptionName=public/default/exclamation,

maxMessageRetries=-1,

deadLetterTopic=null,

topicSchema={

persistent://public/default/exclamation-input=ConsumerConfig(

schemaType=null,

serdeClassName=null,

isRegexPattern=false,

receiverQueueSize=null)},

typeClassName=java.lang.String,

timeoutMs=null) 

  通過SourceConfig可以看出處理保證為EFFECTIVELY_ONCE訂閱類型是FAILOVER,如下圖:

 

Starting Pulsar consumer perf with config: {

  "topicNames" : [ "persistent://public/default/exclamation-input" ],

  "topicsPattern" : null,

  "subscriptionName" : "public/default/exclamation",

  "subscriptionType" : "Failover",

  "receiverQueueSize" : 1000,

  "acknowledgementsGroupTimeMicros" : 100000,

  "negativeAckRedeliveryDelayMicros" : 60000000,

  "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,

  "consumerName" : null,

  "ackTimeoutMillis" : 0,

  "tickDurationMillis" : 1000,

  "priorityLevel" : 0,

  "cryptoFailureAction" : "CONSUME",

  "properties" : {

    "application" : "pulsar-function",

    "id" : "public/default/exclamation",

    "instance_id" : "0"

  },

  "readCompacted" : false,

  "subscriptionInitialPosition" : "Latest",

  "patternAutoDiscoveryPeriod" : 1,

  "regexSubscriptionMode" : "PersistentOnly",

  "deadLetterPolicy" : null,

  "autoUpdatePartitions" : true,

  "replicateSubscriptionState" : false,

  "resetIncludeHead" : false

}

 

PulsarSinkConfig(

processingGuarantees=EFFECTIVELY_ONCE, topic=persistent://public/default/exclamation-output,

serdeClassName=null,

schemaType=null,

typeClassName=java.lang.String)
2.ATMOST_ONCE
############################ ATMOST_ONCE######################################

PulsarSourceConfig(

processingGuarantees=ATMOST_ONCE,

subscriptionType=Shared,

subscriptionName=public/default/exclamation,

maxMessageRetries=-1,

deadLetterTopic=null,

topicSchema={

persistent://public/default/exclamation-input=ConsumerConfig(

schemaType=null,

serdeClassName=null,

isRegexPattern=false,

receiverQueueSize=null)},

typeClassName=java.lang.String,

timeoutMs=null)

處理保證為ATMOST_ONCE訂閱類型是Shared,如下圖:

 

 

 

Starting Pulsar consumer perf with config: {

  "topicNames" : [ "persistent://public/default/exclamation-input" ],

  "topicsPattern" : null,

  "subscriptionName" : "public/default/exclamation",

  "subscriptionType" : "Shared",

  "receiverQueueSize" : 1000,

  "acknowledgementsGroupTimeMicros" : 100000,

  "negativeAckRedeliveryDelayMicros" : 60000000,

  "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,

  "consumerName" : null,

  "ackTimeoutMillis" : 0,

  "tickDurationMillis" : 1000,

  "priorityLevel" : 0,

  "cryptoFailureAction" : "CONSUME",

  "properties" : {

    "application" : "pulsar-function",

    "id" : "public/default/exclamation",

    "instance_id" : "0"

  },

  "readCompacted" : false,

  "subscriptionInitialPosition" : "Latest",

  "patternAutoDiscoveryPeriod" : 1,

  "regexSubscriptionMode" : "PersistentOnly",

  "deadLetterPolicy" : null,

  "autoUpdatePartitions" : true,

  "replicateSubscriptionState" : false,

  "resetIncludeHead" : false

}

 

Starting Pulsar producer perf with config: {

  "topicName" : "persistent://public/default/exclamation-output",

  "producerName" : null,

  "sendTimeoutMs" : 0,

  "blockIfQueueFull" : true,

  "maxPendingMessages" : 1000,

  "maxPendingMessagesAcrossPartitions" : 50000,

  "messageRoutingMode" : "CustomPartition",

  "hashingScheme" : "Murmur3_32Hash",

  "cryptoFailureAction" : "FAIL",

  "batchingMaxPublishDelayMicros" : 10000,

  "batchingMaxMessages" : 1000,

  "batchingEnabled" : true,

  "batcherBuilder" : { },

  "compressionType" : "LZ4",

  "initialSequenceId" : null,

  "autoUpdatePartitions" : true,

  "properties" : {

    "application" : "pulsar-function",

    "id" : "public/default/exclamation",

    "instance_id" : "0"

  }

}
3.ATLEAST_ONCE
##############################ATLEAST_ONCE####################################

PulsarSourceConfig(

processingGuarantees=ATLEAST_ONCE,

subscriptionType=Shared,

subscriptionName=public/default/exclamation,

maxMessageRetries=-1,

deadLetterTopic=null,

topicSchema={

persistent://public/default/exclamation-input=ConsumerConfig(

schemaType=null,

serdeClassName=null,

isRegexPattern=false,

receiverQueueSize=null)},

typeClassName=java.lang.String,

timeoutMs=null)

  處理保證為ATLEAST_ONCE訂閱類型是Shared,如下圖:

 

 

 

Starting Pulsar consumer perf with config: {

  "topicNames" : [ "persistent://public/default/exclamation-input" ],

  "topicsPattern" : null,

  "subscriptionName" : "public/default/exclamation",

  "subscriptionType" : "Shared",

  "receiverQueueSize" : 1000,

  "acknowledgementsGroupTimeMicros" : 100000,

  "negativeAckRedeliveryDelayMicros" : 60000000,

  "maxTotalReceiverQueueSizeAcrossPartitions" : 50000,

  "consumerName" : null,

  "ackTimeoutMillis" : 0,

  "tickDurationMillis" : 1000,

  "priorityLevel" : 0,

  "cryptoFailureAction" : "CONSUME",

  "properties" : {

    "application" : "pulsar-function",

    "id" : "public/default/exclamation",

    "instance_id" : "0"

  },

  "readCompacted" : false,

  "subscriptionInitialPosition" : "Latest",

  "patternAutoDiscoveryPeriod" : 1,

  "regexSubscriptionMode" : "PersistentOnly",

  "deadLetterPolicy" : null,

  "autoUpdatePartitions" : true,

  "replicateSubscriptionState" : false,

  "resetIncludeHead" : false

}

 

PulsarSinkConfig(

processingGuarantees=ATLEAST_ONCE,

topic=persistent://public/default/exclamation-output,

serdeClassName=null,

schemaType=null,

typeClassName=java.lang.String)

Starting Pulsar producer perf with config: {

  "topicName" : "persistent://public/default/exclamation-output",

  "producerName" : null,

  "sendTimeoutMs" : 0,

  "blockIfQueueFull" : true,

  "maxPendingMessages" : 1000,

  "maxPendingMessagesAcrossPartitions" : 50000,

  "messageRoutingMode" : "CustomPartition",

  "hashingScheme" : "Murmur3_32Hash",

  "cryptoFailureAction" : "FAIL",

  "batchingMaxPublishDelayMicros" : 10000,

  "batchingMaxMessages" : 1000,

  "batchingEnabled" : true,

  "batcherBuilder" : { },

  "compressionType" : "LZ4",

  "initialSequenceId" : null,

  "autoUpdatePartitions" : true,

  "properties" : {

    "application" : "pulsar-function",

    "id" : "public/default/exclamation",

    "instance_id" : "0"

  }

}

11. Dashboard

1. 說明

  Pulsar自帶Docker版本的Dashboard,對集群Broker、Bookie、ZooKeeper及Topic等進行監控和統計。

  以下在CentOS使用Prometheus+Grafana搭建Pulsar集群監控Dashboard。實現對NameSpace、Topic、Broker、Bookie、ZooKeeper等指標和組件進行監控和統計。

2. 准備資源

  一台CentOS裸機服務器:192.168.10.10。

  Prometheus 安裝包(版本號 2.12.0)。

  Grafana 安裝包(版本號 6.3.5)。

3. 下載解壓 PrometheusGrafana 安裝包 

  # Prometheus 安裝包下載地址

  https://github.com/prometheus/prometheus/releases/download/v2.12.0/prometheus-2.12.0.linux-amd64.tar.gz

 

  # 解壓安裝包

  tar -zxvf prometheus-2.12.0.linux-amd64.tar.gz

 

  # Grafana 安裝包下載地址

  https://dl.grafana.com/oss/release/grafana-6.3.5.linux-amd64.tar.gz

 

  # 解壓安裝包

  tar -zxvf grafana-6.3.5.linux-amd64.tar.gz

 

  # 解壓后,我的文件路徑如下

  # /opt/prometheus-2.12.0.linux-amd64

  # /opt/grafana-6.3.5

4. 配置Prometheusprometheus.yml 配置文件

  1. 修改集群名(cluster: pulsar-cluster)

  2. 配置 broker 節點、IP 和端口號

  3. 配置 bookie 節點、IP 和端口號

  4. 配置 ZooKeeper 節點、IP 和端口號

  說明:

  1. 測試集群是3CentOS服務器。每台服務器上都部署一個broker節點、一個bookie節點、一個ZooKeeper節點。

  2. 3台服務器的測試環境IP地址設定如下:

    192.168.10.10

    192.168.10.11

    192.168.10.12

  3. 集群名為pulsar-cluster。若在搭建Pulsar集群的過程中,沒有修改端口號,則只需要參照如下配置文件修改集群名和機器IP地址即可。

  4. 部署監控的機器IP地址為

    192.168.10.10。

5. prometheus.yml 文件示例

  prometheus.yml文件模版下載地址:https://github.com/streamnative/apache-pulsar-grafana-dashboard/blob/master/prometheus/standalone.yml.template

  根據實際情況進行修改。以下是一個 prometheus.yml 文件示例:

# my global config

global:

  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.

  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.

  # scrape_timeout is set to the global default (10s).

  external_labels:

    cluster: pulsar-cluster

# Alertmanager configuration

alerting:

  alertmanagers:

  - static_configs:

    - targets:

      # - alertmanager:9093

 

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.

rule_files:

  # - "first_rules.yml"

  # - "second_rules.yml"

 

# A scrape configuration containing exactly one endpoint to scrape:

# Here it's Prometheus itself.

scrape_configs:

  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.

 

  - job_name: "broker"

    honor_labels: true # don't overwrite job & instance labels

    static_configs:

    - targets: ['192.168.10.10:8083','192.168.10.11:8083','192.168.10.12:8083']

      

 

  - job_name: "bookie"

    honor_labels: true # don't overwrite job & instance labels

    static_configs:

    - targets: ['192.168.10.10:8100','192.168.10.11:8100','192.168.10.12:8100']

      

 

  - job_name: "zookeeper"

    honor_labels: true

    static_configs:

    - targets: ['192.168.10.10:9990','192.168.10.11:9990','192.168.10.12:9990']     

6. 后台啟動Prometheus

  # 進入Prometheus目錄

  cd /opt/prometheus-2.12.0.linux-amd64

 

  # 以后台進程執行啟動命令

  nohup ./prometheus --config.file ./prometheus.yml --web.listen-address=192.168.10.10:9090 --web.enable-lifecycle --storage.tsdb.retention=10d >prometheus.log 2>&1 &

 

  # 參數說明

  # --config.file 指定 prometheus.yml 文件路徑

  # 指定監聽地址端口

  # --web.listen-address=192.168.10.10:9090

  # --web.enable-lifecycle 啟動時熱加載配置文件使用

  # --storage.tsdb.retention 指定統計數據存儲時長,10d 代表10天

  # >prometheus.log 2>&1 &  存儲輸出的啟動日志

7. 訪問Prometheus

  1. 部署監控的機器ip為192.168.10.10

  2. 在瀏覽器訪問http://192.168.10.10:9090/graph

  3. 能成功訪問及運行成功

  4. Prometheus做時序數據存儲,以及提供強大的查詢功能,Dashboard展示使用Grafana更漂亮專業,搭建配置Grafana

8. 后台啟動 Grafana

  # 進入 Grafana 文件目錄

  cd /opt/grafana-6.3.5/

 

  # 以后台進程執行啟動命令

  nohup bin/grafana-server  start  >grafana.log 2>&1 &

9. 訪問 Grafana

  1. 部署監控的機器 ip為192.168.10.10

  2. 在瀏覽器訪問 http://192.168.10.10:3001

  3. 出現如下圖所示 Grafana 首頁,輸入默認用戶名 admin,默認密碼 admin。

10. 配置 Grafana 數據源

  1. 添加 Prometheus 數據源。

 

  2. 選擇 Prometheus。

 

  3. 配置 Prometheus 數據源。

 

 

 11. 導入監控 Dashboard 模板

 

  1.  Github 下載 Dashboard 模板。

  2. 選擇上傳導入 Dashboard 模版(這里以 "Pulsar 集群總況.json" 模板為例)。

  3. 導入成功,查看 Dashboard 面板統 

  4. 依次導入Pulsar 消息總況.json、Pulsar Topic詳情.json、Pulsar JVM監控.json,查看Dashboard,展示如下:

  

12 安全管理

1. 簡述

  作為企業的中央消息總線,Apache Pulsar經常用於存儲關鍵任務數據。因此,在Pulsar中啟用安全功能至關重要。

  默認情況下,Pulsar不配置加密,身份驗證或授權。任何客戶端都可以通過純文本服務URLApache Pulsar通信。因此,我們必須確保通過這些純文本服務URL訪問的Pulsar僅限於受信任的客戶端。在這種情況下,您可以使用網絡分段和/或授權ACL來限制對可信IP的訪問。如果不使用,則群集的狀態是敞開的,任何人都可以訪問群集。

  Pulsar支持可插拔的身份驗證機制。Pulsar客戶端使用此機制對代理和代理進行身份驗證。還可以配置Pulsar以支持多個身份驗證源。

2. 身份驗證提供商

  目前,Pulsar支持以下身份驗證提供商:

    TLS身份驗證

    Athenz

    Kerberos

13 分層存儲

  Pulsar面向segment的架構允許topic backlog增長的十分龐大,如果有沒加以限制,隨着時間的增加,代價將越來越高。

有一個減輕這個消耗的辦法,那就是使用分層存儲。 通過分層存儲,在backlog中的舊消息可以從BookKeeper轉移到更連接的存儲器中,如果不發生變化客戶端仍然可以通過backlog去訪問。

 

 

 

  Pulsar目前支持亞馬遜的S3以及Google雲存儲GCS來作為長期存儲。可以通過Rest API或者命令行接口,將數據從短期數據卸載Offloading長期存儲。用戶傳入他們想要保留在BookKeeper上的大量Topic數據,Broker會復制backlog數據到長期存儲。通過一個可配置的延時(默認是4小時),與那時數據會從BookKeeper上被刪除。

  通過修改conf/broker.conf實現分層存儲的功能。后續略


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM