[Github]:https://github.com/iccb1013/Sheng.RabbitMQ.CommandExecuter
Sheng.RabbitMQ.CommandExecuter 是使用 .Net 對 RabbitMQ 的一個簡單封裝。
它通過XML配置文件定義Exchange及隊列等信息,根據此配置文件自動聲明及初始化相關隊列信息,方便 .Net 開發人員使用 RabbitMQ。
並實現了一個基於 MQ 的命令執行器,將 MQ 消息抽象化為命令,發布端和訂閱端通過命令進行交互。默認實現了兩個命令:
1)HTTP請求轉發,將收到的MQ消息的指定內容轉發到指定URL上;
2)數據庫同步,通過預先定義的配置文件,指明不同數據庫和表之間的關聯關系,發送端向 MQ 中發布數據庫同步命令后,訂閱方(可作為 windows 服務部署,已在工程中實現)負責解析並執行數據庫同步工作。
你可以直接使用基本的 RabbitMQ 封裝,也可以在此命令模式的基礎上實現你自己的命令。
Sheng.RabbitMQ.CommandExecuter.Contract:命令模式中的命令契約,命令的發布方和接收方通過此契約類庫共享已知類型。
Sheng.RabbitMQ.CommandExecuter.Core:命令模式的核心實現,命令的解釋和執行在此實現,並默認實現了兩個命令。
Sheng.RabbitMQ.CommandExecuter.RabbitMQ:RabbitMQ 的簡單封裝,通過XML配置文件初始化 RabbitMQ,可單獨使用此工程方便在 .Net 工程中使用 RabbitMQ。
Sheng.RabbitMQ.CommandExecuter.Service:Windows 服務,可做為命令模式中的接收端。
Sheng.RabbitMQ.CommandExecuter.WindowsForm:沒有實際功能,方便開發階段調試程序。
RabbitMQConfig.xml 用於配置 RabbitMQ 信關信息:
<?xml version="1.0" encoding="utf-8" ?> <rabbitMQ> <connectionFactory hostName="192.168.100.100" userName="user" password="1234"></connectionFactory> <exchangeList> <exchange name="exchangeName_A" type="direct"> <queueList> <queue name="queue_test_A" durable="true" exclusive="false" autoDelete="false" routingKey="routingKey_A" type="send"></queue> <queue name="queue_test_B" durable="true" exclusive="false" autoDelete="false" routingKey="routingKey_B" type="receive"></queue> </queueList> </exchange> </exchangeList> </rabbitMQ>
相關屬性與 RabbitMQ 名稱意義皆相同,只是注意一點,queue 節點中的 type 有 send 和 receive 兩種,send 表示此隊列只用於發送消息,receive 表示此隊列只用於接收消息。RabbitMQService 不會訂閱 type 為 send 的隊列的消息。
代碼簡要說明:
聲明核心類:RabbitMQService ,然后通過此類訂閱消息,發送消息。
RabbitMQService _rabbitMQService = RabbitMQService.Instance;
通過 Subscribe 方法訂閱消息:
private void btnStartService_Click(object sender, EventArgs e) { _rabbitMQService.Subscribe("routingKey_B", RabbitMQCallback); _rabbitMQService.Start(); MessageBox.Show("RabbitMQService 已啟動。"); } private void RabbitMQCallback(ulong deliveryTag, string routingKey, string body) { _rabbitMQService.Ack(deliveryTag, false); Debug.WriteLine(routingKey + Environment.NewLine + body); MessageBox.Show(routingKey + Environment.NewLine + body); }
_rabbitMQService.Start() 用於啟動 RabbitMQ 的監聽。
通過 Send 方法發送消息:
_rabbitMQService.Send("exchangeName_A", "routingKey_B", "123");
有關命令模式的實現說明:
在契約類庫中,所有的命令均繼承自 Command 類,並提供一個 CommandType 屬性
public abstract class Command { public string CommandType { get; protected set; } }
簡單的 URL 轉發命令的定義如下:
public class DatabaseSyncForwardCommand: Command { public const string CommandTypeName = "DatabaseSyncForwardCommand"; public DatabaseSyncForwardCommand() { base.CommandType = CommandTypeName; } public string Url { get;set; } public string CommandContent { get;set; } }
命令的解析和執行在 Sheng.RabbitMQ.CommandExecuter.Core 中
CommandExecuterService 用於向調用者提供命令執行服務的整體封裝,通過它啟動命令的監聽和執行。
CommandReceiver 目錄下是命令的接收和執行程序,每個命令都有一個對應的 *Receiver 類,這些類通過 CommandReceiverFactory 這個工廠類統一實例化和調用。
有關數據庫同步命令的說明
數據庫同步命令可以實現簡單的,多個數據庫之間的表數據同步。可以定義多個消費者,並為每個消費者定義多個數據提供者,並定義它們之間的表結構對應關系。在文件 DatabaseSyncConfig.xml 中定義:
<?xml version="1.0" encoding="utf-8" ?> <databaseSync> <connectionList> <connection name="connection1" connectionString=""/> <connection name="connection2" connectionString=""/> </connectionList> <consumerList> <consumer name="crm" connection="connection1"> <producerList> <producer name="erp" routingKey="routingKey_A" connection="connection2"> <tableDefinition> <table name="Customers" primaryKey="Id" consumerTable="Customers" consumerTablePrimaryKey="Id"> <Field name="Id" consumerField="Id" ></Field> <Field name="CustomerName" consumerField="CustomerName"></Field> <Field name="NopUserName" consumerField="NopUserName"></Field> </table> </tableDefinition> </producer> </producerList> </consumer> </consumerList> </databaseSync>
connectionList 節點用於定義數據庫連接,可以定義多個不同的數據庫連接,並在下面的配置環節中引用。
consumerList 用於配置消費者,consumer 下的 producer 表示針對這一消費者的數據提供者。
tableDefinition 用於定義消費者和數據提供者之間的表結構對應關系。
相關程序在收到數據庫同步命令時,通過數據提供者定義中的 routingKey 來判斷數據同步命令來自哪個數據提供者。
對於數據的提供者,通過類似如下代碼,向 MQ 中發送數據同步命令:
DatabaseSyncCommand cmd = new DatabaseSyncCommand(); DatabaseSyncItem item1 = new DatabaseSyncItem() { Action = DatabaseSyncAction.Add, Table = "Customers", PrimaryKeyValue = "062B54F5-69AA-A108-09F8-39DB9C2F58C4" }; cmd.SyncItemList.Add(item1); string json = JsonConvert.SerializeObject(cmd); _rabbitMQService.Send("exchangeName_A", "routingKey_A", json);
以上。
QQ: 279060597 @南京
引用請注明原文出處: http://sheng.city/post/sheng-rabbitmq-commandexecuter