支持異步同步的分布式CommandBus MSMQ實現 - 支持Session傳遞、多實例處理


先上一張本文所描述的適用場景圖

 

分布式場景,共3台server:

  1. 前端Server
  2. Order App Server
  3. Warehouse App Server

功能:

  1. 前端Server可以不停的發送Command到CommandBus,然后由CommandBus分配不同的Command到各自的app server去處理。
  2. 前端Server可以只發送Command而不必等待Response
  3. 前端Server可以同步等待Response返回
  4. MSMQ消息超過3.5M會自動轉為網絡共享方式傳輸消息
  5. 對於同一Command的處理,可以通過增加App Server的方式來提高並發處理速度(比如:可以開2個app server instance來同時處理ACommand的處理)
  6. 在App server中能讀取前端Server的Session value(寫Session還不支持)

本文目標是用msmq實現分布式CommandBus的應用(已經更新到A2D Framework中了)。

前端Server發送Command的方式(異步):

ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>();
cmdDistributer.ResultCatached += new CommandResultCatchedDelegate<ACommandResult>(cmdDistributer_ResultCatached);
cmdDistributer.SendRequest(cmd);

 

同步方式:

ACommand cmd = new ACommand() { Tag = "aaa" };
CommandBusDistributer<ACommand, ACommandResult> cmdDistributer = new CommandBusDistributer<ACommand, ACommandResult>(); 
cmdDistributer.SendRequest(cmd);
ACommandResult result=cmdDistributer.WaitResponse();

配置文件

<?xml version="1.0" encoding="utf-8" ?>
<CommandBusSetting>
  <AutoCreateIfNotExists>true</AutoCreateIfNotExists>
  <WebSessionSupport>true</WebSessionSupport>
  <CommandQueue>PC-20130606HCVP\private$\Commands_{0}</CommandQueue>
  <ResponseQueue>PC-20130606HCVP\private$\CommandResponses</ResponseQueue>
  <NetworkLocation>\\PC-20130606HCVP\network</NetworkLocation>
</CommandBusSetting>

 

 

Command的編寫方式

[QueueName("ACommand")]//這個可選,沒有QueueName時,默認對應的msmq隊列名為類名,此處為ACommand public class ACommand : BaseCommand  //需要繼承自BaseCommand
{
        public string Tag { get; set; }//自定義的屬性
}

 

 

后端App Server的編寫

CommandHandlerHost1: Console程序,相當於App Server 1,會處理部分Command

static void Main(string[] args)
        {
            Thread.Sleep(2000);

            CommandHandlerListener listener = new CommandHandlerListener();
            listener.AddHandler(new TestCommandHandlers());
            listener.AddHandler(new Test2CommandHandlers());
            listener.Start();
            Console.ReadKey();
        }

 

CommandHandlerHost2: Console程序,相當於App Server 2,會處理部分Command

static void Main(string[] args)
        {
            Thread.Sleep(2000);

            CommandHandlerListener listener = new CommandHandlerListener();
            listener.AddHandler(new Test3CommandHandlers());
            listener.Start();
            Console.ReadKey();
        }

 

CommandHandlers: 所有的Command處理函數都會在這個項目中實現

public class TestCommandHandlers : ICommandHandlers,
        ICommandHandler<ACommand, ACommandResult>,
        ICommandHandler<BCommand, BCommandResult>
    {
        public ACommandResult Handler(ACommand cmd, ISessionContext session)
        {
            Console.WriteLine("From [public ACommandResult Handler(ACommand cmd)]: " + cmd.Tag);

            ACommandResult result = new ACommandResult();
            result.Result = "result from ACommand";
            return result;
        }

        public BCommandResult Handler(BCommand cmd, ISessionContext session)
        {
            Console.WriteLine("From [public BCommandResult Handler(BCommand cmd)]: " + cmd.Tag);

            if (session != null)
            {
                Console.WriteLine("session not null");
                object o = session.GetWebSession("key1");
                if (o != null)
                {
                    Console.WriteLine("From Session['key1']: " + Convert.ToString(o));
                    //session.Set("key1", "changed from command handler: " + DateTime.Now);
                }
            }

            BCommandResult result = new BCommandResult();
            result.Result = "result from BCommand";
            return result;
        }
    }

 

 

下面是目前的性能測試:

 

下載代碼 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM