MassTransit 是一個自由、開源、輕量級的消息總線基於.Net框架, 用於創建分布式應用程序。方便搭建基於消息的松耦合異步通信的應用程序和服務。MassTransit 在現有消息傳輸上提供了一組廣泛的功能, 從而使開發人員能夠友好地使用基於消息的會話模式異步連接服務。基於消息的通信是實現面向服務的體系結構的可靠和可擴展的方式。
官網地址:http://masstransit-project.com/
發布訂閱模式
這種場景十分常見,發送一個消息(或事件)到消息隊列中,有一個或是多個訂閱方對預期的消息接收處理。
基於需要搭建了兩個WebApi程序,用於模擬發送方和訂閱方,其中的RabbitMQ已預先搭建好了,只在程序中引用包配置下即可。
<PackageReference Include="MassTransit" Version="7.2.0" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />
發布端配置
在Startup中增加MassTransit需要的服務及初始化配置。
- 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。
- 對發送方需要發送的消息初始化一個請求客戶端,配置請求信息及推送到MQ的地址。
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
{
h.Username(Configuration["RabbitmqConfig:Username"]);
h.Password(Configuration["RabbitmqConfig:Password"]);
});
});
x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
});
services.AddMassTransitHostedService();
為了快速了解,使用Controller在Action中發起對MQ的消息推送
[ApiController]
[Route("[controller]")]
public class ValueController : ControllerBase
{
readonly IPublishEndpoint _publishEndpoint;
public ValueController(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
[HttpPost]
public async Task<ActionResult> Post(string value)
{
await _publishEndpoint.Publish<ValueEntered>(new
{
Value = value
});
return Ok();
}
}
訂閱端配置
訂閱端也創建一個WebApi應用,在Startup中增加MassTransit的服務,使用到的Nuget包和發布端一樣。
- 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。
- 為訂閱端增加一個訂閱處理的Handler,即如下的ValueEnteredEventConsumer
- 增加一個接受點,指定隊列名稱,即發送端發送的隊列名稱,設置該隊列消費處理的Consumer,即ValueEnteredEventConsumer
services.AddMassTransit(x =>
{
x.AddConsumer<ValueEnteredEventConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
{
h.Username(Configuration["RabbitmqConfig:Username"]);
h.Password(Configuration["RabbitmqConfig:Password"]);
});
cfg.ReceiveEndpoint("events-valueentered", e =>
{
e.ConfigureConsumer<ValueEnteredEventConsumer>(context);
});
});
});
services.AddMassTransitHostedService();
如此一來,通過Postman發送一個請求,經發布端發布一個消息到RabbitMQ,訂閱端偵聽消息,處理消息,一切都很熟悉。
請求響應模式
在發布訂閱的基礎上,改變以往的習慣,當發布一個消息后,等待訂閱方的處理,並將消息推送回RabbitMQ,發送方接受到處理后的消息繼續執行。
請求端
在Startup中新加上一個用於發送消息(CheckOrderStatus)的請求客戶端及指定消息隊列名稱(為每一個消息創建一個單獨的隊列)。
x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));
增加一個Controller及Action,來請求及獲取處理結果(OrderStatusResult)。
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly IRequestClient<CheckOrderStatus> _client;
public OrderController(IRequestClient<CheckOrderStatus> client)
{
_client = client;
}
public async Task<OrderStatusResult> Get(string id)
{
var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });
return response.Message;
}
}
響應端
同樣在響應端Startup中對新的消息設置下消息偵聽隊列以及相應的Handler如下的ValueEnteredEventConsumer去消費消息並返回處理結果。
x.AddConsumer<CheckOrderStatusConsumer >();
x.UsingRabbitMq((context, cfg) =>
{
// ...
cfg.ReceiveEndpoint("events-checkorderstatus", e =>
{
e.ConfigureConsumer<CheckOrderStatusConsumer >(context);
});
});
Consumer中獲取請求參數,執行請求,返回執行結果。
public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
if (context.Message.OrderId == "9527")
{
throw new InvalidOperationException("Order not found");
}
Console.WriteLine($"OrderId:{context.Message.OrderId}");
await context.RespondAsync<OrderStatusResult>(new
{
OrderId = context.Message.OrderId,
Timestamp = Guid.NewGuid().ToString(),
StatusCode = "1",
StatusText = "Close"
});
}
}
這樣一來,當請求端發起一個消息(事件)到RabbitMQ,響應端偵聽並處理完畢返回處理結果到RabbitMQ,請求端依照響應結果繼續執行后續請求。
HTTP方式差異
與以往的Http請求方式有所不同,通過httpClient.PostAsync發送請求,接收端處理並返回結果,而走requestClient發送請求到RabbitMQ,再由RabbitMQ推送到偵聽節點消費並返回結果,如下第一二部分結構。
2021-06-29,望技術有成后能回來看見自己的腳步