原文鏈接
1、使用前准備
引入依賴:
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.6.1</version>
</dependency>
2、PulsarClient
在嘗試使用Producer和Consumer前,我們先講一下Pulsar客戶端,因為不管是Producer還是Consumer,都是依靠PulsarClient來創建的:
/**
* Pulsar工具類
* @author winfun
**/
public class PulsarUtils {
/**
* 根據serviceUrl創建PulsarClient
* @param serviceUrl 服務地址
* @return 客戶端
* @throws PulsarClientException 異常
*/
public static PulsarClient createPulsarClient(String serviceUrl) throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl(serviceUrl)
.build();
}
}
我們這里簡單使用,只借用ServiceUrl創建客戶端,其實還有很多比較重要的參數,下面稍微列舉一下:
- ioThreads:Set the number of threads to be used for handling connections to brokers (default: 1 thread)
- listenerThreads:Set the number of threads to be used for message listeners (default: 1 thread). 一條線程默認只為一個消費者服務
- enableTcpNoDelay:No-delay features make sure packets are sent out on the network as soon as possible
- ....
3、Producer
Producer這里我們也先簡單使用,只負責往指定Topic發送消息,其他功能不用,例如異步發送、延時發送等
/**
* 初次使用Pulsar生產者,無任何封裝
* @author winfun
**/
public class FirstProducerDemo {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://127.0.0.1:6650")
.build();
ProducerBuilder<String> productBuilder = client.newProducer(Schema.STRING).topic("winfun/study/test-topic")
.blockIfQueueFull(Boolean.TRUE).batchingMaxMessages(100).enableBatching(Boolean.TRUE).sendTimeout(3, TimeUnit.SECONDS);
Producer<String> producer = productBuilder.create();
for (int i = 0; i < 100; i++) {
producer.send("hello"+i);;
}
producer.close();
}
}
4、Consumer
下面我們將比較詳細地介紹消費者的使用方式,因為這里能拓展的東西稍微多一點,下面開始使用旅程。
4.1 第一次使用:
我們利用PulsarClient創建Consumer;接着在死循環中利用Consumer#receive方法接收消息然后進行消費。
/**
* 初次使用Pulsar消費者,無任何封裝
* @author winfun
**/
@Slf4j
public class FirstConsumerDemo {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://127.0.0.1:6650")
.build();
/**
* The subscribe method will auto subscribe the consumer to the specified topic and subscription.
* One way to make the consumer listen on the topic is to set up a while loop.
* In this example loop, the consumer listens for messages, prints the contents of any received message, and then acknowledges that the message has been processed.
* If the processing logic fails, you can use negative acknowledgement to redeliver the message later.
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("winfun/study/test-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
// 死循環接收
while (true){
Message<String> message = consumer.receive();
String msgContent = message.getValue();
log.info("接收到消息: {}",msgContent);
consumer.acknowledge(message);
}
}
}
4.2 第二次使用:
上面我們可以看到,我們是利用死循環來保證及時消費,但是這樣會導致主線程;所以下面我們可以使用Pulsar提供的MessageListener,即監聽器,當消息來了,會回調監聽器指定的方法,從而避免阻塞主線程。
/**
* 使用MessageListener,避免死循環代碼&阻塞主線程
* @author winfun
**/
@Slf4j
public class SecondConsumerDemo {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
/**
* If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener.
*
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("winfun/study/test-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
/**
* 當接收到一個新的消息,就會回調 MessageListener的receive方法。
* 消息將會保證按順序投放到單個消費者的同一個線程,因此可以保證順序消費
* 除非應用程序或broker崩潰,否則只會為每條消息調用此方法一次
* 應用程序負責調用消費者的確認方法來確認消息已經被消費
* 應用程序負責處理消費消息時可能出現的異常
*/
log.info("接收到消息:{}",msg.getValue());
try {
consumer1.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}).subscribe();
}
}
4.3 第三次使用:
上面利用監聽器來解決死循環代碼和阻塞主線程問題;但是我們可以發現,每次消費都是單線程,當一個消息消費完才能進行下一個消息的消費,這樣會導致消費效率非常的低;
如果如果追求高吞吐量,不在乎消息消費的順序性,那么我們可以接入線程池;一有消息來就丟進線程池中,這樣不但可以支持異步消費,還能保證消費的效率非常的高。
/**
* MessageListener 內使用線程池進行異步消費
* @author winfun
**/
@Slf4j
public class ThirdConsumerDemo {
public static void main(String[] args) throws PulsarClientException {
Executor executor = new ThreadPoolExecutor(
10,
10,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100)
);
PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
/**
* If you don't want to block your main thread and rather listen constantly for new messages, consider using a MessageListener.
*
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("winfun/study/test-topic")
.subscriptionName("my-subscription")
.ackTimeout(10, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
/**
* MessageListener還是保證了接收的順序性
* 但是利用線程池進行異步消費后不能保證消費順序性
*/
executor.execute(() -> handleMsg(consumer1, msg));
}).subscribe();
}
/**
* 線程池異步處理
* @param consumer 消費者
* @param msg 消息
*/
public static void handleMsg(Consumer consumer, Message msg){
ThreadUtil.sleep(RandomUtil.randomInt(3),TimeUnit.SECONDS);
log.info("接收到消息:{}",msg.getValue());
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}
4.4 第四次使用:
我們可以發現,在上面的三個例子中,如果在調用Consumer#acknowledge方法前,因為代碼問題導致拋異常了,我們是沒有做處理的,那么會導致消費者會一直重試沒有被確認的消息。
那么我們此時需要接入Pulsar提供的死信隊列:當Consumer消費消息時拋異常,並達到一定的重試次數,則將消息丟入死信隊列;但需要注意的是,單獨使用死信隊列,Consumer的訂閱類型需要是 Shared/Key_Shared;否則不會生效。
/**
* 超過最大重試次數,進入死信隊列
* @author: winfun
**/
@Slf4j
public class FourthConsumerDemo {
public static void main(String[] args) throws PulsarClientException {
/**
* 如果指定了死信隊列策略,但是沒指定死信隊列
* 死信隊列:String.format("%s-%s-DLQ", topic, this.subscription)
* 這里的this.subscription為上面指定的 subscriptionName。
*
* 一般在生產環境,會將pulsar的自動創建topic功能給關閉掉,所以上線前,記得先提工單創建指定的死信隊列。
*
* 重點信息:
* 如果是單單使用死信隊列,subscriptionType為 Shared/Key_Shared,否則死信隊列不生效。
*/
PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("winfun/study/test-topic")
.subscriptionName("my-subscription")
.receiverQueueSize(100)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Key_Shared)
.negativeAckRedeliveryDelay(1,TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder()
//可以指定最大重試次數,最大重試三次后,進入到死信隊列
.maxRedeliverCount(3)
//可以指定死信隊列
.deadLetterTopic("winfun/study/test-topic-dlq3")
.build())
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info("接收到隊列「{}」消息:{}",msg.getTopicName(),msg.getValue());
if (msg.getValue().equals("hello3")) {
throw new RuntimeException("hello3消息消費失敗!");
}else {
try {
consumer1.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}).subscribe();
}
}
4.5 第五次使用:
死信隊列一般是不做消費的,我們會關注死信隊列的情況,從而作出下一步的動作。
而且,一般做消息重試,我們不希望在原Topic中做重試,這樣會影響原有消息的消費進度。
那么我們可以同時使用重試隊列和死信隊列。
當代碼拋出異常時,我們可以捕獲住,然后調用Consumer#reconsumeLater方法,將消息丟入重試隊列;當消息重試指定次數后還無法正常完成消費,即會將消息丟入死信隊列。
/**
* 重試隊列
* @author winfun
**/
@Slf4j
public class FifthConsumerDemo {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
/**
* 注意點:
* 1、使用死信策略,但是沒有指定重試topic和死信topic名稱
* 死信隊列:String.format("%s-%s-DLQ", topic, this.subscription)
* 重試隊列:String.format("%s-%s-RETRY", topic, this.subscription)
* 這里的this.subscription為上面指定的 subscriptionName。
*
* 2、是否限制訂閱類型
* 同時開啟重試隊列和死信隊列,不限制subscriptionType只能為Shared/Key_Shared;
* 如果只是單獨使用死信隊列,需要限制subscriptionType為Shared
*
* 3、重試原理
* 如果使用重試隊列,需要保證 enableRetry 是開啟的,否則調用 reconsumeLater 方法時會拋異常:org.apache.pulsar.client.api.PulsarClientException: reconsumeLater method not support!
* 如果配置了重試隊列,consumer會同時監聽原topic和重試topic,consumer的實現類對應是:MultiTopicsConsumerImpl
* 如果消費消息時調用了 reconsumeLater 方法,會將此消息丟進重試topic
* 如果在重試topic重試maxRedeliverCount次后都無法正確ack消息,即將消息丟到死信隊列。
* 死信隊列需要另起Consumer進行監聽消費。
*
* 4、直接拋異常
* 如果我們不是業務層面上調用 reconsumeLater 方法來進行重試,而是代碼層面拋異常了,如果subscriptionType不為Shared/Key_Shared,消息無法進入重試隊列和死信隊列,是當前消費者無限在原topic進行消費。
* 而如果如果subscriptionType為Shared/Key_Shared,則消息進行maxRedeliverCount次消費后,會直接進入到死信隊列,此時不會用到重試隊列。
* 因此,重試隊列是僅僅針對 reconsumeLater 方法的,而不針對異常的重試。
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("winfun/study/test-retry-topic")
.subscriptionName("my-subscription")
.receiverQueueSize(100)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.negativeAckRedeliveryDelay(1,TimeUnit.SECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
//可以指定最大重試次數,最大重試三次后,進入到死信隊列
.maxRedeliverCount(3)
.retryLetterTopic("winfun/study/test-retry-topic-retry")
//可以指定死信隊列
.deadLetterTopic("winfun/study/test-retry-topic-dlq")
.build())
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info("接收到隊列「{}」消息:{}",msg.getTopicName(),msg.getValue());
if (msg.getValue().equals("hello3")) {
try {
consumer1.reconsumeLater(msg,1,TimeUnit.SECONDS);
} catch (PulsarClientException e) {
e.printStackTrace();
}
//throw new RuntimeException("hello3消息消費失敗!");
}else {
try {
consumer1.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}).subscribe();
}
}
重試機制源碼分析
關於重試機制,其實是比較有意思的,下面我們會簡單分析一下源碼。
- 判斷是否開啟重試機制,如果沒有開啟重試機制,則直接拋異常
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
// 如果沒開啟重試機制,直接拋異常
if (!this.conf.isRetryEnable()) {
throw new PulsarClientException("reconsumeLater method not support!");
} else {
try {
// 當然了,reconsumeLaterAsync里面也會判斷是否開啟重試機制
this.reconsumeLaterAsync(message, delayTime, unit).get();
} catch (Exception var7) {
Throwable t = var7.getCause();
if (t instanceof PulsarClientException) {
throw (PulsarClientException)t;
} else {
throw new PulsarClientException(t);
}
}
}
}
還有我們可以發現,pulsar很多方法是支持同步和異步的,而同步就是直接調用異步方法,再后調用get()方法進行同步阻塞等待即可。
- 調用 reconsumeLaterAsunc 方法,接着調用 get() 進行同步阻塞等待結果
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
if (!this.conf.isRetryEnable()) {
return FutureUtil.failedFuture(new PulsarClientException("reconsumeLater method not support!"));
} else {
try {
return this.doReconsumeLater(message, AckType.Individual, Collections.emptyMap(), delayTime, unit);
} catch (NullPointerException var6) {
return FutureUtil.failedFuture(new InvalidMessageException(var6.getMessage()));
}
}
}
- 調用 doReconsumeLater 方法
我們知道,在 Pulsar 的 Consumer 中,可以支持多 Topic 監聽,而如果我們加入了重試機制,默認是同個 Consumer 同時監聽原隊列和重試隊列,所以 Consumer 接口的實現此時為 MultiTopicsConsumerImpl,而不是 ComsumerImpl。
那我們看看 MultiConsumerImpl 的 doReconsumeLater 是如何進行重新消費的:
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties, long delayTime, TimeUnit unit) {
MessageId messageId = message.getMessageId();
Preconditions.checkArgument(messageId instanceof TopicMessageIdImpl);
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)messageId;
if (this.getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
} else {
MessageId innerId;
if (ackType == AckType.Cumulative) {
Consumer individualConsumer = (Consumer)this.consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
innerId = topicMessageId.getInnerMessageId();
return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
} else {
return FutureUtil.failedFuture(new NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = (ConsumerImpl)this.consumers.get(topicMessageId.getTopicPartitionName());
innerId = topicMessageId.getInnerMessageId();
return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit).thenRun(() -> {
this.unAckedMessageTracker.remove(topicMessageId);
});
}
}
}
- 首先判斷客戶端是否為准備狀態
- 接着判斷 AckType 是累計的還是單獨的,如果是累計的話,subscriptionType 一定要是 exclusive
- 不管是累計還是單獨的,最后都是調用 ConsumerImpl 的 doReconsumerLater 方法
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType, Map<String, Long> properties, long delayTime, TimeUnit unit) {
MessageId messageId = message.getMessageId();
if (messageId instanceof TopicMessageIdImpl) {
messageId = ((TopicMessageIdImpl)messageId).getInnerMessageId();
}
Preconditions.checkArgument(messageId instanceof MessageIdImpl);
if (this.getState() != State.Ready && this.getState() != State.Connecting) {
this.stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + this.getState());
if (AckType.Individual.equals(ackType)) {
this.onAcknowledge(messageId, exception);
} else if (AckType.Cumulative.equals(ackType)) {
this.onAcknowledgeCumulative(messageId, exception);
}
return FutureUtil.failedFuture(exception);
} else {
if (delayTime < 0L) {
delayTime = 0L;
}
// 如果 retryLetterProducer 為null,則嘗試創建
if (this.retryLetterProducer == null) {
try {
this.createProducerLock.writeLock().lock();
if (this.retryLetterProducer == null) {
this.retryLetterProducer = this.client.newProducer(this.schema).topic(this.deadLetterPolicy.getRetryLetterTopic()).enableBatching(false).blockIfQueueFull(false).create();
}
} catch (Exception var28) {
log.error("Create retry letter producer exception with topic: {}", this.deadLetterPolicy.getRetryLetterTopic(), var28);
} finally {
this.createProducerLock.writeLock().unlock();
}
}
// 如果 retryLetterProcuder 不為空,則嘗試將消息丟進重試隊列中
if (this.retryLetterProducer != null) {
try {
MessageImpl<T> retryMessage = null;
String originMessageIdStr = null;
String originTopicNameStr = null;
if (message instanceof TopicMessageImpl) {
retryMessage = (MessageImpl)((TopicMessageImpl)message).getMessage();
originMessageIdStr = ((TopicMessageIdImpl)message.getMessageId()).getInnerMessageId().toString();
originTopicNameStr = ((TopicMessageIdImpl)message.getMessageId()).getTopicName();
} else if (message instanceof MessageImpl) {
retryMessage = (MessageImpl)message;
originMessageIdStr = ((MessageImpl)message).getMessageId().toString();
originTopicNameStr = ((MessageImpl)message).getTopicName();
}
SortedMap<String, String> propertiesMap = new TreeMap();
int reconsumetimes = 1;
if (message.getProperties() != null) {
propertiesMap.putAll(message.getProperties());
}
// 如果包含 RECONSUMETIMES,則最遞增
if (propertiesMap.containsKey("RECONSUMETIMES")) {
reconsumetimes = Integer.valueOf((String)propertiesMap.get("RECONSUMETIMES"));
++reconsumetimes;
// 否則先加入「原始隊列」和「原始messageId」信息
} else {
propertiesMap.put("REAL_TOPIC", originTopicNameStr);
propertiesMap.put("ORIGIN_MESSAGE_IDY_TIME", originMessageIdStr);
}
// 加入重試次數信息
propertiesMap.put("RECONSUMETIMES", String.valueOf(reconsumetimes));
// 加入延時時間信息
propertiesMap.put("DELAY_TIME", String.valueOf(unit.toMillis(delayTime)));
TypedMessageBuilder typedMessageBuilderNew;
// 判斷是否超過最大重試次數,如果還未超過,則重新投放到重試隊列
if (reconsumetimes <= this.deadLetterPolicy.getMaxRedeliverCount()) {
typedMessageBuilderNew = this.retryLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap);
if (delayTime > 0L) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
// 發送延時消息
typedMessageBuilderNew.send();
// 確認當前消息
return this.doAcknowledge(messageId, ackType, properties, (TransactionImpl)null);
}
// 先忽略
this.processPossibleToDLQ((MessageIdImpl)messageId);
// 判斷 deadLetterProducer 是否為null,如果為null,嘗試創建
if (this.deadLetterProducer == null) {
try {
if (this.deadLetterProducer == null) {
this.createProducerLock.writeLock().lock();
this.deadLetterProducer = this.client.newProducer(this.schema).topic(this.deadLetterPolicy.getDeadLetterTopic()).blockIfQueueFull(false).create();
}
} catch (Exception var25) {
log.error("Create dead letter producer exception with topic: {}", this.deadLetterPolicy.getDeadLetterTopic(), var25);
} finally {
this.createProducerLock.writeLock().unlock();
}
}
// 如果 deadLetterProducer 不為null
if (this.deadLetterProducer != null) {
// 加入「原始隊列」信息
propertiesMap.put("REAL_TOPIC", originTopicNameStr);
// 加入「原始MessageId」信息
propertiesMap.put("ORIGIN_MESSAGE_IDY_TIME", originMessageIdStr);
typedMessageBuilderNew = this.deadLetterProducer.newMessage().value(retryMessage.getValue()).properties(propertiesMap);
// 將消息內容發往死信隊列中
typedMessageBuilderNew.send();
// 確認當前消息
return this.doAcknowledge(messageId, ackType, properties, (TransactionImpl)null);
}
} catch (Exception var27) {
log.error("Send to retry letter topic exception with topic: {}, messageId: {}", new Object[]{this.deadLetterProducer.getTopic(), messageId, var27});
Set<MessageId> messageIds = new HashSet();
messageIds.add(messageId);
this.unAckedMessageTracker.remove(messageId);
this.redeliverUnacknowledgedMessages(messageIds);
}
}
return CompletableFuture.completedFuture((Object)null);
}
}
分析了一波,我們可以看到和上面代碼的注釋描述的基本一致。
4.6 第六次使用
上面我們提到,當Consumer指定了重試隊列,Consumer會同時監聽原Topic和重試Topic,那么如果我們想多個Consumer消費重試Topic時,需要將Consumer的訂閱類型指定為 Shared/Key_Shared,讓重試隊列支持多Consumer監聽消費,提升重試隊列的消費效率。
/**
* 重試隊列-Shared
* @author winfun
**/
@Slf4j
public class SixthConsumerDemo {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
/**
* 因為如果指定了重試策略,Consumer會同時監聽「原隊列」和「重試隊列」
* 即如果我們想「重試隊列」可以讓多個 Consumer 監聽,從而提高消費能力,那么 Consumer 需指定為 Shared 模式。
*/
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("winfun/study/test-retry-topic")
.subscriptionName("my-subscription")
.receiverQueueSize(100)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1,TimeUnit.SECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder()
//可以指定最大重試次數,最大重試三次后,進入到死信隊列
.maxRedeliverCount(3)
.retryLetterTopic("winfun/study/test-retry-topic-retry")
//可以指定死信隊列
.deadLetterTopic("winfun/study/test-retry-topic-dlq")
.build())
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info("接收到隊列「{}」消息:{}",msg.getTopicName(),msg.getValue());
if (msg.getValue().contains("1") || msg.getValue().contains("2") || msg.getValue().contains("3")) {
try {
consumer1.reconsumeLater(msg,1,TimeUnit.SECONDS);
} catch (PulsarClientException e) {
e.printStackTrace();
}
//throw new RuntimeException("hello3消息消費失敗!");
}else {
try {
consumer1.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}).subscribe();
}
}
/**
* 監聽重試隊列-Shared訂閱模式
* @author winfun
**/
@Slf4j
public class RetryConsumerDemo {
public static void main(String[] args) throws PulsarClientException {
PulsarClient client = PulsarUtils.createPulsarClient("pulsar://127.0.0.1:6650");
Consumer<String> deadLetterConsumer = client.newConsumer(Schema.STRING)
.topic("winfun/study/test-retry-topic-retry")
.subscriptionName("my-subscription2")
.receiverQueueSize(100)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.messageListener((MessageListener<String>) (consumer1, msg) -> {
log.info("接收到隊列「{}」消息:{}",msg.getTopicName(),msg.getValue());
try {
consumer1.acknowledge(msg);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}).subscribe();
}
}
到此,我們已經將Consmuer的幾種使用方式都嘗試了一遍,可以說基本包含了常用的操作;但是我們可以發現,如果我們每次新建一個Consumer都需要寫一堆同樣的代碼,那其實挺麻煩的,又不好看;並且,現在我們大部分項目都是基於 SpringBoot 來做的,而 SpringBoot 也沒有一個比較大眾的Starter。
所以接下來的計划就是,自己寫一個編寫一個關於Pulsar的SpringBoot Starter,這個組件不會特別復雜,但是會支持 Producer 和 Cousnmer 的自動配置,並且支持 Consumer 上面提到的幾個點:MessageListener 監聽、線程池異步並發消費、重試機制等。