Github開源:Sheng.RabbitMQ.CommandExecuter (RabbitMQ 的命令模式實現)


[Github]:https://github.com/iccb1013/Sheng.RabbitMQ.CommandExecuter

Sheng.RabbitMQ.CommandExecuter 是使用 .Net 對 RabbitMQ 的一個簡單封裝。

它通過XML配置文件定義Exchange及隊列等信息,根據此配置文件自動聲明及初始化相關隊列信息,方便 .Net 開發人員使用 RabbitMQ。

並實現了一個基於 MQ 的命令執行器,將 MQ 消息抽象化為命令,發布端和訂閱端通過命令進行交互。默認實現了兩個命令:
1)HTTP請求轉發,將收到的MQ消息的指定內容轉發到指定URL上;
2)數據庫同步,通過預先定義的配置文件,指明不同數據庫和表之間的關聯關系,發送端向 MQ 中發布數據庫同步命令后,訂閱方(可作為 windows 服務部署,已在工程中實現)負責解析並執行數據庫同步工作。

你可以直接使用基本的 RabbitMQ 封裝,也可以在此命令模式的基礎上實現你自己的命令。

 

Sheng.RabbitMQ.CommandExecuter.Contract:命令模式中的命令契約,命令的發布方和接收方通過此契約類庫共享已知類型。

Sheng.RabbitMQ.CommandExecuter.Core:命令模式的核心實現,命令的解釋和執行在此實現,並默認實現了兩個命令。

Sheng.RabbitMQ.CommandExecuter.RabbitMQ:RabbitMQ 的簡單封裝,通過XML配置文件初始化 RabbitMQ,可單獨使用此工程方便在 .Net 工程中使用 RabbitMQ。

Sheng.RabbitMQ.CommandExecuter.Service:Windows 服務,可做為命令模式中的接收端。

Sheng.RabbitMQ.CommandExecuter.WindowsForm:沒有實際功能,方便開發階段調試程序。

 

RabbitMQConfig.xml 用於配置 RabbitMQ 信關信息:

<?xml version="1.0" encoding="utf-8" ?>
<rabbitMQ>
  <connectionFactory hostName="192.168.100.100" userName="user" password="1234"></connectionFactory>
  <exchangeList>
    <exchange name="exchangeName_A" type="direct">
      <queueList>
        <queue name="queue_test_A" durable="true" exclusive="false" autoDelete="false" routingKey="routingKey_A" type="send"></queue>
        <queue name="queue_test_B" durable="true" exclusive="false" autoDelete="false" routingKey="routingKey_B" type="receive"></queue>
      </queueList>
    </exchange>
  </exchangeList>
</rabbitMQ>

 

 

相關屬性與 RabbitMQ 名稱意義皆相同,只是注意一點,queue 節點中的 type 有 send 和 receive 兩種,send 表示此隊列只用於發送消息,receive 表示此隊列只用於接收消息。RabbitMQService 不會訂閱 type 為 send 的隊列的消息。

 

代碼簡要說明:

聲明核心類:RabbitMQService ,然后通過此類訂閱消息,發送消息。

RabbitMQService _rabbitMQService = RabbitMQService.Instance;

通過 Subscribe 方法訂閱消息:

private void btnStartService_Click(object sender, EventArgs e)
       {
           _rabbitMQService.Subscribe("routingKey_B", RabbitMQCallback);
 
           _rabbitMQService.Start();
 
           MessageBox.Show("RabbitMQService 已啟動。");
       }
 
       private void RabbitMQCallback(ulong deliveryTag, string routingKey, string body)
       {
           _rabbitMQService.Ack(deliveryTag, false);
 
           Debug.WriteLine(routingKey + Environment.NewLine + body);
 
           MessageBox.Show(routingKey + Environment.NewLine + body);
       }

_rabbitMQService.Start() 用於啟動 RabbitMQ 的監聽。

通過 Send 方法發送消息:

_rabbitMQService.Send("exchangeName_A", "routingKey_B", "123");

 有關命令模式的實現說明:

在契約類庫中,所有的命令均繼承自 Command 類,並提供一個 CommandType 屬性

public abstract class Command
    {
        public string CommandType
        {
            get;
            protected set;
        }
    }

簡單的 URL 轉發命令的定義如下:

public class DatabaseSyncForwardCommand: Command
   {
       public const string CommandTypeName = "DatabaseSyncForwardCommand";
 
       public DatabaseSyncForwardCommand()
       {
           base.CommandType = CommandTypeName;
       }
 
       public string Url
       {
           get;set;
       }
 
       public string CommandContent
       {
           get;set;
       }
   }

命令的解析和執行在 Sheng.RabbitMQ.CommandExecuter.Core 中

CommandExecuterService 用於向調用者提供命令執行服務的整體封裝,通過它啟動命令的監聽和執行。

CommandReceiver 目錄下是命令的接收和執行程序,每個命令都有一個對應的 *Receiver 類,這些類通過 CommandReceiverFactory 這個工廠類統一實例化和調用。

 

有關數據庫同步命令的說明

數據庫同步命令可以實現簡單的,多個數據庫之間的表數據同步。可以定義多個消費者,並為每個消費者定義多個數據提供者,並定義它們之間的表結構對應關系。在文件 DatabaseSyncConfig.xml 中定義:

<?xml version="1.0" encoding="utf-8" ?>
<databaseSync>
  <connectionList>
    <connection name="connection1" connectionString=""/>
    <connection name="connection2" connectionString=""/>
  </connectionList>
  <consumerList>
    <consumer name="crm" connection="connection1">
      <producerList>
        <producer name="erp" routingKey="routingKey_A" connection="connection2">
          <tableDefinition>
            <table name="Customers" primaryKey="Id" consumerTable="Customers" consumerTablePrimaryKey="Id">
              <Field name="Id" consumerField="Id" ></Field>
              <Field name="CustomerName" consumerField="CustomerName"></Field>
              <Field name="NopUserName" consumerField="NopUserName"></Field>
            </table>
          </tableDefinition>
        </producer>
      </producerList>
    </consumer>
  </consumerList>
</databaseSync>

connectionList 節點用於定義數據庫連接,可以定義多個不同的數據庫連接,並在下面的配置環節中引用。

consumerList 用於配置消費者,consumer 下的 producer 表示針對這一消費者的數據提供者。
tableDefinition 用於定義消費者和數據提供者之間的表結構對應關系。
相關程序在收到數據庫同步命令時,通過數據提供者定義中的 routingKey 來判斷數據同步命令來自哪個數據提供者。

對於數據的提供者,通過類似如下代碼,向 MQ 中發送數據同步命令:

DatabaseSyncCommand cmd = new DatabaseSyncCommand();
 
DatabaseSyncItem item1 = new DatabaseSyncItem()
{
    Action = DatabaseSyncAction.Add,
    Table = "Customers",
    PrimaryKeyValue = "062B54F5-69AA-A108-09F8-39DB9C2F58C4"
};
 
cmd.SyncItemList.Add(item1);
 
string json = JsonConvert.SerializeObject(cmd);
 
_rabbitMQService.Send("exchangeName_A", "routingKey_A", json);

以上。

QQ: 279060597    @南京

引用請注明原文出處: http://sheng.city/post/sheng-rabbitmq-commandexecuter

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM