四種訂閱模式
獨占Exclusive
在獨占模式下,只允許一個消費者附加到訂閱上。如果多個消費者使用同一個訂閱來訂閱一個主題,就會發生錯誤。在下圖中,只有消費者A-0被允許消費消息。
此模式也是默認模式
Demo
subscriptionName相同,同時執行exclusive/comsumer1和exclusive/comsumer2就會報錯
package com.project.pulsar.delayMsgAndSubscriptionsModel;
import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.Set;
/**
* 獨占模式
*/
@RestController
public class ExclusiveMessageController {
@Autowired
PulsarConf pulsarConf;
String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java
/**
* 生產消息
*
* @param msg
* @throws PulsarClientException
*/
@GetMapping("/exclusive/sendMsg")
public MessageId sendMsg(String msg) throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Producer<byte[]> producer1 = pulsarFactory.newProducer()
.topic(topic)
.create();
return producer1.send(msg.getBytes());
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/exclusive/comsumer")
public void comsumerByArtificial() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Exclusive)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/exclusive/comsumer2")
public void comsumerByArtificial2() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Exclusive)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
}
代碼下載
代碼見此delayMsgAndSubscriptionsModel包下ExclusiveMessageController.java
災備Failover
在故障轉移模式中,多個消費者可以附加到同一個訂閱。為非分區主題或分區主題的每個分區挑選一個主消費者並接收消息。當主消費者斷開連接時,所有(未確認的和后續的)消息都被傳遞給排在后面的消費者。對於分區主題,經紀人將按優先級和消費者名稱的詞匯順序對消費者進行排序。對於非分區主題,經紀人將按照消費者訂閱非分區主題的順序挑選消費者。在下圖中,消費者B-0是主消費者,而如果消費者B-0斷開連接,消費者B-1將是排在后面的消費者接收消息。
Demo
package com.project.pulsar.delayMsgAndSubscriptionsModel;
import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 災備模式
*/
@RestController
public class FailoverMessageController {
@Autowired
PulsarConf pulsarConf;
String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java
/**
* 生產消息
*
* @param msg
* @throws PulsarClientException
*/
@GetMapping("/failover/sendMsg")
public MessageId sendMsg(String msg) throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Producer<byte[]> producer1 = pulsarFactory.newProducer()
.topic(topic)
.create();
return producer1.send(msg.getBytes());
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/failover/comsumer")
public void comsumerByArtificial() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Failover)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/failover/comsumer2")
public void comsumerByArtificial2() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Failover)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
}
代碼下載
代碼見此delayMsgAndSubscriptionsModel包下FailoverMessageController.java
共享Shared
在共享或循環模式下,多個使用者可以附加到同一訂閱。消息在消費者之間以循環分發的方式傳遞,任何給定的消息只傳遞給一個消費者。當消費者斷開連接時,發送給它但未確認的所有消息將重新安排發送給其余消費者。
在下圖中,Consumer-C-1和Consumer-C-2可以訂閱該主題,但Consumer-C-3和其他人也可以訂閱。
Demo
package com.project.pulsar.delayMsgAndSubscriptionsModel;
import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 共享模式
*/
@RestController
public class ShareMessageController {
@Autowired
PulsarConf pulsarConf;
String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java
/**
* 生產消息
*
* @param msg
* @throws PulsarClientException
*/
@GetMapping("/share/sendMsg")
public MessageId sendMsg(String msg) throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Producer<byte[]> producer1 = pulsarFactory.newProducer()
.topic(topic)
.create();
return producer1.send(msg.getBytes());
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/share/comsumer")
public void comsumerByArtificial() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Shared)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/share/comsumer2")
public void comsumerByArtificial2() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Shared)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
}
代碼下載
代碼見此delayMsgAndSubscriptionsModel包下ShareMessageController.java
按照Key共享Key_Shared
在Key_Shared模式下,多個消費者可以附加到同一個訂閱。消息在消費者之間分布傳遞,具有相同密鑰或相同訂購密鑰的消息只傳遞給一個消費者。無論消息被重新傳遞多少次,它都會被傳遞給同一個消費者。當一個消費者連接或斷開連接時,將導致所服務的消費者對消息的某些鍵進行改變。
當您使用Key_Shared模式時,Key_Shared模式的局限性,應注意:您需要指定鍵或orderingKey的消息。你不能使用與Key_Shared模式的累積確認。你的生產者應禁用批處理或使用一個基於批處理生成器。
Demo
package com.project.pulsar.delayMsgAndSubscriptionsModel;
import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 共享模式
*/
@RestController
public class KeyShareMessageController {
@Autowired
PulsarConf pulsarConf;
String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java
/**
* 生產消息
*
* @param msg
* @throws PulsarClientException
*/
@GetMapping("/keyShare/sendMsg")
public MessageId sendMsg(String msg) throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
//持久化 租戶 命名空間 主題
Producer<byte[]> producer1 = pulsarFactory.newProducer()
.topic(topic)
.create();
return producer1.send(msg.getBytes());
}
@Bean
public void comsumerByListener1() throws PulsarClientException {
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("comsumerByListener1: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscriptionByListener")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Key_Shared)
.messageListener(myMessageListener)
.subscribe();
}
@Bean
public void comsumerByListener2() throws PulsarClientException {
String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
MessageListener myMessageListener = (consumer, msg) -> {
try {
System.out.println("comsumerByListener2: " + new String(msg.getData()));
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg);
}
};
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscriptionByListener")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Key_Shared)
.messageListener(myMessageListener)
.subscribe();
}
}
代碼下載
代碼見此delayMsgAndSubscriptionsModel包下KeyShareMessageController.java
延時消息
僅對Share和KeyShare模式有效,其他模式均為立即發送
延時消息有兩種模式:
-
指定時長后發送
deliverAfter(3L, TimeUnit.MINUTES)
-
指定時間發送
deliverAt(LocalDateTime.of(2021, 9, 20, 16, 20).toInstant(ZoneOffset.of("+8")).toEpochMilli() )
Demo
package com.project.pulsar.delayMsgAndSubscriptionsModel;
import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
/**
* 延時消息
*/
@RestController
public class DelayMessageController {
@Autowired
PulsarConf pulsarConf;
String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java
/**
* 生產消息
*
* @param msg
* @throws PulsarClientException
*/
@GetMapping("/delay/sendMsg")
public void sendMsg(String msg) throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-delay-topic";
//持久化 租戶 命名空間 主題
Producer<byte[]> producer = pulsarFactory.newProducer()
.topic(topic)
.create();
producer.newMessage().deliverAfter(2L, TimeUnit.MINUTES).key("key-1").value("message-1-1".getBytes()).send();
// // MessageId send = producer1.newMessage().deliverAfter(3L, TimeUnit.MINUTES).value("Hello Pulsar!".getBytes()).send();//指定消息在X時分秒后執行
// MessageId send =producer1.newMessage().deliverAt(
// LocalDateTime.of(2021, 9, 20, 16, 20).toInstant(ZoneOffset.of("+8")).toEpochMilli()
// ).value(msg.getBytes()).send();//指定消息在2021/09/20 15:52分發送,如發送時日期已過,會立刻發送
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/delay/comsumer")
public void comsumerByArtificial() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-delay-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Key_Shared)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
/**
* 手動執行獲取消息
*
* @throws PulsarClientException
*/
@GetMapping("/delay/comsumer2")
public void comsumerByArtificial2() throws PulsarClientException {
PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
String topic = "persistent://" + tenantName + "/" + namespace + "/my-delay-topic";
//持久化 租戶 命名空間 主題
Consumer<byte[]> consumer = pulsarFactory.newConsumer()
.topic(topic)
.subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
.subscriptionType(SubscriptionType.Key_Shared)//指定模式
.subscribe();
Message<byte[]> receive = consumer.receive();
System.out.println(new String(receive.getData()));
consumer.acknowledge(receive);//確認消息被消費
consumer.close();
}
}
代碼下載
代碼見此delayMsgAndSubscriptionsModel包下DelayMessageController.java
Pulsar還有很多特性,寫不過來了,以后用到再總結,有問題就官網、Github、谷歌、百度