Masstransit開發基於消息傳遞的分布式應用


使用Masstransit開發基於消息傳遞的分布式應用

Masstransit作為.Net平台下的一款優秀的開源產品卻沒有得到應有的關注,這段時間有機會閱讀了Masstransit的源碼,我覺得我有必要普及一下這個框架的使用。

值得一提的是Masstransit的源碼寫的非常優秀,值得每個想提高自己編程能力的.Net選手閱讀,整個代碼看起來賞心悅目。反之,每次打開自己公司項目的時候心情都異常沉重。所以不是.Net不行,還是咱們水平不行。

學會了Masstransit你再也不用羡慕別人有Dubbo、Mule、Akka什么的了,當然在某些方面他們的使用場景還是有一些區別。另外插播一條廣告:本人目前在西安求職中,如果那位同學有好的工作機會希望能夠幫忙推薦。

閱讀本篇文章的前提是你需要對消息隊列有一些了解,特別是RabbitMq,Masstransit作為一款輕量級的ESB默認支持RabbitMq和MSMQ。本文的例子都使用RabbitMq來介紹,所以你最好能讀一下我之前寫的《如何優雅的使用RabbitMq》。

簡單來說,Masstransit提供了使用消息隊列場景的一種抽象,也就是說,如果你有使用消息隊列的需求,都可以通過Masstransit來完成,當然如果僅僅是拿消息隊列來發個短信、郵件之類的並不能體現出Masstransit的優越性。當整個業務系統都通過Masstransit過來構建和交互的時候,才能真正體現ESB的價值所在。

我寫了5不同場景個Demo,方便大家學習和參考。我會重點講解Real World的案例,也就是如何在真實場景使用Masstransit。如果僅僅是把一些組件融入到了項目中並且能夠運行,並不能算是一個合格的架構師,一個合格的架構師一定是可以將某個組件以最佳實踐的方式融入到了自己的項目中,並且能夠為開發者提供清晰且合理的抽象,然后針對這一方案制定一些約定和規則,隨着項目的推進,整個項目的代碼都能夠有章可循,始終在架構師的掌控之中。

一、發送命令模型(Send Command Pattern)

這種模型最常見的就是CQRS中C,用來向DomainHandler發送一個Command。另外系統的發送郵件服務、發送短信服務也可以通過這種模式來實現。這種模型跟郵遞員向郵箱投遞郵件有點相似。這一模型的特點是你需要知道對方終結點的地址,意味着你要明確要向哪個地址發送消息。從Masstransit提供的api就可以看出來:

1
2
3
4
5
6
7
8
var  endPoint =await bus.GetSendEndpoint(sendToUri);
             var  command = new  GreetingCommandA()
             {
                 Id = Guid.NewGuid(),
                 DateTime = DateTime.Now
             };
 
             await endPoint.Send(command);

這個Demo主要由2個工程組成,Client發送消息到Server,Server來響應這一消息。

二、發布/訂閱模型(publish/subscribe pattern)

之所以有基於消息傳遞的分布式應用這種架構模式,很大程度上就是依靠這種模式來完成。一個典型的例子是子系統A發布了一條消息,子系統B和子系統C都可以訂閱這一消息並異步處理該消息。而這一過程對子系統A來說是不關心的。從而減少不同的子系統之間的耦合和可擴展性。

三、消息的繼承層次

用過RabbitMQ的同學應該知道,RabbitMQ提供了3中類型的Exchange,分別為direct、fanout已經topic。所有這一切都是為了提供一種路由消息的機制。而這一切是通過匹配一種字符串類型的routingKey來實現的,當然有了Masstransit你就不用這么費勁了。C#作為一種強類型的語言,我們可以通過設計消息的繼承層次來實現消息的路由機制。比如我們可以設計下面的消息繼承體系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public  interface  IMessage
    {
        Guid Id { get ; set ; }
    }
 
    public  class  Message : IMessage
    {
        public  Guid Id { get ; set ; }
        public  string  Type { get ; set ; }
    }
 
    public  class  UserUpdatedMessage : Message
    {
        public  Guid Id { get ; set ; }
    }

有了這樣的繼承體系,我們可以定義下面的Consumer類型:

1
2
3
4
5
6
7
public  class  BaseInterfaceMessageConsumer:IConsumer<IMessage>
     {
         public  async Task Consume(ConsumeContext<IMessage> context)
         {
             await Console.Out.WriteLineAsync($ "consumer is BaseInterfaceMessageConsumer,message type is {context.Message.GetType()}" );
         }
     }

還可以定義下面的Consumer類型:

1
2
3
4
5
6
7
public  class  UserUpdatedMessageConsumer: IConsumer<UserUpdatedMessage>
     {
         public  async Task Consume(ConsumeContext<UserUpdatedMessage> context)
         {
             await Console.Out.WriteLineAsync($ "consumer is UserUpdatedMessageConsumer,message type is {context.Message.GetType()}" );
         }
     }

這樣就可以路由不同的消息到相應的Consumer中了。

四、使用Topshelf來構建windows服務

我們最終要將consumer程序集打成windows服務來安裝在產品環境下,Topshelf為我們提供了一組DSL描述的api來創建window服務:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
HostFactory.Run(x =>                                
             {
                 x.Service<GreetingServer>(s =>                       
                 {
                     s.ConstructUsing(name => new  GreetingServer());    
                     s.WhenStarted(tc => tc.Start());           
                     s.WhenStopped(tc => tc.Stop());
                 });
                 x.StartAutomatically();
                 x.RunAsLocalSystem();                         
                 x.SetDescription( "A greeting service" );       
                 x.SetDisplayName( "Greeting Service" );                     
                 x.SetServiceName( "GreetingService" );    
             });

五、RPC調用(request/response pattern)

我們還可以通過Masstransit實現RPC調用:

1
2
3
var  response = await client.Request( new  SimpleRequest() {CustomerId = customerId});
 
Console.WriteLine( "Customer Name: {0}" , response.CusomerName);

這有點像是一個webservice調用,不過在ESB的設計中我們應該盡量避免這種設計,特別是在異構系統之間,應該盡量采用send command pattern和publish/subscriber pattern。

六、正式場景該如何使用Masstransit

在使用Masstranit的正式場景中,我們主要考慮以下幾個方面:

1、配置方式

定義一個抽象類,用來統一配置方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public  abstract  class  BusConfiguration
{
     public  abstract  string  RabbitMqAddress { get ; }
     public  abstract  string  QueueName { get ; }
     public  abstract  string  RabbitMqUserName { get ; }
     public  abstract  string  RabbitMqPassword { get ; }
     public  abstract  Action<IRabbitMqBusFactoryConfigurator,IRabbitMqHost> Configuration { get ; }
 
 
     public  virtual  IBus CreateBus()
     {
         var  bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
         {
             var  host = cfg.Host( new  Uri(RabbitMqAddress), hst =>
             {
                 hst.Username(RabbitMqUserName);
                 hst.Password(RabbitMqPassword);
             });
 
             Configuration?.Invoke(cfg, host);
         });
 
         return  bus;
     }
}

具體的項目會繼承該配置類做對應的配置:如UserManagementBusConfiguration、UserManagementServiceBusConfiguration等

2、能夠跟DI容器結合,本例以Castle Windsor Container為例:

在web項目中添加ServiceBusInstaller:

1
2
3
4
5
6
7
8
9
10
public  class  ServiceBusInstaller:IWindsorInstaller
     {
         public  void  Install(IWindsorContainer container, IConfigurationStore store)
         {
             container.Register(
                 Component.For<IBus, IBusControl>()
                     .Instance(UserManagementBusConfiguration.BusInstance)
                     .LifestyleSingleton());
         }
     }

然后我們就可以在controller中注入IBus了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private  readonly  IUserProvider _userProvider;
         private  readonly  IBus _bus;
 
         public  ValuesController(IUserProvider userProvider,IBus bus)
         {
             _userProvider = userProvider;
             _bus = bus;
         }
 
         [HttpGet]
         [Route( "api/values/createuser" )]
         public  string  CreateUser()
         {
             //save user in local db
 
             _bus.Publish( new  UserCreatedEvent() {UserName = "Tom" , Email = "tom@google.com" });
 
             return  "create user named Tom" ;
         }

同樣的道理,在consumer項目中也可以做同樣的配置,添加ConsumersInstaller:

1
2
3
4
5
6
7
8
public  class  ConsumersInstaller:IWindsorInstaller
     {
         public  void  Install(IWindsorContainer container, IConfigurationStore store)
         {
             container.Register(
                 Classes.FromThisAssembly().BasedOn( typeof  (IConsumer)).WithServiceBase().WithServiceSelf().LifestyleTransient());
         }
     }

在Consumer中注入一個組件試試:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  class  UserCreatedEventConsumer : IConsumer<UserCreatedEvent>
     {
         private  readonly  GreetingWriter _greetingWriter;
 
         public  UserCreatedEventConsumer(GreetingWriter greetingWriter)
         {
             _greetingWriter = greetingWriter;
         }
 
         public  async Task Consume(ConsumeContext<UserCreatedEvent> context)
         {
             _greetingWriter.SayHello();
 
             await Console.Out.WriteLineAsync($ "user name is {context.Message.UserName}" );
             await Console.Out.WriteLineAsync($ "user email is {context.Message.Email}" );
         }
     }

把web項目和consumer服務都跑起來看看:

3、重試配置

1
cfg.UseRetry(Retry.Interval(3, TimeSpan.FromMinutes(1)));

消息消費失敗后重試3次,每次間隔1分鍾

4、熔斷機制

1
cfg.UseRateLimit(1000, TimeSpan.FromSeconds(1));

每分鍾消息消費數限定在1000之內

5、異常處理(待續)

6、單元測試(待續)

7、消息定時發送(待續)

8、自定義中間件(待續)

9、自定義觀察者(待續)

10、長生命周期的消費者:Turnout(待續)

11、長生命周期的狀態機:saga(待續)

12、Routing slip pattern的實現:Courier(待續)

整個Demo代碼提供下載:http://git.oschina.net/richieyangs/RabbitMQ.Practice

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM