MassTransit 入門(一)


本文地址源碼

  • MassTransit是一個面向.net的免費開源分布式應用程序框架。
  • MassTransit使得創建應用程序和服務變得很容易,這些應用程序和服務利用基於消息的、松散耦合的異步通信來獲得更高的可用性、可靠性和可伸縮性。
  • MassTransit 8.x版本。

實現簡單發布訂閱

  • 添加Nuget包引用:
    • MassTransit
    • MassTransit.RabbitMQ(演示也可基於內存)

生產端

  • 配置MassTransit
builder.Services.AddMassTransit(x =>
{
   
    // 使用內存
    //x.UsingInMemory();
    // 使用RabbitMq
    x.UsingRabbitMq((context, config) =>
    {
        config.Host("rabbitmq://localhost:5672", host =>
        {
            host.Username("admin");
            host.Password("admin");
        });
    });
});
  • 定義消息體
public class OrderEto
{
    public Guid Id { get; init; }

    public string Name { get; set; }
    
    public DateTime CreationTime { get; set; }
}
  • 發布消息
[ApiController]
[Route("[controller]")]
public class PublishController : ControllerBase
{
    private readonly ILogger<PublishController> _logger;
    private readonly IPublishEndpoint _publishEndpoint;

    public PublishController(ILogger<PublishController> logger, IPublishEndpoint publishEndpoint)
    {
        _logger = logger;
        _publishEndpoint = publishEndpoint;
    }

    [HttpGet]
    public async Task Get()
    {
        await _publishEndpoint.Publish<OrderEto>(new OrderEto()
        {
            Id = Guid.NewGuid(),
            Name = "Phone",
            CreationTime = DateTime.Now
        });
    }
}

消費者端

builder.Services.AddMassTransit(x =>
{
    
    // 通過掃描程序集注冊消費者
    x.AddConsumers(typeof(Program).Assembly);
   
    // 通過類型單個注冊消費者
    // x.AddConsumer<OrderEtoConsumer>(typeof(OrderEtoConsumerDefinition));
    
    // x.SetKebabCaseEndpointNameFormatter();
    
    // 通過泛型單個注冊消費者
    //x.AddConsumer<OrderEtoConsumer, OrderEtoConsumerDefinition>();
    
    // 通過指定命名空間注冊消費者
    // x.AddConsumersFromNamespaceContaining<OrderEtoConsumer>();
    
    // 使用內存隊列
    // x.UsingInMemory();
    x.UsingRabbitMq((context, config) =>
    {
      
        config.Host("rabbitmq://localhost:5672", hostconfig =>
        {
            hostconfig.Username("admin");
            hostconfig.Password("admin");
        });
        
        config.ConfigureEndpoints(context);
       
    });
});
  • 消費者定義
public class OrderEtoConsumer : IConsumer<OrderEto>
{
    private readonly ILogger<OrderEtoConsumer> _logger;

    public OrderEtoConsumer(ILogger<OrderEtoConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<OrderEto> context)
    {
        _logger.LogInformation($"MassTransit.Consumer.One 收到消息:{JsonSerializer.Serialize(context.Message)}");
        return Task.CompletedTask;
    }
}

public class OrderEtoConsumerDefinition : ConsumerDefinition<OrderEtoConsumer>
{
    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator, IConsumerConfigurator<OrderEtoConsumer> consumerConfigurator)
    {
        endpointConfigurator.UseMessageRetry(r => r.Intervals(500, 1000));
    }
}

Abp Vnext Vue實現


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM