5.四種訂閱模式、延時消息


四種訂閱模式

獨占Exclusive

在獨占模式下,只允許一個消費者附加到訂閱上。如果多個消費者使用同一個訂閱來訂閱一個主題,就會發生錯誤。在下圖中,只有消費者A-0被允許消費消息。

此模式也是默認模式

Exclusive subscriptions

Demo

subscriptionName相同,同時執行exclusive/comsumer1和exclusive/comsumer2就會報錯

package com.project.pulsar.delayMsgAndSubscriptionsModel;

import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashSet;
import java.util.Set;

/**
 * 獨占模式
 */
@RestController
public class ExclusiveMessageController {
    @Autowired
    PulsarConf pulsarConf;

    String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
    String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java

    /**
     * 生產消息
     *
     * @param msg
     * @throws PulsarClientException
     */
    @GetMapping("/exclusive/sendMsg")
    public MessageId sendMsg(String msg) throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Producer<byte[]> producer1 = pulsarFactory.newProducer()
                .topic(topic)
                .create();

        return producer1.send(msg.getBytes());
    }


    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/exclusive/comsumer")
    public void comsumerByArtificial() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Exclusive)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }

    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/exclusive/comsumer2")
    public void comsumerByArtificial2() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Exclusive)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }
}

代碼下載

代碼見此delayMsgAndSubscriptionsModel包下ExclusiveMessageController.java

災備Failover

在故障轉移模式中,多個消費者可以附加到同一個訂閱。為非分區主題或分區主題的每個分區挑選一個主消費者並接收消息。當主消費者斷開連接時,所有(未確認的和后續的)消息都被傳遞給排在后面的消費者。對於分區主題,經紀人將按優先級和消費者名稱的詞匯順序對消費者進行排序。對於非分區主題,經紀人將按照消費者訂閱非分區主題的順序挑選消費者。在下圖中,消費者B-0是主消費者,而如果消費者B-0斷開連接,消費者B-1將是排在后面的消費者接收消息。

Failover subscriptions

Demo

package com.project.pulsar.delayMsgAndSubscriptionsModel;

import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 災備模式
 */
@RestController
public class FailoverMessageController {
    @Autowired
    PulsarConf pulsarConf;

    String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
    String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java

    /**
     * 生產消息
     *
     * @param msg
     * @throws PulsarClientException
     */
    @GetMapping("/failover/sendMsg")
    public MessageId sendMsg(String msg) throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Producer<byte[]> producer1 = pulsarFactory.newProducer()
                .topic(topic)
                .create();

        return producer1.send(msg.getBytes());
    }


    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/failover/comsumer")
    public void comsumerByArtificial() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Failover)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }

    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/failover/comsumer2")
    public void comsumerByArtificial2() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Failover)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }
}

代碼下載

代碼見此delayMsgAndSubscriptionsModel包下FailoverMessageController.java

共享Shared

在共享或循環模式下,多個使用者可以附加到同一訂閱。消息在消費者之間以循環分發的方式傳遞,任何給定的消息只傳遞給一個消費者。當消費者斷開連接時,發送給它但未確認的所有消息將重新安排發送給其余消費者。

在下圖中,Consumer-C-1和Consumer-C-2可以訂閱該主題,但Consumer-C-3和其他人也可以訂閱。

Shared subscriptions

Demo

package com.project.pulsar.delayMsgAndSubscriptionsModel;

import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 共享模式
 */
@RestController
public class ShareMessageController {
    @Autowired
    PulsarConf pulsarConf;

    String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
    String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java

    /**
     * 生產消息
     *
     * @param msg
     * @throws PulsarClientException
     */
    @GetMapping("/share/sendMsg")
    public MessageId sendMsg(String msg) throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Producer<byte[]> producer1 = pulsarFactory.newProducer()
                .topic(topic)
                .create();

        return producer1.send(msg.getBytes());
    }


    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/share/comsumer")
    public void comsumerByArtificial() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Shared)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }

    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/share/comsumer2")
    public void comsumerByArtificial2() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Shared)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }
}

代碼下載

代碼見此delayMsgAndSubscriptionsModel包下ShareMessageController.java

按照Key共享Key_Shared

在Key_Shared模式下,多個消費者可以附加到同一個訂閱。消息在消費者之間分布傳遞,具有相同密鑰或相同訂購密鑰的消息只傳遞給一個消費者。無論消息被重新傳遞多少次,它都會被傳遞給同一個消費者。當一個消費者連接或斷開連接時,將導致所服務的消費者對消息的某些鍵進行改變。

當您使用Key_Shared模式時,Key_Shared模式的局限性,應注意:您需要指定鍵或orderingKey的消息。你不能使用與Key_Shared模式的累積確認。你的生產者應禁用批處理或使用一個基於批處理生成器。

Key_Shared subscriptions

Demo

package com.project.pulsar.delayMsgAndSubscriptionsModel;

import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 共享模式
 */
@RestController
public class KeyShareMessageController {
    @Autowired
    PulsarConf pulsarConf;

    String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
    String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java

    /**
     * 生產消息
     *
     * @param msg
     * @throws PulsarClientException
     */
    @GetMapping("/keyShare/sendMsg")
    public MessageId sendMsg(String msg) throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        //持久化         租戶   命名空間   主題
        Producer<byte[]> producer1 = pulsarFactory.newProducer()
                .topic(topic)
                .create();

        return producer1.send(msg.getBytes());
    }



    @Bean
    public void comsumerByListener1() throws PulsarClientException {
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        MessageListener myMessageListener = (consumer, msg) -> {
            try {
                System.out.println("comsumerByListener1: " + new String(msg.getData()));
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        };
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriptionByListener")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Key_Shared)
                .messageListener(myMessageListener)
                .subscribe();
    }

    @Bean
    public void comsumerByListener2() throws PulsarClientException {
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-topic";
        MessageListener myMessageListener = (consumer, msg) -> {
            try {
                System.out.println("comsumerByListener2: " + new String(msg.getData()));
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        };
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscriptionByListener")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Key_Shared)
                .messageListener(myMessageListener)
                .subscribe();
    }
}

代碼下載

代碼見此delayMsgAndSubscriptionsModel包下KeyShareMessageController.java

延時消息

僅對Share和KeyShare模式有效,其他模式均為立即發送

延時消息有兩種模式:

  1. 指定時長后發送

    deliverAfter(3L, TimeUnit.MINUTES)

  2. 指定時間發送

    deliverAt(LocalDateTime.of(2021, 9, 20, 16, 20).toInstant(ZoneOffset.of("+8")).toEpochMilli() )

Demo

package com.project.pulsar.delayMsgAndSubscriptionsModel;

import com.project.pulsar.conf.PulsarConf;
import org.apache.pulsar.client.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;

/**
 * 延時消息
 */
@RestController
public class DelayMessageController {
    @Autowired
    PulsarConf pulsarConf;

    String tenantName = "delay";//調用接口創建的租戶,創建方法見com.project.pulsar.persistent.PersistentController.java
    String namespace = "mySpace";//調用接口創建的命名空間,創建方法見com.project.pulsar.persistent.PersistentController.java

    /**
     * 生產消息
     *
     * @param msg
     * @throws PulsarClientException
     */
    @GetMapping("/delay/sendMsg")
    public void sendMsg(String msg) throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-delay-topic";
        //持久化         租戶   命名空間   主題
        Producer<byte[]> producer = pulsarFactory.newProducer()
                .topic(topic)
                .create();
        producer.newMessage().deliverAfter(2L, TimeUnit.MINUTES).key("key-1").value("message-1-1".getBytes()).send();
        //        //    MessageId send = producer1.newMessage().deliverAfter(3L, TimeUnit.MINUTES).value("Hello Pulsar!".getBytes()).send();//指定消息在X時分秒后執行
//        MessageId send =producer1.newMessage().deliverAt(
//                LocalDateTime.of(2021, 9, 20, 16, 20).toInstant(ZoneOffset.of("+8")).toEpochMilli()
//        ).value(msg.getBytes()).send();//指定消息在2021/09/20 15:52分發送,如發送時日期已過,會立刻發送
    }


    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/delay/comsumer")
    public void comsumerByArtificial() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-delay-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Key_Shared)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }

    /**
     * 手動執行獲取消息
     *
     * @throws PulsarClientException
     */
    @GetMapping("/delay/comsumer2")
    public void comsumerByArtificial2() throws PulsarClientException {
        PulsarClient pulsarFactory = pulsarConf.pulsarFactory();
        String topic = "persistent://" + tenantName + "/" + namespace + "/my-delay-topic";
        //持久化         租戶   命名空間   主題
        Consumer<byte[]> consumer = pulsarFactory.newConsumer()
                .topic(topic)
                .subscriptionName("my-subscription")//必須名稱一致,才可觸發下一行,否則就是兩個不同的消費者
                .subscriptionType(SubscriptionType.Key_Shared)//指定模式
                .subscribe();
        Message<byte[]> receive = consumer.receive();
        System.out.println(new String(receive.getData()));
        consumer.acknowledge(receive);//確認消息被消費
        consumer.close();
    }
}

代碼下載

代碼見此delayMsgAndSubscriptionsModel包下DelayMessageController.java

Pulsar還有很多特性,寫不過來了,以后用到再總結,有問題就官網、Github、谷歌、百度


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM