ActiveMQ基礎教程(三):C#連接使用ActiveMQ消息隊列


  接上一篇:ActiveMQ基礎教程(二):安裝與配置(單機與集群)

  安裝部署好集群環境:192.168.209.133:61616,192.168.209.134:61616,192.168.209.135:61616

  因為ActiveMQ的集群模式是一種master-slave模式,master節點對外提供服務,slave節點只做數據同步備份,當master節點掛了,slave就會成為master從而繼續對外提供服務,以此實現高可用。

  下面介紹C#連接使用ActiveMQ

 

  前言

  C#連接ActiveMQ一般使用Apache.NMS.ActiveMQ工具包,這個工具包可以在Nuget上獲取:Apache.NMS.ActiveMQ

  

  吐槽:當前Apache.NMS.ActiveMQ最新版本是1.8.0,更新時間是2021-1-4,算是剛更新不久,但是上一個版本1.7.2的更新時間是2016-12-15,五六年不更新,到底是放棄治療了還是ActiveMQ太優秀了呢?

  當然Apache.NMS.ActiveMQ也是開源的,不過它依賴於NMS規范,所以想看源碼的朋友最好是看這兩個開源項目:

  Apache.NMS.ActiveMQ(1.8.0):https://archive.apache.org/dist/activemq/apache-nms-activemq/1.8.0/

  Apache.NMS(1.8.0):https://archive.apache.org/dist/activemq/apache-nms-api/1.8.0/  

  Apache.NMS.ActiveMQ一般的使用步驟是:

  1、通過連接信息(連接字符串)來創建創建NMSConnectionFacotry或者特定環境下的ConnectionFacotry

  2、使用ConnectionFacotry創建Connection,並啟動Connection(類似於ADO.net中的DbConnection)

  3、使用Connection開啟一個Session,開啟Session時可以指定一個默認的通道Destination(對於ActiveMQ而言,Destination可以認為是Queue或者Topic)

  4、如果是發送消息,使用Session創建Producer,接着使用Session創建Message,也可以自行實例化創建,再使用Producer發送Message,發送時可以指定通道Destination

  5、如果是消費消息,使用Session在指定的通道Destination創建Consumer(可創建持久化的Consumer),消費可以是阻塞消費(同步消費),也可以是異步消費

  6、使用完畢之后,關閉釋放Connection,Session等資源(如果是異步消費,則不需要釋放)

 

  連接(IConnection)與會話(ISeesion)

  其實NMS是ActiveMQ官方制定的.net連接使用ActiveMQ的規范,java也有個JMS。

  接下來說的就是NMS在ActiveMQ方面的使用了。

  創建連接會話的個一個簡單的例子:  

    string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";
    IConnectionFactory factory = new NMSConnectionFactory(brokerUri);
    IConnection connection = factory.CreateConnection();
    connection.Start();
    ISession session = connection.CreateSession();

  1、brokerUri是NSM的連接信息,作用類似於DbConnection的連接字符串,不過,對於ActiveMQ而言,單機和集群環境下它們是不一樣的:

  如果是單機模式下,brokerUri格式如下:  

    // activemq:tcp://host:port?name1=value1&name2=value2
    string brokerUri = "activemq:tcp://192.168.209.133:61616?alwaysSessionAsync=true";

  其中?alwaysSessionAsync=true部分是QueryString格式的連接配置參數,alwaysSessionAsync表示是否采用異步會話,更多配置參考官網:http://activemq.apache.org/connection-configuration-uri

  如果是集群模式,brokerUri格式如下:  

    // activemq:failover:(tcp://host1:port1,tcp://host2:port2,tcp://host2:port2)?name1=value1&name2=value2
    string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";

  啟動?randomize=false也是QueryString格式的連接配置,randomize表示是否隨機取一個uri去連接,更多配置參考官網:http://activemq.apache.org/failover-transport-reference.html

  2IConnectionFactory是工廠接口,而NMSConnectionFactory是NMS中其他工廠的代理工廠,從源碼中可以看到目前代理的工廠有:  

    static NMSConnectionFactory()
    {
        schemaProviderFactoryMap = new Dictionary<string, ProviderFactoryInfo>();
        schemaProviderFactoryMap["activemq"] = new ProviderFactoryInfo("Apache.NMS.ActiveMQ", "Apache.NMS.ActiveMQ.ConnectionFactory");
        schemaProviderFactoryMap["activemqnettx"] = new ProviderFactoryInfo("Apache.NMS.ActiveMQ", "Apache.NMS.ActiveMQ.NetTxConnectionFactory");
        schemaProviderFactoryMap["tcp"] = new ProviderFactoryInfo("Apache.NMS.ActiveMQ", "Apache.NMS.ActiveMQ.ConnectionFactory");
        schemaProviderFactoryMap["ems"] = new ProviderFactoryInfo("Apache.NMS.EMS", "Apache.NMS.EMS.ConnectionFactory");
        schemaProviderFactoryMap["mqtt"] = new ProviderFactoryInfo("Apache.NMS.MQTT", "Apache.NMS.MQTT.ConnectionFactory");
        schemaProviderFactoryMap["msmq"] = new ProviderFactoryInfo("Apache.NMS.MSMQ", "Apache.NMS.MSMQ.ConnectionFactory");
        schemaProviderFactoryMap["stomp"] = new ProviderFactoryInfo("Apache.NMS.Stomp", "Apache.NMS.Stomp.ConnectionFactory");
        schemaProviderFactoryMap["xms"] = new ProviderFactoryInfo("Apache.NMS.XMS", "Apache.NMS.XMS.ConnectionFactory");
        schemaProviderFactoryMap["zmq"] = new ProviderFactoryInfo("Apache.NMS.ZMQ", "Apache.NMS.ZMQ.ConnectionFactory");
        schemaProviderFactoryMap["amqp"] = new ProviderFactoryInfo("Apache.NMS.AMQP", "Apache.NMS.AMQP.ConnectionFactory");
    }

   如果是ActiveMQ,下面兩種方式是等價的,因為NMSConnectionFactory只是其它工廠的代理:  

    IConnectionFactory factory = new Apache.NMS.NMSConnectionFactory(brokerUri);
    IConnectionFactory factory = new Apache.NMS.ActiveMQ.ConnectionFactory(brokerUri);

  3、使用工廠的CreateConnection方法可以創建實現IConnection接口的連接,其實內部實現也是通過被代理的工廠去創建連接,CreateConnection有個重載,可以傳入用戶名和密碼,也就是認證信息  

    IConnection CreateConnection();
    IConnection CreateConnection(string userName, string password);

  注意,這里的用戶名和密碼可不是管理后台的用戶和密碼,它是以插件的形式在acticvemq.xml文件中的broker節點內配置的,如:  

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
  ....
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="test" password="123456" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins> </broker>

   4、IConnection的Start方法用於開啟連接

  5、創建會話使用IConnection的CreateSession方法,同樣它也有一個重載,表示會話模式AcknowledgementMode,可以分為有事務和非事務兩種

    ISession CreateSession();
    ISession CreateSession(AcknowledgementMode acknowledgementMode);
    # 非事務模式,主要用在簽收方面,表示消費時是否有收到消息並處理完消息,然后需要從隊列中移除
   AutoAcknowledge:消息自動簽收,這也是默認的方式,只要消費者收到消息就將消息從隊列移除,不管消費者有沒有正確處理消息 DupsOkAcknowledge:可重復簽收模式,表示Session不用確定消息的簽收,消息可能會重復發給消費者(如果是重復發送的消息,那么消息的NMSRedelivered屬性將被置為true),這樣做可以降低某些開銷,如果消費者能容忍重復消息就可以使用這種模式 ClientAcknowledge:客戶端簽收模式,表示需要客戶端確認簽收,確認簽收是調用某個消息的Acknowledge方法,需要注意的是,在這種模式下,簽收一個消息會導致自動簽收Session下所有的已消費消息。
   IndividualAcknowledge:單一簽收模式,可ClientAcknowledge模式一樣需要調用消息的Acknowledge方法簽收確認,但是不同的時它只會簽收當前消息,避諱導致Session中其它消費被簽收
  
   # 事務模式,也就是一起提交,不然就回滾,這種模式往往發生在生產者發送多條消息時 Transactional:事務機制,在事務中,要么調用Session的Commit方法一起提交,要么調用Rollback方法回滾,如果在事務中未Commit就調用Close關閉會話,將自動觸發Rollback

   6、IConnection和ISession都有三個事件:  

    # IConnection事件
    ExceptionListener:在發生異常是觸發事件,比如服務器掛了導致連接斷開
    ConnectionInterruptedListener:連接斷開是觸發事件,主要是在集群模式中,master節點掛了導致
    ConnectionResumedListener:重新建立連接時觸發事件,主要是在集群模式中,需要重新連接到master節點
    
    # ISession事件
    TransactionCommittedListener:事務提交后觸發事件
    TransactionRolledBackListener:事務回滾后觸發事件
    TransactionStartedListener:事務開始時觸發事件

  7、Session的常用方法:  

    //創建消息
    CreateBytesMessage();
    CreateMapMessage();
    CreateMessage();
    CreateObjectMessage();
    CreateStreamMessage();
    CreateTextMessage();
    
    //消費者與生產者
    CreateProducer();//創建生產者
    CreateConsumer();//創建消費者
    CreateDurableConsumer();//創建持久化訂閱者,只對Topic有效
    DeleteDurableConsumer();//刪除持久化訂閱者

    //Queue和Topic
    GetQueue();//獲取指定名稱的Queue,如果不存在,將自動創建
    GetTopic();//獲取指定名稱的Topic,如果不存在,將自動創建
    CreateTemporaryQueue();//創建臨時隊列,隊列屬於當前IConnection,只有當前IConnection下才能訪問,如果IConnection關閉斷開,隊列將自動刪除
    CreateTemporaryTopic();//創建臨時的Topic,Topic屬於當前IConnection,只有當前IConnection下才能訪問,如果IConnection關閉斷開,Topic將自動刪除
    DeleteDestination();//刪除隊列或者Topic
    
    //其他
    Close()://關閉當前會話
    Commit();//提交會話,在事務模式中使用
    Rollback();//會話回滾,在事務模式中使用
    Recover();//恢復會話,這個方法會導致消息重新發送,慎用
    CreateBrowser();//創建一個隊列的瀏覽機制,主要用於查看某個隊列中的數據,它的存在意義主要在於,一般的情況下,要查看隊列中的消息,需要彈出消息才行,既然彈出來了,那消息又怎么消費?而IQueueBrowser允許我們在不彈出消息的情況下獲取隊列中的消息信息        

  

  消息

  ActiveMQ中,消息也可以通過Session來創建,也可以自己實例化,而ActiveMQ的消息主要有以下幾種:  

    IBytesMessage:接受字節數據未消息
    IMapMessage:接受字典(鍵值對)消息
    IObjectMessage:接受Object對象消息
    IStreamMessage:接受流數據消息,同IBytesMessage差不多
    ITextMessage:接受字符文本消息,這也是用的最多的一個

  這些消息其實都實現了IMessage接口,除了它們各自的幾個私有屬性和方法外(比如ITextMessage有Text屬性,IObjectMessage有Body屬性,IBytesMessage和IStreamMessage有很多的Read和Write方法),常用的公共屬性還有這么幾個:  

    Properties:消息屬性,生產者往Properties中保存的數據將會原樣的給消費者,不過消息屬性的作用主要是在Selector消息選擇上,后文再說
    NMSDestination:消息所在通道,對ActiveMQ來說是Queue或者Topic
    NMSTimeToLive:消息的有效時間
    NMSMessageId:消息的唯一ID
    NMSDeliveryMode:消息傳遞方式(是否持久化)
    NMSPriority:優先級
    NMSRedelivered:是否是重新發送的消息,可能是事務回滾或者使用Session的Recover方法重發,亦可能是DupsOkAcknowledge模式下的重發,總之,只要消息不是第一次發給消費者,那么這個屬性就是true
    
    Acknowledge():確認簽收方法

 

  生產消息

   先看例子

    ISession session = connection.CreateSession();
    var producer = session.CreateProducer();
    producer.DeliveryMode = MsgDeliveryMode.Persistent;//持久化消息
    var destination = session.GetQueue("queue");
   //var destination = session.GetTopic("topic");//發送到Topic
var message = session.CreateTextMessage("hello activemq"); producer.Send(destination, message);

  1、通過會話ISession的CreateProducer方法創建一個生產者,CreateProducer方法有個重載,可接受一個默認的通道Destination。

    IMessageProducer CreateProducer();
    IMessageProducer CreateProducer(IDestination destination);

  2、對ActiveMQ來說,Destination是Queue或者Topic,而獲取Destination一般有兩種方法  

    // 方法一(推薦)、通過Session.GetQueue()或者Session.GetTopic()方法來獲取
    var destination = session.GetQueue("queue");
    var destination = session.GetTopic("topic");

    // 方法二:直接實例化
    var destination = new ActiveMQQueue("queue");
    var destination = new ActiveMQTopic("topic");

   3、發送消息需要使用IMessageProducer的Send方法,它有幾個重載,無通道Destination的Send方法需要在創建Producer時指定一個Destination,而Send方法接受的幾個參數主要如下:  

    void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive);
    void Send(IDestination destination, IMessage message);
    void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive);//在CreateProducer方法創建IMessageProducer時需要指定Destination參數
    void Send(IMessage message);//在CreateProducer方法創建IMessageProducer時需要指定Destination參數
    destination:通道,queue或者topic
    message:消息,用的多的就是ITextMessage
    deliveryMode:消息傳遞方式:持久化(Persistent)和非持久化(NonPersistent),默認是Persistent,持久化消息保證了消息的可靠性,保證消息發送之后至少被消費一次,而非持久化不保證消息不丟失,但是它的開銷更小
    priority:優先級,默認是Normal
    timeToLive:消息的有效期,如果有效期內還未被消費,消息會被發送到死信隊列

  其中deliveryMode、priority、timeToLive可以在Producer中設置默認值,比如上面的例子,deliveryMode設置默認值Persistent。

  4、生產者IMessageProducer有一個委托:ProducerTransformer,這個委托主要用作一個轉換,或者說是在調用Send方法后,真正發給ActiveMQ之前做的一層過濾機制,通過這個委托,我們可以做往消息屬性添加一些數據,或者統一消息格式等等操作,如:  

    //每個發往ActiveMQ的消息都會加上一個名稱為name的屬性
    producer.ProducerTransformer = new ProducerTransformerDelegate((s, p, m) =>
    {
        m.Properties["name"] = "value";
        return m;
    });

  

  消費消息

  NMS對ActiveMQ的消息消費有兩種:阻塞消費(同步消費),異步消費

  阻塞消費需要我們自己寫循環來實現:  

    ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自動簽收
    var destination = session.GetQueue("queue");
    var consumer = session.CreateConsumer(destination);
    while (true)
    {
        var message = consumer.Receive();//阻塞式消費
        Console.WriteLine("接收到消息:" + message?.ToString());
        //如果是ClientAcknowledge或者IndividualAcknowledge,需要調用Acknowledge方法進行簽收確認
        //message?.Acknowledge();
    }

  異步消費:  

    ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自動簽收
    var destination = session.GetQueue("queue");
    var consumer = session.CreateConsumer(destination);
    consumer.Listener += message =>
    {
        Console.WriteLine("接收到消息:" + message?.ToString());
        //如果是ClientAcknowledge或者IndividualAcknowledge,需要調用Acknowledge方法進行簽收確認
        //message?.Acknowledge();
    };

  1、非持久化消費者使用Session的CreateConsumer方法創建,創建需要指明Destination,表明是從哪個Queue或者Topic去消費,它包含三個參數: 

    IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal);
    IMessageConsumer CreateConsumer(IDestination destination, string selector);
    IMessageConsumer CreateConsumer(IDestination destination);
    destination:通道,Queue或者Topic
    selector:消息選擇器,默認值是null(不是空字符串),它是一個字符串格式,基於SQL92表達式語法,格式上,你可以認為是SQL語句的where部分(不包含子查詢),具體使用后文再介紹
    noLocal:默認false,當為true時表示創建的消費者不消費當前同一連接創建的生產者發送的消息,只能消費其他連接上的生產者發送的消息(這個配置只對Topic生效,對Queue不生效)

  注意,CreateConsumer方法創建的消費者是非持久化的,也就是說,如果消費者斷線期間發送的消息將會丟失。

  創建持久化訂閱者需要使用CreateDurableConsumer方法,持久化訂閱者只能是針對Topic,創建持久化訂閱者需要指定一個ClientId,當斷線后,再次連接就是通過這個ClientId來比較的,另外還可以指定一個name表示持久化訂閱者的名稱  

    IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal);

   看看個列子

    string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";
    IConnectionFactory factory = new Apache.NMS.NMSConnectionFactory(brokerUri); 
    IConnection connection = factory.CreateConnection();
    connection.ClientId = "myClientId";
    connection.Start();
    ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自動簽收
    var destination = session.GetTopic("topic");
    var consumer = session.CreateDurableConsumer(destination, "Subscription Name", null, false);
    consumer.Listener += message =>
    {
        Console.WriteLine("接收到消息:" + message?.ToString());
        //如果是ClientAcknowledge或者IndividualAcknowledge,需要調用Acknowledge方法進行簽收確認
        //message?.Acknowledge();
    };

   上面的代碼,哪怕連接斷開期間有接收到新的消息,當再次連接是,也能消費得到,因為CreateDurableConsumer方法在ActiveMQ中注冊了一個指定ClientId的持久化訂閱者,ActiveMQ會給這個指定ClientId的持久化訂閱者保存它斷線期間接收到的消息,當下次這個ClientId的訂閱者重新連接時,ActiveMQ會將斷線期間接收到的消息發給訂閱者。

  2、消息消費主要有下面四個方法:  

    # 同步消費
   Receive():阻塞當前線程,直到有接收到消息時才繼續往下執行 Receive(TimeSpan timeout):阻塞當前線程,直到有接收到消息時才繼續往下執行,或者如果指定時間內未收到消息,方法將會返回null ReceiveNoWait():不會阻塞當前線程,如果有消息則返回消息,沒有消息則返回null
  
   # 異步消費
   Listener事件

  3、同樣的,IMessageConsumer也有一個委托:ConsumerTransformer,它在消費者接受到消息后,在傳給消費程序前執行,可以看做一層過濾機制。

 

  消息選擇器(Selector)

  前面好幾個地方提到消息選擇器,現在說說它。

  消息選擇器采用ActiveMQ提供了SQL92表達式語法,格式上我們可以認為就是SQL語句中不帶子查詢條件的where部分語句,或者說通俗點,可以認為消息選擇器就是通過對消息屬性的刷選過濾來實現對消息的刷選過濾

  在語法上,消息選擇器常用的數據類型有:  

   數值類型:只能由數字開頭,可以包含小數點表示浮點數,不要用引號包着,比如:1、2.34、5.6789,常用運算符有:>、<、=、>=、<=、BETWEEN、+、-、*、/、%
   字符類型:用單引號包着的字符串,比如:'a'、'hello',常用的運算符有:=、<>、IN,注意不等於是<>不是!=
   NULL:表示空,常用的運算:IS NULL 和 IS NOT NULL
   布爾類型:TRUE、FALSE,常用的邏輯運算:AND、OR、NOT

  比如:  

    ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自動簽收
    var destination = session.GetTopic("topic");
    var consumer = session.CreateConsumer(destination, "name='Apple' or price>5000");
    consumer.Listener += message =>
    {
        if (message is ITextMessage textMessage)
        {
            Console.WriteLine($"從{destination.TopicName}接收到過濾后的消息:{textMessage.Text}");
        }
        //如果是ClientAcknowledge或者IndividualAcknowledge,需要調用Acknowledge方法進行簽收確認
        //message?.Acknowledge();
    };

  發送消息:

    ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
    var producer = session.CreateProducer();
    producer.DeliveryMode = MsgDeliveryMode.Persistent;//持久化消息
    IDestination destination = session.GetTopic("topic");

    //message1可以被消費到
    var message1 = session.CreateTextMessage("message1");
    message1.Properties["name"] = "Apple";
    producer.Send(destination, message1);

    //message2可以被消費到
    var message2 = session.CreateTextMessage("message2");
    message2.Properties["price"] = 6000d;
    producer.Send(destination, message2);

    //message3不能被消費
    var message3 = session.CreateTextMessage("message3");
    producer.Send(destination, message3);

  

  日志輸出

  在開發過程中,有時由於一個小錯誤,導致某些預想不到的結果,這時我們可能需要打印一下日志查看一下問題(你總不希望去看源碼流程吧),比如我碰到的,因為發送的消息屬性不合理(貌似屬性不支持decimal,可以理解,畢竟它不是基礎屬性),導致ActiveMQ集群中master出問題,然后在Producer.Send()時導致線程阻塞卡住,但是它卻沒有拋出異常!這種問題是很難找,還好ActiveMQ提供了日志輸出的功能,允許我們實現 Apache.NMS.ITrace 來實現日志輸出,比如我們簡單的實現:  

  
    public class MyActiveTrace : Apache.NMS.ITrace
    {

        public bool IsDebugEnabled => true;
        public bool IsInfoEnabled => true;
        public bool IsWarnEnabled => true;
        public bool IsErrorEnabled => true;
        public bool IsFatalEnabled => true;

        public void Debug(string message)
        {
            if (IsDebugEnabled)
            {
                Console.WriteLine($"Debug:{message}");
            }
        }

        public void Error(string message)
        {
            if (IsErrorEnabled)
            {
                Console.WriteLine($"Error:{message}");
            }
        }

        public void Fatal(string message)
        {
            if (IsFatalEnabled)
            {
                Console.WriteLine($"Fatal:{message}");
            }
        }

        public void Info(string message)
        {
            if (IsInfoEnabled)
            {
                Console.WriteLine($"Info:{message}");
            }
        }

        public void Warn(string message)
        {
            if (IsWarnEnabled)
            {
                Console.WriteLine($"Warn:{message}");
            }
        }
    }
MyActiveTrace

  在使用是,只需要給 Tracer.Trace 的靜態屬性賦值就可以了:  

    Tracer.Trace = new MyActiveTrace();

 

  ActiveMQ發送和接收消息的一個完成的Demo例子

  例子:  

    static void Main(string[] args)
    {
        Tracer.Trace = new MyActiveTrace();

        //string brokerUri = "activemq:tcp://192.168.209.133:61616?alwaysSessionAsync=true";
        string brokerUri = "activemq:failover:(tcp://192.168.209.133:61616,tcp://192.168.209.134:61616,tcp://192.168.209.135:61616)?randomize=false";
        IConnectionFactory factory = new Apache.NMS.NMSConnectionFactory(brokerUri);
        IConnection connection = factory.CreateConnection();
        connection.Start();

        //從隊列消費
        {
            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自動簽收
            var destination = session.GetQueue("queue");
            var consumer = session.CreateConsumer(destination);
            consumer.Listener += message =>
            {
                if (message is ITextMessage textMessage)
                {
                    Console.WriteLine($"從{destination.QueueName}接收到消息:{textMessage.Text}");
                }
                //如果是ClientAcknowledge或者IndividualAcknowledge,需要調用Acknowledge方法進行簽收確認
                //message?.Acknowledge();
            };
        }
        //從Topic消費
        {
            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自動簽收
            var destination = session.GetTopic("topic");
            var consumer = session.CreateConsumer(destination);
            consumer.Listener += message =>
            {
                if (message is ITextMessage textMessage)
                {
                    Console.WriteLine($"從{destination.TopicName}接收到消息:{textMessage.Text}");
                }
                //如果是ClientAcknowledge或者IndividualAcknowledge,需要調用Acknowledge方法進行簽收確認
                //message?.Acknowledge();
            };
        }
        //使用Selector從Topic過濾
        {
            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);//自動簽收
            var destination = session.GetTopic("topic");
            var consumer = session.CreateConsumer(destination, "name='Apple' or price>5000 ");
            consumer.Listener += message =>
            {
                if (message is ITextMessage textMessage)
                {
                    Console.WriteLine($"從{destination.TopicName}接收到過濾后的消息:{textMessage.Text}");
                }
                //如果是ClientAcknowledge或者IndividualAcknowledge,需要調用Acknowledge方法進行簽收確認
                //message?.Acknowledge();
            };
        }
        //發送消息
        {
            ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
            var producer = session.CreateProducer();
            producer.DeliveryMode = MsgDeliveryMode.Persistent;//持久化消息

            while (true)
            {
                Console.Write("請輸入消息(輸入空表示退出):");
                var line = Console.ReadLine();
                if (string.IsNullOrEmpty(line)) break;
                var message = session.CreateTextMessage(line);

                IDestination destination;
                var value = new Random().Next();
                if (value % 2 == 0)
                {
                    destination = session.GetQueue("queue");
                    Console.WriteLine($"往隊列queue中發送消息:{line}");
                }
                else
                {
                    destination = session.GetTopic("topic");
                    message.Properties["name"] = new string[] { "小米", "華為", "Apple" }.ElementAt(new Random().Next(0, 3));
                    message.Properties["price"] = new double[] { 3000d, 4000d, 5000d, 6000d }.ElementAt(new Random().Next(0, 3));
                    Console.WriteLine($"往topic中發送消息:{line}");
                    Console.WriteLine($"消息的屬性:name={message.Properties["name"]}     price={message.Properties["price"]}");
                }

                producer.Send(destination, message);
            }
        }

    
//關閉連接前記得注意消費者是不是異步消費      connection.Close();
Console.ReadKey(); }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM