[Apache Pulsar] 企業級分布式消息系統-Pulsar快速上手


Pulsar快速上手

前言

如果你還不了解Pulsar消息系統,可以先看上一篇文章 企業級分布式消息系統-Pulsar入門基礎
Pulsar客戶端支持多個語言,包括Java,Go,Pytho和C++,本篇文章只講述Java客戶端。
Pulsar Java客戶端既可用於創建消息的producers、consumers和readers ,也可用於執行管理任務。Java 客戶端的當前版本為 2.4.0。

1. 安裝

最新版本的Pulsar Java 客戶端庫可通過 Maven中央倉庫 使用。 要使用最新版本, 請將 pulsar-client 庫添加到構建配置中。

1.1 Maven

如果你使用maven,添加以下內容到你的 pom.xml 中:

<!-- 在你的 <properties> 部分-->
<pulsar.version>2.4.0</pulsar.version>

<!-- 在你的 <dependencies> 部分-->
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>${pulsar.version}</version>
</dependency>

 

1.2 Gradle

如果你使用Gradle,添加以下內容到你的 build.gradle 中:

def pulsarVersion = '2.4.0'

dependencies {
compile group: 'org.apache.pulsar', name: 'pulsar-client', version: pulsarVersion
}  

 

1.3 本地安裝Pulsar

Pulsar目前只支持MacOS和Linux系統,JDK版本1.8及以上。
下載地址見下載說明及配置,Windows的小伙伴們就不用下載了。

2.連接URL

要使用客戶端連接到Pulsar,你需要指定Pulsar 協議URL。
Pulsar協議URL分配給特定的集群,使用pulsar scheme ,默認端口6650。以下是本地主機的示例:

pulsar://localhost:6650

 

如果有多個broker,那么URL如下:

pulsar://localhost:6550,localhost:6651,localhost:6652

 

生產環境的Pulsar 集群URL如下:

pulsar://pulsar.us-west.example.com:6650

 

如果需要TLS認證,URL如下:

pulsar+ssl://pulsar.us-west.example.com:6651

 

3.客戶端配置

你可以用一個URL來實例化一個連接到指定的Pulsar 集群的 PulsarClient 對象,像這樣:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

 

如果有多個brokers,實例化客戶端如下:

PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650,localhost:6651,localhost:6652")
    .build();

默認的broker URL是單機集群。 如果你使用單機模式運行一個集群,broker將默認使用pulsar://localhost:6650

3.1 生產者

在Pulsar中,生產者寫消息到topic中。 一旦你實例化一個Pulsar Client對象,你可以創建一個Producer 用於特定的topic。

Producer<byte[]> producer = client.newProducer()
    .topic("my-topic")
    .create();

// 然后你就可以發送消息到指定的broker 和topic上:
producer.send("My message".getBytes());

 

默認情況下,生產者生成由字節數組組成的消息。當然,你也可以指定消息類型,例如下面的String類型:

Producer<String> stringProducer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .create();
stringProducer.send("My message");

在不再使用時,你需要確保關閉生產者、消費者和客戶端
producer.close(); consumer.close(); client.close();

關閉操作也可以是異步的:

//...業務代碼

producer.closeAsync()
   .thenRun(() -> System.out.println("Producer closed"));
   .exceptionally((ex) -> {
       System.err.println("Failed to close producer: " + ex);
       return ex;
   });
 
3.1.1 生產者配置

如果實例化生產者對象時僅指定topic名稱 (如上面的示例所示), 則生產者將使用默認配置。 要使用非默認配置, 你可以設置多種可配置的參數。詳情見ProducerBuilder的文檔說明,下面是一個示例:

Producer<byte[]> producer = client.newProducer()
.topic("my-topic")                                  //主題名稱
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) //最大發布延遲時間
.sendTimeout(10, TimeUnit.SECONDS)                  //超時時間
.blockIfQueueFull(true)                             //隊列滿了,是否阻塞
.create();
 
3.1.2 消息路由 #####

使用分區主題時,當你使用生產者發布消息時你可以指定路由模式。

3.1.3 異步發送

你可以使用Java客戶端異步發布消息。 使用異步發送,生產者將消息放入阻塞隊列並立即返回。 然后,客戶端將在后台將消息發送給broker。 如果隊列已滿(配置的最大值),則在調用API時,生產者可能會被阻塞或立即失敗,具體取決於傳遞給生產者的參數。
以下是異步發送操作的示例:

producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
    System.out.printf("Message with ID %s successfully sent", msgId);
});
 
3.1.4 消息配置

除了value之外, 還可以在特定消息上設置其他選項:

producer.newMessage()
.key("my-message-key")                      //消息的key
.value("my-async-message".getBytes())       //消息內容的字節數組
.property("my-key", "my-value")             //自定義的key/value
.property("my-other-key", "my-other-value")
.send();
 
3.2 消費者

在Pulsar中,消費者訂閱topic並處理生產者發布到這些topic的消息。 你可以首先實例化一個PulsarClient對象並傳給他一個borker URL(和生產樣的一樣)來實例化一個消費者。
一旦實例化一個PulsarClient 對象,你可以指定一個主題和一個訂閱來創建一個 Consumer 消費者。

Consumer consumer = client.newConsumer()
    .topic("my-topic")                      //生產者定義的topic
    .subscriptionName("my-subscription")    //消費者自定義的訂閱名稱
    .subscribe();

 

subscribe()方法將自動將訂閱消費者指定的主題, 一種讓消費者監聽主題的方法是使用while循環,示例如下:

while (true) {

  // 等待一個消息
  Message msg = consumer.receive();

  try {
      // 對這個消息的處理(業務)
      System.out.printf("Message received: %s", new String(msg.getData()));

      // 消費者確認消息已消費,同時broker刪除該消息
      consumer.acknowledge(msg);

  } catch (Exception e) {

      // 消息處理失敗,否定確認,該消息稍后會重發
      consumer.negativeAcknowledge(msg);
  }
}
 
3.2.1 消費者配置

如果實例化 消費者對象, 僅指定主題和訂閱名稱, 如上面的示例所示, 消費者將采用默認配置。 要使用非默認配置, 你可以設置多種可配置的參數。詳情見ConsumerBuilder的說明,下面是一個示例:

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .ackTimeout(10, TimeUnit.SECONDS)               //確認超時時間
    .subscriptionType(SubscriptionType.Exclusive)   //訂閱模式
    .subscribe();
 
3.2.2 異步接收

receive方法將異步接受消息(消費者處理器將被阻塞,直到有消息到達)。 你也可以使用異步接收方法,這將在一個新消息到達時立即返回一個CompletableFuture對象。示例如下:

CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
 
3.2.3 多主題訂閱

消費者除了訂閱單個Pulsar主題外,你還可以使用多主題訂閱訂閱多個主題。 若要使用多主題訂閱, 可以提供一個topic正則表達式 (regex) 或 主題List 。 如果通過 regex 選擇主題, 則所有主題都必須位於同一Pulsar命名空間中。
下面是一些示例:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
        .subscriptionName(subscription);

// 訂閱命名空間中的所有主題
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
        .topicsPattern(allTopicsInNamespace)
        .subscribe();

// 使用regex訂閱命名空間中的主題子集
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
        .topicsPattern(someTopicsInNamespace)
        .subscribe();

 

你還可以訂閱明確的主題列表 (可跨命名空間):

List<String> topics = Arrays.asList(
    "topic-1",
    "topic-2",
    "topic-3"
);

Consumer multiTopicConsumer = consumerBuilder
        .topics(topics)
        .subscribe();

// 或者:
Consumer multiTopicConsumer = consumerBuilder
        .topics(
            "topic-1",
            "topic-2",
            "topic-3"
        )
        .subscribe();

 

你也可以使用subscribeAsync 方法異步訂閱多主題,下面是一個示例:

Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*");
consumerBuilder
        .topics(topics)
        .subscribeAsync()
        .thenAccept(this::receiveMessageFromConsumer);

private void receiveMessageFromConsumer(Consumer consumer) {
    consumer.receiveAsync().thenAccept(message -> {
                // 業務處理
                receiveMessageFromConsumer(consumer);
            });
}

 

3.2.4 訂閱模型

Pulsar有多種訂閱模型來適用不同的場景,訂閱模型見Pulsar基礎概念,下面講述如何使用。
為了更好的描述他們之間的不同,假設你創建了一個topic,命名為"my-topic",生產者發布了10條消息,示例如下:

//創建生產者
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("my-topic")
    .enableBatch(false)
    .create();

// "key-1"的消息有3條
// "key-2"的消息有3條
// "key-3"的消息有2條
// "key-4"的消息有2條
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();

 

Exclusive(獨占模式):
創建一個消費者,以Exclusive模式訂閱消息,代碼如下:

Consumer consumer = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Exclusive)   //獨占模式
    .subscribe()

 

只有第一個消費者可以訂閱,其他消費者訂閱會報錯。這就意味着第一個消費者可以收到所有的10條消息,消息消費的順序和生產的順序是一樣的。
Failover(災備):
創建一個消費者,以Exclusive模式訂閱消息,代碼如下:

//創建消費者1
Consumer consumer1 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Failover)    //災備模式
    .subscribe()

//創建消費者2
Consumer consumer2 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Failover)    //災備模式
        .subscribe()

 

conumser1是起作用的消費者, consumer2是備用消費者。假設consumer1收到的5條消息后突然崩了, 那么consumer2接替,成了起作用的消費者。
當然多個消費者都可以訂閱,但是只有第一個是可用,第一個消費者斷開連接后,下一個備用的消費者就起作用了。
Shared(共享):
創建一個消費者,以Exclusive模式訂閱消息,代碼如下:

Consumer consumer1 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)  //共享模式
    .subscribe()

Consumer consumer2 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Shared)
        .subscribe()

//這兩個消費者都是可用的

 

在共享模式,多個消費者都可以訂閱,消息在多個消費者之間是以輪詢的方式分發。
如果broke同一時間只發送一個消息,那么consume1收到5條消息:

("key-1", "message-1-1")
("key-1", "message-1-3")
("key-2", "message-2-2")
("key-3", "message-3-1")
("key-4", "message-4-1")

 

消費者2收到另外5條消息。
總之,共享模式和其他兩種模式不同,共享模式有更好的靈活性,但是不能保證消息的順序。
Key_share
這是2.4.0版本后新出的訂閱模式,代碼如下:

Consumer consumer1 = client.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Key_Shared) //key共享模式
    .subscribe()

Consumer consumer2 = client.newConsumer()
        .topic("my-topic")
        .subscriptionName("my-subscription")
        .subscriptionType(SubscriptionType.Key_Shared)
        .subscribe()

 

KeyShared和Shared模式類似,區別在於KeyShared模式下,具有相同key的消息分發到同一個消費者。
消費者1最后收到5條消息:

("key-1", "message-1-1")
("key-1", "message-1-2")
("key-1", "message-1-3")
("key-3", "message-3-1")
("key-3", "message-3-2")

 

消費者2收到另外5條。

如果該模式下消息的key沒有指定,那么所有的消息默認分發到同一消費者。

3.2.5 Reader接口

使用 reader 接口, Pulsar客戶可以在topic中“手動定位”,從指定的消息開始向前讀取所有消息。Pulsar Java API 可以創建Reader對象,同時指定一個 topic, 一個MessageId ,和ReaderConfiguration。
下面是一個示例:

ReaderConfiguration conf = new ReaderConfiguration();

byte[] msgIdBytes = // 一些消息ID 的字節數組
MessageId id = MessageId.fromByteArray(msgIdBytes);

Reader reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();

while (true) {
    Message message = reader.readNext();
    // 處理消息
}

 

在上面的示例中,實例化一個Reader對象指定的主題和消息(ID); reader將遍歷主題中msgIdBytes(取值方式取決於應用程序) 之后的消息。
上面的示例代碼展示了Reader對象指向特定的消息(ID),但你也可以使用MessageId.earliest來指向topic上最早可用的消息,使用MessageId.latest指向最新的消息。

3.3 Schema

在Pulsar中,所有的消息數據都在字節數組中,消息schema允許在構造和處理消息時使用其他類型的數據(從簡單類型(如String)到更復雜的類型)。如果在不指定schema的情況下構造生產者,則生產者只能生成類型為 byte[]的消息。 下面是一個示例:

Producer<byte[]> producer = client.newProducer()
    .topic(topic)
    .create();

 

以下schema格式目前可用於 Java:

  • 無schema 或者字節數組schema(使用Schema.BYTES) 
    Producer<byte[]> bytesProducer = client.newProducer(Schema.BYTES) .topic("some-raw-bytes-topic") .create();
  • String,UTF-8編碼,使用Schema.STRING 
    Producer<String> stringProducer = client.newProducer(Schema.STRING) .topic("some-string-topic") .create();
  • JSON 模式,創建POJO 
    Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class); Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema) .topic("some-pojo-topic") .create();

     

結語

Pulsar的特性還有很多,這里重點介紹了Java客戶端的快速上手教程,后面有時間的話會繼續更新Pulsar系列。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM