RabbitMQ.Client API (.NET)中文文檔


主要的名稱空間,接口和類

核心API中定義接口和類 RabbitMQ.Client 名稱空間:

1
using RabbitMQ.Client;
核心API接口和類
  • IModel :表示一個AMQP 0-9-1頻道,提供了大部分 的操作(方法)協議。
  • IConnection :表示一個AMQP 0-9-1連接
  • ConnectionFactory :構造 IConnection 實例
  • IBasicConsumer:代表一個消費者消息
其他有用的接口和類包括:
  • DefaultBasicConsumer:常用的消費者基類
RabbitMQ.Client以外的公共命名空間 包括:
  • RabbitMQ.Client.Events :客戶端庫的各種事件和事件處理程序,包括 EventingBasicConsumer , 建立在消費者實現c#事件處理程序。
  • RabbitMQ.Client.Exceptions :用戶可見的異常。

所有其他的命名空間是留給庫的私有實現細節,雖然私有的命名空間的成員使用該庫以允許開發者實現他們在庫實現發現錯誤或設計錯誤的解決方法通常是提供給應用程序。應用程序可以不依賴任何類,接口,成員出現私人的命名空間內的跨庫的版本保持穩定的變量等。

連接到代理( Connecting to a Broker

要連接到RabbitMQ的,有必要實例化(例示)一個連接工廠和其配置為使用所需的主機,虛擬主機和證書(證書)。然后使用ConnectionFactory.CreateConnection()打開的連接。下面兩段代碼連接到主機名RabbitMQ的節點:
   
   
   
           
     
     
     
             
  1. ConnectionFactory factory = new ConnectionFactory(); factory.UserName = user; // "gue
  2. factory.Password = pass;
  3. factory.VirtualHost = vhost;
  4. factory.HostName = hostName;
  5. IConnection conn = factory.CreateConnection();
   
   
   
           
     
     
     
             
  1. ConnectionFactory factory = new ConnectionFactory();
  2. factory.Uri = "amqp://user:pass@hostName:port/vhost";
  3. IConnection conn = factory.CreateConnection();
由於.NET客戶端使用AMQP 0-9-1 URI規格比其他客戶的嚴格的解釋,必須小心使用URI時服用。特別是,主機部分不能被省略,並且與空名稱虛擬主機不可尋址(可尋址的)。所有出廠的屬性都有默認值。如果該屬性保持建立連接之前未分配將用於一個屬性的默認值: 用戶名     “guest” 密碼     “guest” 虛擬主機     “/” 主機名     “localhost” 端口     5672定期連接,5671連接使用TLS 然后IConnection接口可用於打開一個通道:
      
      
      
              
  1. IModel channel= conn.CreateModel();
通道 現在可以被用來發送和接收消息,如在隨后的章節中描述。
使用交換機(Exchanges)和隊列(Queues)

客戶端應用程序在交換機和隊列中工作,AMQP 0-9-1的高層次的積木工作。這些都必須先“申明”,然后才能使用它們。聲明任一類型的對象只是確保該名稱的一個存在,如果有必要創造它。繼續前面的例子,下面的代碼聲明一個交換機和隊列,然后綁定在一起。
     
     
     
             
  1. model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
  2. model.QueueDeclare(queueName, false, false, false, null);
  3. model.QueueBind(queueName, exchangeName, routingKey, null);
這將激活一下對象:
   1: “直連”(direct)類型的非持久性(non-durable),非自動刪除(non-autodelete)交換機(exchange)
   2:非持久,不自動刪除,非排他性(non-exclusive)的隊列

       交換機可通過使用額外的參數 定制 。上面的代碼將隊列綁定到指定路由的交換機。請注意,許多通道API(IModel)方法被重載。 ExchangeDeclare的便利縮寫形式使用合理的默認值。也有較長的形式與更多的參數,讓你 根據需要重載 這些默認值,充分控制在需要的地方。這種“短版,長版”的格局在整個API使用。

發送消息(Publishing Messages)

使用  IModel.BasicPublish 發送消息到交換機,  如下:
     
     
     
             
  1. byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  2. model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);
為了更好的控制,可以使用重載變量指定的強制性標志,或指定的消息屬性:
     
     
     
             
  1. byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  2. IBasicProperties props = model.CreateBasicProperties();
  3. props.ContentType = "text/plain";
  4. props.DeliveryMode = 2;
  5. model.BasicPublish(exchangeName,routingKey,props,messageBodyBytes);

以持續性的交互模式發送文本消息,有關消息屬性的詳細信息請查看IBasicProperties接口的定義。

在下面的例子中,我們發送定義了Header(頭)的消息:

1
2
3
4
5
6
7
8
9
10
byte [] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes( "Hello, world!" );
   
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain" ;
props.DeliveryMode = 2;
props.Headers = new Dictionary< string , object >();
props.Headers.Add( "latitude" ,  51.5252949);
props.Headers.Add( "longitude" , -0.0905493);
   
model.BasicPublish(exchangeName, routingKey,props, messageBodyBytes);

下面的示例代碼設置消息過期時間:

1
2
3
4
5
6
7
8
byte [] messageBodyBytes=System.Text.Encoding.UTF8.GetBytes( "Hello, world!" );
  
IBasicProperties props = model.CreateBasicProperties();
props.ContentType = "text/plain" ;
props.DeliveryMode = 2;
props.Expiration = "36000000"
  
mode.BasicPublish(exchangeName,routingKey,props,messageBodyBytes);

獲取單條消息(Fetching Individual Messages ("pull API"))

使用IModel.BasicGet獲取單條消息,從消息的Header(屬性)和消息主體可以獲取到BasicGetResult的實例

1
2
3
4
5
6
7
8
bool noAck = false ;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null ) {
     // No message available at this time.
} else {
     IBasicProperties props = result.BasicProperties;
     byte [] body = result.Body;
     ...

上面的  noAck=false 你也可以使用  IModel.BasicAck 來確認成功的接受並處理了消息。

1
2
3
4
...
     // acknowledge receipt of the message
     channel.BasicAck(result.DeliveryTag, false );
}

注意:使用該API獲取消息是效率較低。如果你想使用RabbitMQ將郵件推送到客戶端,請參閱下一節。 


通過訂閱檢索消息(Retrieving Messages By Subscription ("push API"))


接收消息的另一種方法是使用IBasicConsumer接口建立訂閱。該消息將在到達時被自動推送,而不必進行主動請求。實現消費者的一種方法是使用的方便(convenience)類EventingBasicConsumer,其中調度交付和其他消費的生命周期事件為C#事件:

1
2
3
4
5
6
7
8
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                 {
                     var body = ea.Body;
                     // ... process the message
                     ch.BasicAck(ea.DeliveryTag, false );
                 };
String consumerTag=channel.BasicConsume(queueName, false ,consumer);

另一種選擇是繼承DefaultBasicConsumer類,重寫必要的方法,或者直接實現IBasicConsumer。通常要實現核心方法IBasicConsumer.HandleBasicDeliver。更復雜的消費者將需要實施進一步的方法。特別是,HandleModelShutdown使 channel/connection關閉。消費者還可以實現HandleBasicCancelOk通知取消的消息。 在沒有被提交給原始IModel.BasicConsume情況下,DefaultBasicConsumer的ConsumerTag屬性可用於檢索服務器生成的消費者標簽。您可以使用IModel.BasicCancel主動取消消費者:

1
channel.BasicCancel(consumerTag);

當調用API方法,你總是通過消費者標簽提交到他們自己的消費者,它可以是客戶端或服務器生成的,詳見AMQP規范0-9-1文件中解釋。


消費者的並發性考慮(Concurrency Considerations for Consumers)


每個IConnection實例,在當前實現中,由單個后台線程從Socket中讀取並調度所得事件給應用程序的支持。如果啟用心跳,必須3.5.0版本,它們用.NET的定時器來實現的。通常,因此,在使用這種庫的應用程序至少需要激活兩個線程:

1:應用程序線程(the application thread

包含應用程序的邏輯,調用 IModel 的方法執行協議操作。

2:活動的 I/O 線程

通過IConnection的實例隱藏和完全管理


在任何 回調的應用程序和庫中,線程模型對應用程序是可見的。這樣的回調包括:

1、任何IBasicConsumer方法

2、在IModel的BasicReturn事件

3、任何對IConnection了各種關閉事件,IModel等。


消費者回調和訂閱(Consumer Callbacks and Ordering)


從版本3.5.0應用回調處理程序可以調用阻塞操作(如IModel.QueueDeclare或IModel.BasicCancel)。IBasicConsumer回調並發調用。然而,每個通道的操作順序將予以保留。換句話說,如果消息A和B在該順序輸送在同一通道上,它們將被以該順序進行處理。如果消息A和B分別在不同的通道輸送,它們可以以任何順序進行處理(或並行)。消費者回調在派往由.NET運行庫提供的默認的TaskScheduler任務調用。


使用自定義計划任務(Using a Custom Task Scheduler)

我們可以通過設置ConnectionFactory.TaskScheduler使用自定義的任務調度程序:

1
2
3
4
5
6
7
public class CustomTaskScheduler : TaskScheduler
{
   // ...
}
 
var cf = new ConnectionFactory();
cf.TaskScheduler = new CustomTaskScheduler();

此處的例子,可以用來限制與一個自定義的TaskScheduler並發程度。


線程之間共享通道(Sharing Channels Between Threads)

根據經驗,IModel實例不應由多個線程同時使用:應用程序代碼應該為IModel實例維護一個清晰的線程所有權概念。如果多個線程需要訪問特定的IModel實例,應用程序應該實施互斥本身。 實現這一點的一種方式是對於IModel的所有用戶鎖定實例本身

   
   
   
           
  1. IModel ch = RetrieveSomeSharedIModelInstance();
  2. lock (ch) {
  3. ch.BasicPublish(...);
  4. }
不正確序列化的 IModel操作包括但不限於,如下:

1、 在線路上發送的無效幀序列(例如,如果同時運行多於一個BasicPublish操作,則發生),和/或NotSupportedExceptions從RpcContinuationQueue類中的方法拋出,引發“禁止請求的管道”(在同時運行多個AMQP 0-9-1同步操作(如ExchangeDeclare)的情況下)。


處理不可路由的消息(Handling Unroutable Messages

如果發布的消息具有設置的“mandatory”標志,但不能傳遞,代理將返回給發送客戶端(通過basic.return AMQP 0-9-1命令)。 為了通知這樣的返回,客戶可以訂閱IModel.BasicReturn事件。 如果沒有連接到事件的偵聽器,則返回的消息將被靜默刪除。

   
   
   
           
  1. model.BasicReturn +=
  2. new RabbitMQ.Client.Events.BasicReturnEventHandler(...);
例如,如果客戶端發布了一條“強制”標志設置為未綁定到隊列的“direct”類型交換的消息,則BasicReturn事件將觸發。

斷開與RabbitMQ的連接( Disconnecting from RabbitMQ )

要斷開連接,只需關閉通道和連接:

   
   
   
           
  1. channel.Close(200, "Goodbye");
  2. conn.Close();
注意,關閉頻道被認為是良好的做法,但不是絕對必要的 - 它將在底層連接關閉時自動完成。 在某些情況下,您可能希望連接在連接上的最后一個打開通道關閉后自動關閉。 要實現這一點,請將IConnection.AutoClose屬性設置為true,但僅在創建第一個通道后:
    
    
    
            
  1. IConnection conn = factory.CreateConnection(...);
  2. IModel channel = conn.CreateModel();
  3. conn.AutoClose = true;
當AutoClose為true時,最后關閉的通道也將導致連接關閉。 如果在創建任何通道之前將其設置為true,則連接將在此時關閉。

從網絡故障自動恢復( Automatic Recovery From Network Failures )
連接恢復( Connection Recovery )

客戶端和RabbitMQ節點之間的網絡連接可能失敗。 RabbitMQ .NET / C#客戶端支持自動恢復連接和拓撲(queues, exchanges, bindings, and consumers)。 許多應用程序的自動恢復過程遵循以下步驟:

     1、重新連接 (Reconnect)

     2、還原連接偵聽器( Restore connection listeners)

     3、重新打開通道(Re-open channels)

     4、還原頻道偵聽器(Restore channel listeners

     5、恢復通道basic.qos設置,發布者確認和事務設置( Restore channel basic.qos setting, publisher confirms and transaction settings


拓撲恢復包括對每個通道執行的以下操作:

     1、重新聲明交易(除了預定義的交易)(Re-declare exchanges (except for predefined ones)

     2、重新聲明隊列(Re-declare queues)

     3、恢復所有綁定(Recover all bindings)

     4、恢復所有消費者(Recover all consumers)

要啟用自動連接恢復,請將ConnectionFactory.AutomaticRecoveryEnabled設置為true:

   
   
   
           
  1. ConnectionFactory factory = new ConnectionFactory();
  2. factory.AutomaticRecoveryEnabled = true;
  3. // connection that will recover automatically
  4. IConnection conn = factory.CreateConnection();
如果恢復由於異常(例如RabbitMQ節點仍然不可達)失敗,將在固定的時間間隔(默認為5秒)后重試。 間隔可以配置:
    
    
    
            
  1. ConnectionFactory factory = new ConnectionFactory();
  2. // attempt recovery every 10 seconds
  3. factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);

拓撲恢復

拓撲恢復涉及恢復queues, exchanges, bindings, and consumers。 默認情況下啟用它,但可以禁用:

   
   
   
           
  1. ConnectionFactory factory = new ConnectionFactory();
  2. Connection conn = factory.CreateConnection();
  3. factory.AutomaticRecoveryEnabled = true;
  4. factory.TopologyRecoveryEnabled = false;
手動確認和自動恢復
當使用手動確認時,可能與RabbitMQ節點的網絡連接在消息傳遞和確認之間失敗。 在連接恢復后,RabbitMQ將重置所有通道上的交付標簽。 這意味着使用舊的傳遞標記的basic.ack,basic.nack和basic.reject將導致通道異常。 為了避免這種情況,RabbitMQ .NET客戶端跟蹤和更新傳遞標記,使它們在恢復之間單調增長。 IModel.BasicAck,IModel.BasicNack和IModel.BasicReject然后將調整后的交付標簽轉換為RabbitMQ使用的標簽。 不會發送過期交貨標簽的確認。 使用手動確認和自動恢復的應用程序必須能夠處理重新遞送。

使用AMQP 0-9-1的常見方法( Common ways of working with AMQP 0-9-1

當使用RabbitMQ構建分布式系統時,會有一些不同的消息模式反復出現。在本節中,我們將介紹一些最常見的編碼模式和交互風格:


    點對點消息:遠程過程調用(RPC)和指向特定接收器的異步消息。

    事件廣播:一對多交互;隱含地指向一組感興趣的接收者的消息的傳輸,以及零個或多個可能的響應的收集。

    責任轉移:選擇網絡中的哪個部分負責任何給定的消息。

    消息傳輸:至少一次和最多一次消息傳遞。

    在與外部資源交互時保持原子性和冪等性。


有限庫支持也可用於處理這些模式,在RabbitMQ.Client.MessagePatterns命名空間:


    訂閱提供了從服務器接收消息的高級接口。

    SimpleRpcServer構建在Subscription上以實現RPC或單向服務。

    SimpleRpcClient構建在Subscription上,與遠程服務交互。


將來的RabbitMQ .NET客戶端庫版本將包括改進對最常見的消息傳遞模式及其變體的高級支持。


點對點消息(Point-to-point Messaging


當消息的發布者具有特定的接收應用時,例如,當通過AMQP服務器使得RPC樣式的服務可用時,或者當工作流鏈中的應用接收到消息時,發生點對點消息傳遞模式工作項,並將轉換后的工作項發送給其后繼者。

同步,客戶機 - 服務器遠程過程調用(RPC)

為了執行請求/響應RPC,


    一些解決服務的手段必須可用

    一些接收答復的方法必須可用

    將請求消息與回復消息相關聯的一些裝置必須可用


尋址服務(Addressing the service

由於AMQP消息是使用一對交換名稱和路由密鑰發布的,因此這足以用於尋址服務。使用簡單的交換名/路由 - 密鑰組合允許多種不同的方式來實現服務,同時向客戶端呈現相同的接口。例如,服務可以被實現為從隊列消耗的單個進程和在內部的負載均衡,或者其可以是從單個隊列消耗的多個進程,被遞送請求循環式,從而在沒有特殊編碼的情況下進行負載均衡服務邏輯。消息也可以尋址到服務請求隊列


    直接,使用AMQP默認交換(“”);要么

    間接地通過使用服務特定交換,其使得路由密鑰免費用於諸如方法選擇或附加服務特定尋址信息的目的;要么

    間接地,通過使用由多個服務共享的交換,其中服務名稱在路由密鑰中編碼。


使用默認交換之外的交換允許其他應用程序接收每個請求消息的副本,這對於監視,審計,日志記錄和調試是有用的。

確保服務實例正在偵聽


AMQP 0-9-1發布操作(IModel.BasicPublish)提供了交付標志“強制性”,可用於確保客戶端發送請求時的服務可用性。如果不能將請求路由到隊列,則設置“mandatory”標志會導致返回請求。返回的消息顯示為basic.return命令,通過IModel上用於發布消息的IModel.BasicReturn事件使其可見。


由於已發布的消息通過basic.return方法返回到客戶端,而basic.return是異步否定確認事件,因此不能將特定消息的basic.return作為傳遞的確認:使用傳遞標志只提供了提高桿的方法,而不是完全消除故障。


另外,消息被標記為“強制性”並且成功地入隊在一個或多個隊列上的事實不能保證其最終接收:最平常地,隊列可以在消息被處理之前被刪除,但是其他情況,例如使用noAck標志的消息消費者,也可以使得“強制”提供的保證有條件。


或者,您可以使用發布商確認。通過調用IModel.ConfirmSelect將通道設置為確認模式會導致代理在通過傳遞到就緒消費者或持久存儲到磁盤來處理每個消息后發送Basic.Ack。一旦通過IModel.BasicAcks事件處理程序確認成功處理的消息,代理就承擔了該消息的責任,客戶端可以考慮處理消息。注意,代理還可以通過發送回Basic.Nack來否定確認消息。在這種情況下,如果通過IModel.BasicNacks事件處理程序拒絕消息,客戶端應該假定消息丟失或以其他方式無法投遞。此外,請注意,不可路由的消息 - 發布為不存在隊列的強制性消息 - 都是Basic.Return和Basic.Ack'ed。


接收回復(Receiving Replies

AMQP 0-9-1內容頭(IBasicProperties)包含一個稱為ReplyTo的字段,可用於告知服務在何處發布對接收到的RPC請求的答復。在當前的RabbitMQ客戶端庫中,ReplyTo頭中的字符串使用最廣泛的格式是一個簡單的隊列名稱,盡管傳遞通過應用程序特定規則加入的交換名稱和路由鍵也是一個選項。服務實例將其答復發布到指定的目的地,並且請求客戶端應該安排接收如此尋址的消息,使用在適當綁定的隊列上的BasicGet或BasicConsume。

將接收到的應答與發送的請求相關聯

IBasicProperties包含一個名為CorrelationId的字段,在AMQP 0-9-1中是一個非結構化字符串,可用於將請求匹配到回復。應答消息應具有與附加到請求消息的相同的相關標識。


異步,單向消息傳遞(Asynchronous, one-way messaging

在某些情況下,簡單的請求 - 回復交互模式不適合您的應用程序。 在這些情況下,感興趣的交互模式可以從異步,單向,點對點消息構造。 如果應用程序要響應同步,RPC樣式請求和異步單向請求,它應該使用ReplyTo的值來決定請求它的交互樣式:如果ReplyTo存在並且非空, 請求可以假定是一個RPC樣式的調用; 否則,應假定它是單向消息。 CorrelationId字段可以用於將多個相關消息分組在一起,就像對於RPC樣式的情況一樣,但是更通常地將任意數量的消息綁定在一起。


點對點的確認模式(Acknowledgment modes for point-to-point)

當從服務器接收消息時,AMQP可以以兩種模式之一操作:自動確認模式(當BasicGet,BasicConsume或Subscription構造函數上設置noAck標志時)或手動確認模式。選擇正確的確認模式對於您的應用程序很重要:


    自動確認模式意味着當服務器在網絡上傳輸消息時,服務器將內部將消息標記為已成功傳遞。以自動確認模式傳送的消息通常不會重新傳送到任何其他接收器。

    手動確認模式意味着在將消息標記為已成功傳送之前,服務器將等待接收的肯定確認。如果在服務器接收到確認之前關閉了交付的手動確認模式下的通道(IModel),則將重新排隊。


一般來說,

    如果服務處於手動確認模式,則它不應該確認請求消息,直到它回復它;請參閱下面有關與外部資源交互的部分。

    客戶端可以使用自動確認模式,這取決於請求消息的重傳的結果。


庫支持點對點消息傳遞(Library support for point-to-point messaging)

RabbitMQ .NET客戶端庫包括涉及點對點消息傳遞的常見任務的基本支持。

SimpleRpcServer

類RabbitMQ.Client.MessagePatterns.SimpleRpcServer實現同步RPC樣式的請求處理以及異步消息處理。用戶應該繼承SimpleRpcServer,覆蓋一個或多個以“Handle”開頭的方法。 SimpleRpcServer實例具有請求調度循環MainLoop,它將請求解釋為RPC樣式的請求,如果請求的IBasicProperties的ReplyTo字段為非空且非空,則需要回復。具有缺少或空的ReplyTo字段的請求被視為單向。當處理了RPC樣式的請求時,將答復發送到ReplyTo地址。答復地址首先與描述上面給出的類似URI的語法的正則表達式相匹配;如果匹配,則使用類似URI的語法的組件作為回復地址,如果不匹配,則將整個字符串用作簡單隊列名稱,並將回復發送到默認交換(“”)一個等於ReplyTo字符串的路由鍵。

SimpleRpcClient

類RabbitMQ.Client.MessagePatterns.SimpleRpcClient實現與SimpleRpcServers或類似的交互的代碼。 RPC風格的交互是用Call方法執行的。 (私人)訂閱設置為從服務接收回復,並且ReplyTo字段設置為指向訂閱。請求的CorrelationId字段被初始化為新的GUID。異步/單向交互被簡單地傳遞到IModel.BasicPublish而不修改:它是由調用者在異步情況下設置CorrelationId。該類目前不支持在已發布的請求消息上設置“mandatory”標志,也不支持處理由於設置該標志而可能產生的任何BasicReturn事件。從內部訂閱檢索答復的代碼當前無法處理多個同時未解決的RPC請求,因為它要求答復以與發送請求相同的順序到達。在解除此限制之前,不要嘗試管理通過SimpleRpcClient的單個實例發送的請求。另請參見可覆蓋的受保護方法SimpleRpcClient.RetrieveReply。使用SimpleRpcClient的基本模式如下:

   
   
   
           
  1. using (IConnection conn = new ConnectionFactory()
  2. .CreateConnection(args[0])) {
  3. using (IModel ch = conn.CreateModel()) {
  4. SimpleRpcClient client = new SimpleRpcClient(ch, /* ... */);
  5. // in the line above, the "..." indicates the parameters
  6. // used to specify the address to use to route messages
  7. // to the service.
  8. // The next three lines are optional:
  9. client.TimeoutMilliseconds = 5000; // defaults to infinity
  10. client.TimedOut += new EventHandler(TimedOutHandler);
  11. client.Disconnected += new EventHandler(DisconnectedHandler);
  12. byte[] replyMessageBytes = client.Call(requestMessageBytes);
  13. // other useful overloads of Call() and Cast() are
  14. // available. See the code documentation of SimpleRpcClient
  15. // for full details.
  16. }
  17. }
請注意,單個SimpleRpcClient實例可以執行許多(順序)Call()和Cast()請求! 建議單個SimpleRpcClient重復用於多個服務請求,只要請求是嚴格順序的。


事件廣播(Event Broadcasting)

當應用程序希望在不知道每個感興趣方的地址的情況下向應用程序池指示狀態改變或其他通知時,發生事件廣播模式。 對某個事件子集感興趣的應用程序使用交換和隊列綁定來配置哪些事件被路由到其自己的專用隊列。


通常,事件將通過主題交換廣播,但是直接交換雖然不太靈活,但是有時對於其有限模式匹配能力足夠的應用可以執行得更好。


發布事件(Publishing events

要發布事件,首先確保交換存在,然后確定適當的路由密鑰。 例如,對於股票,一個鍵如“stock.ibm.nyse”可能是合適的; 對於其他應用程序,其他主題層次結構將自然出現。 主題交換常用。 然后發布消息。 例如:

   
   
   
           
  1. using (IConnection conn = new ConnectionFactory()
  2. .CreateConnection(args[0])) {
  3. using (IModel ch = conn.CreateModel()) {
  4. IBasicProperties props = ch.CreateBasicProperties();
  5. FillInHeaders(props); // or similar
  6. byte[] body = ComputeBody(props); // or similar
  7. ch.BasicPublish("exchangeName",
  8. "chosen.routing.key",
  9. props,
  10. body);
  11. }
  12. }
請參閱RabbitMQ.Client.IModel類中的BasicPublish的各種重載的文檔。

訂閱(Subscription)
RabbitMQ.Client.MessagePatterns.Subscription類實現了大多數接收消息(包括,特別是廣播事件)的樣板,包括消費者聲明和管理,但不包括隊列和交換聲明和隊列綁定。例如,
    
    
    
            
  1. // "IModel ch" in scope.
  2. Subscription sub = new Subscription(ch, "STOCK.IBM.#");
  3. foreach (BasicDeliverEventArgs e in sub) {
  4. // handle the message contained in e ...
  5. // ... and finally acknowledge it
  6. sub.Ack(e);
  7. }
將使用IModel.BasicConsume在隊列上啟動一個消費者。它假定隊列和任何綁定以前已經聲明。應該為每個接收的事件調用Subscription.Ack(),無論是否使用自動確認模式,因為Subscription內部知道是否需要確認的實際網絡消息,並以有效的方式為您處理只要在你的代碼中總是調用Ack()。有關完整的詳細信息,請參閱Subscription類的代碼文檔。

使用自定義消費者檢索事件(Retrieving events with a custom consumer)
有時,使用Subscription的高級方法是足夠的。 然而,其他時候,需要使用定制消費者。 這種檢索事件的方法是將隊列綁定到與適當的路由 - 密鑰模式規范相關的交換。 例如,假設我們的應用程序想要在隊列“MyApplicationQueue”上檢索有關IBM的所有價格:         

   
   
   
           
  1. // "IModel ch" in scope.
  2. ch.ExchangeDeclare("prices", "topic");
  3. ch.QueueDeclare("MyApplicationQueue", false, true, true, null);
  4. ch.QueueBind("MyApplicationQueue", "prices",
  5. "STOCK.IBM.#", false, null);
然后使用BasicGet或BasicConsume從“MyApplicationQueue”消耗消息。 一個更完整的例子在ApiOverview章節。


事件廣播的確認模式(Acknowledgment modes for event broadcasting

與用於點對點消息傳遞的相同的自動確認/手動確認決定可用於廣播事件的消費者,但是交互的模式引入不同的權衡:


    對於高容量消息傳遞,其中偶爾可接受的是不接收一個感興趣的消息,自動確認模式是有意義的

    對於其中滿足我們的訂閱的每個消息需要被遞送的情況,手動確認是適當的


有關詳細信息,請參閱下面的可靠郵件傳輸部分。還要注意,只要為每個接收的消息調用Subscription.Ack(),類Subscription就會負責確認和各種確認模式。


可靠的消息傳輸(Reliable message transfer)

消息可以在具有不同服務質量(QoS)水平的端點之間傳輸。一般來說,不能完全排除故障,但重要的是要了解各種交付故障模式,以了解從故障中恢復的種類,以及可能恢復的情況。重申:不可能完全排除故障。可以做的最好是縮小可能發生故障的條件,並且當檢測到故障時通知系統操作員。

至少一次遞送


該QoS水平確保消息被傳遞到其最終目的地至少一次。也就是說,接收器可以接收消息的多個副本。如果對於給定消息,副作用僅發生一次是重要的,則應該使用至多一次遞送。


要實施至少一次投放(At-least-once delivery)

    像往常一樣發布消息,在其上具有一些相關標識符和回復地址,使得接收方可以確認對發送方的接收。當接收到消息時,將確認消息發送回發送者。如果消息是RPC請求,則RPC應答消息隱式地是對請求的接收的確認。

    或者,不是手動實現往返邏輯,而是客戶端可以使用發布者確認。通過在通道上啟用確認模式,客戶端請求代理確認或否定確認從該點開始在該通道上發送的所有消息。請參閱“責任轉移”中有關如何使用確認的說明。


決定郵件重發策略可能很困難。一些簡單的重新發送策略是:


    如果您的連接丟失或在您收到收據確認之前發生其他崩潰,請重新發送

    如果您在幾秒鍾內沒有收到確認,則超時並重新發送。請確保每次重新發送的超時時間加倍,以幫助避免與重試相關的拒絕服務和網絡擁塞。


最多一次傳送(At-most-once delivery)

對於最多一次傳遞,只需發布消息,一次,照常。不需要相關標識符。在使用應用程序中接收消息,注意交貨時的Redelivered標志。 Redelivered標志只有在服務器認為它提供第一次消息消息時才會清除。如果之前已進行任何交貨嘗試,則重新送達標志將被設置。 Redelivered標志是一個非常有限的信息,只給出最多一次的語義。


用多節點RabbitMQ集群編碼(Coding with multi-node RabbitMQ clusters

在需要連續服務的情況下,可以通過一些仔細的編程和用於故障轉移的熱備份集群的可用性來對抗服務器故障的可能性。

失敗時的主要關注點是


    公布/承認的工作單位的原子性,以及備份服務器上已配置資源的可用性


消息生產者應注意使用事務,以便從服務器接收一組消息的肯定確認,並且應該保留他們為了執行它們的工作需要可用的交換,隊列和綁定的記錄,因此在故障切換時,可以在重放最近的要恢復的事務之前聲明適當的資源。


消息消費者應該意識到在故障轉移時丟失或重復消息的可能性:發布者可以決定重新發送其結果有疑問的事務,或者發布者認為完成的事務可能由於集群節點的故障而完全消失。

與外部資源交互

服務的常見模式是


    經由隊列接收服務請求

    更新一些外部資源,如文件或數據庫

    通過RabbitMQ進行回復,或至少向服務器確認觸發操作的消息已完成


至少一次模式的元素通常與外部資源模式一起出現 - 具體地,上面關於可靠消息傳遞的部分中討論的副作用通常對外部資源產生影響。


在交付必須被處理不超過一次並且與外部資源結合使用的情況下,重要的是編寫能夠在每個步驟的代碼以確定該步驟是否已經在完成整個交易的一些先前嘗試中采取,並且如果它具有,則能夠在該嘗試中省略它並繼續下一步驟。例如:


    如果工作觸發請求丟失,另一個副本將(最終)從最終請求者到達。

    如果已經執行了工作,例如更新了數據庫表,則在先前接收到有問題的工作項時,服務需要保持對於原子工作本身的原子的外部工作的完成的記錄:例如,在相同的數據庫事務中,可以更新尊敬請求的一些日志,或者可以更新被修改的行以包括引起修改的請求的ID,以及修改該行的先前請求ID題。


這使得重要的是能夠壓縮請求ID,以便它們不在執行的工作的日志中占用無界空間,並且使得我們不需要與最終請求者引入完全分布式垃圾收集協議。這樣做的一種方式是選擇使用嚴格增加的請求ID,使得可以使用“高水位線”。一旦知道已經執行了工作,並且已經產生了答復(如果存在答復),則可以根據需要將答復發送回請求者。請求者知道它期望的回復,並且可以丟棄不想要的重復。只要相同請求的重復總是收到相同的答復消息,則復制者不必小心發送太多的復制副本。一旦已經將答復發送到服務器,則可以確認請求消息為已接收並且與服務器服務器一起處理。在沒有對請求的答復的情況下,確認仍然有用以確保請求不丟失。







免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM