MassTransit中Request&Response基本使用


MassTransit 是一個自由、開源、輕量級的消息總線基於.Net框架, 用於創建分布式應用程序。方便搭建基於消息的松耦合異步通信的應用程序和服務。MassTransit 在現有消息傳輸上提供了一組廣泛的功能, 從而使開發人員能夠友好地使用基於消息的會話模式異步連接服務。基於消息的通信是實現面向服務的體系結構的可靠和可擴展的方式。

官網地址:http://masstransit-project.com/

發布訂閱模式

這種場景十分常見,發送一個消息(或事件)到消息隊列中,有一個或是多個訂閱方對預期的消息接收處理。

圖片

基於需要搭建了兩個WebApi程序,用於模擬發送方和訂閱方,其中的RabbitMQ已預先搭建好了,只在程序中引用包配置下即可。

<PackageReference Include="MassTransit" Version="7.2.0" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />

發布端配置

在Startup中增加MassTransit需要的服務及初始化配置。

  • 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。
  • 對發送方需要發送的消息初始化一個請求客戶端,配置請求信息及推送到MQ的地址。
services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
        {
            h.Username(Configuration["RabbitmqConfig:Username"]);
            h.Password(Configuration["RabbitmqConfig:Password"]);
        });
    });
    x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
});
services.AddMassTransitHostedService();

為了快速了解,使用Controller在Action中發起對MQ的消息推送

[ApiController]
[Route("[controller]")]
public class ValueController : ControllerBase
{
    readonly IPublishEndpoint _publishEndpoint;
    public ValueController(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }
    [HttpPost]
    public async Task<ActionResult> Post(string value)
    {
        await _publishEndpoint.Publish<ValueEntered>(new
        {
            Value = value
        });
        return Ok();
    }
}

訂閱端配置

訂閱端也創建一個WebApi應用,在Startup中增加MassTransit的服務,使用到的Nuget包和發布端一樣。

  • 對RabbitMQ的連接地址端口、虛擬主機、訪問賬號密碼等系列配置。
  • 為訂閱端增加一個訂閱處理的Handler,即如下的ValueEnteredEventConsumer
  • 增加一個接受點,指定隊列名稱,即發送端發送的隊列名稱,設置該隊列消費處理的Consumer,即ValueEnteredEventConsumer
services.AddMassTransit(x =>
{
    x.AddConsumer<ValueEnteredEventConsumer>();
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
        {
            h.Username(Configuration["RabbitmqConfig:Username"]);
            h.Password(Configuration["RabbitmqConfig:Password"]);
        });
        cfg.ReceiveEndpoint("events-valueentered", e =>
        {
            e.ConfigureConsumer<ValueEnteredEventConsumer>(context);
        });
    });
});
services.AddMassTransitHostedService();

如此一來,通過Postman發送一個請求,經發布端發布一個消息到RabbitMQ,訂閱端偵聽消息,處理消息,一切都很熟悉。
圖片

請求響應模式

在發布訂閱的基礎上,改變以往的習慣,當發布一個消息后,等待訂閱方的處理,並將消息推送回RabbitMQ,發送方接受到處理后的消息繼續執行。

圖片

請求端

在Startup中新加上一個用於發送消息(CheckOrderStatus)的請求客戶端及指定消息隊列名稱(為每一個消息創建一個單獨的隊列)。

x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));

增加一個Controller及Action,來請求及獲取處理結果(OrderStatusResult)。

[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
    private readonly IRequestClient<CheckOrderStatus> _client;
    public OrderController(IRequestClient<CheckOrderStatus> client)
    {
        _client = client;
    }
    public async Task<OrderStatusResult> Get(string id)
    {
        var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });
        return response.Message;
    }
}

響應端

同樣在響應端Startup中對新的消息設置下消息偵聽隊列以及相應的Handler如下的ValueEnteredEventConsumer去消費消息並返回處理結果。

x.AddConsumer<CheckOrderStatusConsumer >();
x.UsingRabbitMq((context, cfg) =>
{
    // ...
    cfg.ReceiveEndpoint("events-checkorderstatus", e =>
    {
    e.ConfigureConsumer<CheckOrderStatusConsumer >(context);
    });
});

Consumer中獲取請求參數,執行請求,返回執行結果。

public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
    public async Task Consume(ConsumeContext<CheckOrderStatus> context)
    {
        if (context.Message.OrderId == "9527")
        {
            throw new InvalidOperationException("Order not found");
        }
        Console.WriteLine($"OrderId:{context.Message.OrderId}");
        await context.RespondAsync<OrderStatusResult>(new
        {
            OrderId = context.Message.OrderId,
            Timestamp = Guid.NewGuid().ToString(),
            StatusCode = "1",
            StatusText = "Close"
        });
    }
}

這樣一來,當請求端發起一個消息(事件)到RabbitMQ,響應端偵聽並處理完畢返回處理結果到RabbitMQ,請求端依照響應結果繼續執行后續請求。
圖片

HTTP方式差異

與以往的Http請求方式有所不同,通過httpClient.PostAsync發送請求,接收端處理並返回結果,而走requestClient發送請求到RabbitMQ,再由RabbitMQ推送到偵聽節點消費並返回結果,如下第一二部分結構。

圖片

2021-06-29,望技術有成后能回來看見自己的腳步


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM