在我們的業務中,我們通常需要在自己的業務子系統之間相互發送消息,一端去發送消息另一端去消費當前消息,這就涉及到使用消息隊列MQ的一些內容,消息隊列成熟的框架有多種,這里你可以讀這篇文章來了解這些MQ的不同,這篇文章的主要目的是用來系統講述如何在Asp.Net Core中使用Kafka,整篇文章將介紹如何寫消息發送方代碼、消費方代碼、配套的工具的使用,希望讀完這篇文章之后對整個消息的運行機制有一定的理解,在這里通過一張圖來簡要了解一下消息隊列中的一些概念。

圖一 Kafka消息隊列
一 安裝NUGET包
在寫代碼之前首先要做的就是安裝nuget包了,我們這里使用的是Confluent.Kafka 1.0.0-RC4版本,具體項目要根據具體的時間來確定引用包的版本,這些包可能更新比較快。

圖二 引用Kafka包依賴
二 消息發送方(Producer)
1 在項目中添加所有觸發事件的接口 IIntegrationEvent,后面所有的觸發事件都是繼承自這個接口。
/// <summary>
/// 集成事件的接口定義
/// </summary>
public interface IIntegrationEvent {
string Key { get; set; }
}
2 定義Kafka生產者
/// <summary>
/// Kafka 生產者的 Domain Service
/// </summary>
public class KafkaProducer : DomainService {
private readonly IConfiguration _config;
private readonly ILogger<KafkaProducer> _logger;
public KafkaProducer(IConfiguration config,
ILogger<KafkaProducer> logger) {
_config = config;
_logger = logger;
}
/// <summary>
/// 發送事件
/// </summary>
/// <param name="event"></param>
public void Produce(IIntegrationEvent @event) {
var topic = _config.GetValue<string>($"Kafka:Topics:{@event.GetType().Name}");
var producerConfig = new ProducerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")
};
var builder = new ProducerBuilder<string, string>(producerConfig);
using (var producer = builder.Build()) {
try {
var json = JsonConvert.SerializeObject(@event);
var dr = producer.ProduceAsync(topic, new Message<string, string> { Key = @event.Key, Value = json }).GetAwaiter().GetResult();
_logger.LogDebug("發送事件 {0} 到 {1} 成功", dr.Value, dr.TopicPartitionOffset);
} catch (ProduceException<string, string> ex) {
_logger.LogError(ex, "發送事件到 {0} 失敗,原因 {1} ", topic, ex.Error.Reason);
}
}
}
}
在這里我們的Producer根據業務的需要定義在領域服務中,這里面最關鍵的就是Produce方法了,該方法的參數是繼承自IIntegrationEvent 接口的各種各樣事件,在這個方法中,我們獲取配置在appsetting.json中配置的各種Topic以及Kafka服務器的地址,具體的配置如下方截圖所示。

圖三 配置服務器地址以及各種Topic
通過當前配置我們就知道我們的消息要發往何處,然后我們就可以創建一個producer來將我們的事件(實際上是定義的數據結構)序列化成Json,然后通過異步的方式發送出去,這里需要注意我們創建的Producer要放在一個using塊中,這樣在創建完成並發送消息之后就會釋放當前生產者。這里如果發送失敗會在當前日志中記錄發送的值以及錯誤的原因從而便於進行調試。這里舉出其中的一個事件RepairContractFinishedEvent為例來說明。
/// <summary>
/// 維修合同完成的事件
/// </summary>
public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContract RepairContract { get; set; }
//一個維修合同會對應多個調整單
public List<RepairContractAdjust> RepairContractAdjusts { get; set; }
public string Key { get; set; }
}
這個里面RepairContract以及List集合都是我們定義的一種數據結構。
最后我們來看看在具體的領域層中我們該如何觸發此事件的,這里我們也定義了一個叫做IRepairContractEventManager接口的領域服務,並在里面定義了一個叫做Finished的接口,然后在RepairContractEventManager中實現該方法。
public class RepairContractEventManager : DomainService, IRepairContractEventManager {
private readonly KafkaProducer _producer;
private readonly IRepository<RepairContract, Guid> _repairContractRepository;
private readonly IRepository<RepairContractAdjust, Guid> _repairContractAdjustRepository;
public RepairContractEventManager(KafkaProducer producer,
IRepository<RepairContract, Guid> repairContractRepository,
IRepository<RepairContractAdjust, Guid> repairContractAdjustRepository) {
_producer = producer;
_repairContractRepository = repairContractRepository;
_repairContractAdjustRepository = repairContractAdjustRepository;
}
public void Finished(Guid repairContractId) {
var repairContract = _repairContractRepository.GetAll()
.Include(c => c.RepairContractWorkItems).ThenInclude(w => w.Materials)
.SingleOrDefaultAsync(c => c.Id == repairContractId).GetAwaiter().GetResult();
var repairContractAdjusts = _repairContractAdjustRepository.GetAll()
.Include(a => a.WorkItems).ThenInclude(w => w.Materials)
.Where(a => a.RepairContractId == repairContractId).ToListAsync().GetAwaiter().GetResult();
var @event = new RepairContractFinishedEvent {
Key = repairContract?.Code,
RepairContract = repairContract,
RepairContractAdjusts = repairContractAdjusts
};
_producer.Produce(@event);
}
}
這段代碼就是組裝RepairContractFinishedEvent的具體實現過程,然后調用我們之前創建的KafkaProducer對象然后將消息發送出去,這樣在需要觸發當前RepairContractFinishedEvent 的地方來注入IRepairContractEventManager接口,然后調對應的Finished方法,這樣就完成了整個消息的發送的過程了。
三 查看消息的發送
在發送完消息后我們可以到Kafka 集群 Control Center中查找我們發送的所有消息。選擇其中的一條消息,雙擊,然后選擇INSPECT來查看發送的消息

圖四 Kafka Control Center中查看發送消息
這里通過這個網頁去觀察發送和接收的消息時有時候會存在一定的延時,這里推薦另外一個 Kafka Tool 的工具,通過這個工具能夠比較好的實時監測發送和接收消息,先來看看整個界面,然后我們再來看看到底該怎么配置這個軟件。

圖五 Kafka Tool中查看發送消息
要想使用這個工具,首先第一步就是要配置Cluster端,具體的配置我們看看有哪些東西。

圖六 Kafka Tool中新增Cluster
另外一點需要注意的就是默認接收到的Message都是byte數組,我們這里需要配置ContentType,配置的方法是選中其中的一條消息--》Properties--》ContentType-->Update

圖七 Kafka Tool中配置ContentType
四 消息的接收方(Consumer)
在正確創建消息的發送方后緊接着就是定義消息的接收方了,消息的接收方顧名思義就是消費剛才消息的一方,這里的步驟和發送類似,但是也有很大的不同,消息的消費方核心是一個后台服務,並且在單獨的線程中監聽來自發送方的消息,並進行消費,這里我們先定義一個叫做KafkaConsumerHostedService的基類,我們具體來看看代碼。
/// <summary>
/// Kafka 消費者的后台服務基礎類
/// </summary>
/// <typeparam name="T">事件類型</typeparam>
public abstract class KafkaConsumerHostedService<T> : BackgroundService where T : IIntegrationEvent {
protected readonly IServiceProvider _services;
protected readonly IConfiguration _config;
protected readonly ILogger<KafkaConsumerHostedService<T>> _logger;
public KafkaConsumerHostedService(IServiceProvider services, IConfiguration config, ILogger<KafkaConsumerHostedService<T>> logger) {
_services = services;
_config = config;
_logger = logger;
}
/// <summary>
/// 消費該事件,比如調用 Application Service 持久化數據等
/// </summary>
/// <param name="event">事件內容</param>
protected abstract void DoWork(T @event);
/// <summary>
/// 構造 Kafka 消費者實例,監聽指定 Topic,獲得最新的事件
/// </summary>
/// <param name="stoppingToken">終止標識</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await Task.Factory.StartNew(() => {
var topic = _config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");
var consumerConfig = new ConsumerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = _config.GetValue<string>("Application:Name"),
EnableAutoCommit = true,
};
var builder = new ConsumerBuilder<string, string>(consumerConfig);
using (var consumer = builder.Build()) {
consumer.Subscribe(topic);
while (!stoppingToken.IsCancellationRequested) {
try {
var result = consumer.Consume(stoppingToken);
var @event = JsonConvert.DeserializeObject<T>(result.Value);
DoWork(@event);
//consumer.StoreOffset(result);
} catch (OperationCanceledException ex) {
consumer.Close();
_logger.LogDebug(ex, "Kafka 消費者結束,退出后台線程");
} catch (AbpValidationException ex) {
_logger.LogError(ex, $"Kafka {GetValidationErrorNarrative(ex)}");
} catch (ConsumeException ex) {
_logger.LogError(ex, "Kafka 消費者產生異常");
} catch (KafkaException ex) {
_logger.LogError(ex, "Kafka 產生異常");
} catch (ValidationException ex) {
_logger.LogError(ex, "Kafka 消息驗證失敗");
} catch (Exception ex) {
_logger.LogError(ex, "Kafka 捕獲意外異常");
}
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
private string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("驗證過程中檢測到以下錯誤");
foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
}
return detailBuilder.ToString();
}
}
這段代碼中我們會創建一個consumer,這里我們會在一個While循環中去訂閱特定Topic消息,這里的BootstrapServers是和發送方保持一致,並且也是在當前應用程序中的appsetting.json中進行配置的,而且這里的consumer.Consume方法是一個阻塞式方法,當發送方發送特定事件后,這里會接收到同樣名稱的Topic的消息,然后將接收到的Json數據進行反序列化,然后交由后面的DoWork方法進行處理。這里還是以之前生成者發送的RepairContractFinished事件為例,這里也需要定義一個RepairContractFinishedEventHandler來處理生產者發送的消息。
public class RepairContractFinishedEventHandler : KafkaConsumerHostedService<RepairContractFinishedEvent> {
public RepairContractFinishedEventHandler(IServiceProvider services,
IConfiguration config, ILogger<KafkaConsumerHostedService<RepairContractFinishedEvent>> logger)
: base(services, config, logger) {
}
/// <summary>
/// 調用 Application Service,新增或更新維修合同及關聯實體
/// </summary>
/// <param name="event">待消費的事件</param>
protected override void DoWork(RepairContractFinishedEvent @event) {
using (var scope = _services.CreateScope()) {
var service = scope.ServiceProvider.GetRequiredService<IRepairContractAppService>();
service.AddOrUpdateRepairContract(@event.RepairContract, @event.RepairContractAdjusts);
}
}
}
這里需要特別注意的是在這里我么也需要定義一個繼承自IIntegrationEvent接口的事件,這里也是定義一種數據結構,並且這里的數據結構和生成者定義的要保持一致,否則消費方在反序列化的時候會丟失不能夠匹配的信息。
public class RepairContractFinishedEvent : IIntegrationEvent {
public RepairContractDto RepairContract { get; set; }
public List<RepairContractAdjustDto> RepairContractAdjusts { get; set; }
public string Key { get; set; }
}
另外在DoWork方法中我們也需要注意代碼也需要用using包裹,從而在消費方消費完后釋放掉當前的應用服務。最后需要注意的就是我們的每一個Handle都是一個后台服務,我們需要在Asp.Net Core的Startup的ConfigureServices進行配置,從而將當前的后台服務添加到Asp.Net Core依賴注入容器中。
/// <summary>
/// 注冊集成事件的處理器
/// </summary>
/// <param name="services"></param>
private void AddIntegrationEventHandlers(IServiceCollection services) {
services.AddHostedService<RepairContractFinishedEventHandler>();
services.AddHostedService<ProductTransferDataEventHandler>();
services.AddHostedService<PartUpdateEventHandler>();
services.AddHostedService<VehicleSoldFinishedEventHandler>();
services.AddHostedService<AddOrUpdateDealerEventHandler>();
services.AddHostedService<AddOrUpdateProductCategoryEventHandler>();
services.AddHostedService<CustomerFinishedEventHandler>();
services.AddHostedService<VehicleSoldUpdateStatusEventHandler>();
services.AddHostedService<AddCustomerEventHandler>();
}
最后我們也看看我們的appsetting.json的配置文件關於kafka的配置。
"Kafka": {
"BootstrapServers": "127.0.0.1:9092",
"MessageTimeoutMs": 5000,
"Topics": {
"RepairContractFinishedEvent": "repair-contract-finished",
"AddOrUpdateProductCategoryEvent": "add-update-product-category",
"AddOrUpdateDealerEvent": "add-update-dealer",
"ClaimApproveEvent": "claim-approve",
"ProductTransferDataEvent": "product-update",
"PartUpdateEvent": "part-update",
"VehicleSoldFinishedEvent": "vehiclesold-finished",
"CustomerFinishedEvent": "customer-update",
"VehicleInformationUpdateStatusEvent": "add-update-vehicle-info",
"AddCustomerEvent": "add-customer"
}
},
這里需要注意的是發送方和接收方必須保證Topic一致,並且配置的服務器名稱端口保持一致,這樣才能夠保證消息的准確發送和接收。最后對於服務端,這里推薦一個VSCode的插件kafka,能夠創建並發送消息,這樣就方便我們來發送我們需要的數據了,這里同樣需要我們先建立一個.kafka的文件,然后配置Kafka服務的地址和端口號。

圖八 利用VSCode Kafka插件發送消息
