Pulsar架构


本篇主要总结下Pulsar的整体架构以及各组件的工作原理。

1.概述

在最高级别,单个 Pulsar 实例由一个或多个 Pulsar 集群组成,同一实例中的集群之间可

以相互复制数据;单个 Pulsar 集群由以下三部分组成:

1)一个或者多个 Broker,负责处理和负载均衡生产者发出的消息,并将这些消息分派给消费者,它与配置存储交互来处理相应的任务,并将消息存储在 BookKeeper 实例中(又称 bookies)Broker 依赖 ZooKeeper 集群处理特定的任务,等等;

2)由一个或多个 bookie 组成的 BookKeeper集群,负责消息的持久化存储;

3)一个Zookeeper集群,用来处理集群级别的的配置和协调任务;

集群说明如下:

  

在更大的实例层面,有一个叫做配置存储的ZooKeeper集群能访问到全部实例,负责处理多集群间的协调任务,例如异地复制。

2.Broker

一个无状态组件,主要运行两个服务组件

1)http服务,监听8080端口,暴露一个rest接口,供管理员管理集群和为producer和consumer提供topic查询,可以使用命令行或url方式;

2)tcp服务,监听6650端口,是一个调度器,用来处理所有数据的传输;

出于性能的考虑, 消息通常从 managed ledger 缓存中调度, 除非 backlog超过了缓存的大小,如果 backlog对于缓存来说太大了, 则Broker将开始从BookKeeper那里读取数据条目;

最后,为了支持全局Topic异地复制,Broker会控制Replicators追踪本地发布的条目,并通过Java 客户端把这些条目重新发布到其他区域。

3.持久化存储

Pulsar为应用提供有保证消息传递,即未确认送达的消息需要持久化存储直到它们被确认送达,Pulsar内部,所有消息都被保存并同步N份;

1)Apache BookKeeper

Pulsar用 Apache BookKeeper作为持久化消息存储,BookKeeper是一个分布式的预写日志(WAL)系统,有如下几个特性特别适合Pulsar的应用场景:

  • 允许Pulsar创建多个独立的日志,这种独立的日志就是ledgers,随着时间的推移,Pulsar会为Topic创建多个ledgers;
  • 为按条目复制的顺序数据提供了非常高效的存储;
  • 保证了多系统挂掉时ledgers的读取一致性;
  • 提供了不同的Bookies之间IO均匀分布的特性;
  • 容量和吞吐量都能水平扩展,并且容量可以通过在集群内添加更多的Bookies立刻得到提升;
  • Bookies被设计成可以承载数千的并发读写的ledgers,通过使用多个磁盘设备,一个用于journal,另一个用于一般存储,这样Bookies可以将读操作的影响和对于写操作的延迟分隔开;

除了消息数据,cursors也会被持久化入BookKeeper,cursors是消费端订阅消费的位置;

下图展示了brokers和bookies是如何交互的:

  

2)Ledgers

Ledger是一个只追加的数据结构,并且只有一个写入器,这个写入器负责多个BookKeeper存储节点(就是Bookies)的写入,Ledger的条目会被复制到多个bookies;

Ledgers本身有着非常简单的语义:

  • Pulsar Broker可以创建ledeger,添加内容到ledger和关闭ledger;
  • 当一个ledger被关闭后,无论是明确关闭或者是因为写入器挂掉,这个ledger只会以只读模式打开;
  • ledger中的条目不再有用的时候,整个legder可以被删除(跨所有Bookies);

Ledger读一致性:

BookKeeper的主要优势在于他能在有系统故障时保证读的一致性, 由于Ledger只能被一个进程写入(之前提的写入器进程),在写入时不需要考虑一致性,从而写入会非常高效;在一次故障之后,ledger会启动一个恢复进程来确定ledger的最终状态并确认最后提交到日志的是哪一个条目;在这之后,能保证所有的ledger读进程读取到相同的内容;

Managed ledgers:

由于BookKeeper Ledgers提供了单一的日志抽象,在ledger的基础上我们开发了一个叫managed ledger的库,用以表示单个topic的存储层;managed ledger即消息流的抽象,有一个写入器进程不断在流结尾添加消息,并且有多个cursors 消费这个流,每个cursor有自己的消费位置;

一个managed ledger在内部用多个BookKeeper ledgers保存数据,这么做有两个原因:

  • 在故障之后,原有的某个ledger不能再写了,需要创建一个新的;
  • ledger在所有cursors消费完它所保存的消息之后就可以被删除,这样可以实现ledgers的定期回滚;

说明:

Entry和Ledger都BookKeeper中的概念,Entry相当于一条记录,而Ledger相当于一种记录的集合;一个topic拥有一个managed ledger,每个managed ledger下面可以有多个ledgers,一个ledger也是一个segment,即分片,bookie中,数据是分片存储的;

3)Journal storage

BookKeeper中journal文件包含事务日志,在更新到 ledger之前,bookie需要确保描述这个更新的事务被写到持久(非易失)存储上面;bookie启动或旧的journal文件大小达到上限(由 journalMaxSizeMB 参数配置)的时候,新的journal文件会被创建;

4.元数据存储

Pulsar 元数据存储维护一个 Pulsar 集群的所有元数据,例如topic、schema、broker负载等,Pulsar 使用Apache ZooKeeper进行元数据存储、集群配置和协调;Pulsar 元数据存储可以部署在单独的 ZooKeeper 集群上,也可以部署在现有的 ZooKeeper 集群上;您可以将一个 ZooKeeper 集群同时用于 Pulsar 元数据存储和BookKeeper 元数据存储,如果要部署连接到现有 BookKeeper 集群的 Pulsar broker,则需要分别为 Pulsar 元数据存储和 BookKeeper 元数据存储部署单独的 ZooKeeper 集群;

Pulsar 实例中:

  • 配置存储保存了 tenants, namespaces以及其他需要全局一致的配置项;
  • 每个集群有自己独立的ZooKeeper集群内保存部配置和协调信息,例如topic归属信息,broker负载报告,BookKeeper ledger信息等;

5.配置存储

配置存储维护一个 Pulsar 实例的所有配置,例如集群、租户、命名空间、分区主题相关配置等,一个 Pulsar 实例可以有一个本地集群、多个本地集群或多个跨区域集群;因此,配置存储可以在 Pulsar 实例下的多个集群之间共享配置,配置存储可以部署在单独的 ZooKeeper 集群上,也可以部署在现有的 ZooKeeper 集群上;

6.Pulsar proxy

Pulsar客户端和Pulsar集群交互的一种方式就是直连Pulsar brokers,然而,在某些情况下,这种直连既不可行也不可取,因为客户端并不知道broker的地址,例如在云环境或者 Kubernetes 以及其他类似的系统上面运行Pulsar,直连brokers就基本上不可能了;

Pulsar proxy提供了解决这个问题的方案,它可以作为集群中的所有brokers的统一网关,如果你选择运行Pulsar Proxy(这是可选的),所有的客户端连接将会通过这个代理而不是直接与brokers通信,出于性能和容错的考虑,你可以运行任意个Pulsar proxy;

注意点:

  • 连接客户端不需要为使用Pulsar proxy提供任何特定配置;
  • Pulsar proxy支持TLS 加密 和 认证;

7.服务发现

客户端需要能够使用单个 URL 与整个 Pulsar 集群进行通信,Pulsar内部提供了服务发现的机制,你也可以用你自己的服务发现系统,当客户端发送一个HTTP请求时,例如发到http://pulsar.us-west.example.com:8080,客户端需要被重定向到某些所需的集群中活跃的broker,或者通过DNS,或者通过HTTP和IP重定向,或者其他机制;

注意:

Pulsar中,每个主题只由一个 broker 处理,客户端发出的读取,更新或删除主题的初始请求可能发送给不是处理该主题的 broker,如果这个 broker 不能处理该主题的请求,broker 将会把该请求重定向到可以处理该主题请求的 broker上。

8.消息发送到落盘过程

几个概念:

topic:用于将消息从生产者传输到消费者的通道,producer发布消息到topic, consumer订阅topic并处理发布的消息

bundle:切分命名空间的一段哈希值范围,被独立的分配到不同的broker,每个topic会根据其名称算出的哈希值来判断需要分到哪一个特定的bundle,即分配到了不同的broker上,相当于所有topic的一个子集

broker:一个无状态组件,主要运行两个服务组件,http服务(8080)和tcp服务(6650),一个管理集群,一个传输数据

entry:存储到bookkeeper中的一条记录

ledger:用来存储entry的,多个entry序列组成一个ledger

managed ledger:单个topic的存储层,下面可以有多个ledgers,数据的写入是通过managed ledger完成的

bookie:bookkeeper集群中的一个存储节点,用于存储ledger,因为存储是分布式的,每个ledger会存储在多个bookie上

entry log:存储entry的文件

index file:ledger的索引文件,记录每个ledger在entry log中的存储位置以及数据在entry log文件中的长度

ledger cache:用于缓存数据和索引文件的,加快查找效率

journal:用于存储bookkeeper的事务日志,在数据刷新到ledger之前,持久化存储这个刷新的事务,防止缓存中的数据丢失,用于数据恢复,可定时删除已刷盘数据,避免占用存储空间

整个过程:

客户端指定一个或多个broker地址,执行时会先和任意一个broker建立连接,发送一个http请求查询topic所在broker,然后和该broker建立一个tcp连接并进行认证和鉴权,通过后,客户端会为该broker创建一个生产者对象;

生产者发送消息到该broker,broker接收到消息后,会先缓存在自己的内存中,然后查找几个合适的bookie节点,同时启动几个线程分别发送数据到不同的bookie节点,并等待bookie返回ack确认;

bookie在接收到broker发来的数据后,数据和索引文件会先在内存中缓存,不是立刻写入磁盘,当内存达到一定值或者达到刷盘时间后,会触发刷盘操作,将数据和索引持久化到磁盘,这样的话,在刷盘之前可能会因为宕机或其他异常导致缓存中的数据丢失,所以在刷盘之前,还会把缓存中的数据持久化到journal文件中,并记录journal文件的id和对应位置,这样在数据丢失之后可以通过journal文件做数据恢复,而且在刷盘之后或者按周期时间,会删除已刷盘数据之前的journal,避免占用存储空间;

对于bookie来说,当数据持久化到journal文件后,即会给broker返回ack确认,然后broker给客户端返回ack确认,一条数据的发送过程完成。

 

参考官方文档 https://pulsar.apache.org/docs/zh-CN/next/concepts-architecture-overview/


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM