在上一篇中我們主要介紹如何在Asp.Net Core中同步Kafka消息,通過上一篇的操作我們發現上面一篇中介紹的只能夠進行簡單的首發kafka消息並不能夠消息重發、重復消費、樂觀鎖沖突等問題,這些問題在實際的生產環境中是非常要命的,如果在消息的消費方沒有做好必須的冪等性操作,那么消費者重復消費的問題會比較嚴重的,另外對於消息的生產者來說,記錄日志的方式也不是足夠友好,很多時候在后台監控程序中我們需要知道記錄更多的關於消息的分區、偏移等更多的消息。而在消費者這邊我們更多的需要去解決發送方發送重復消息,以及面對樂觀鎖沖突的時候該怎么解決這些問題,當然代碼中的這些方案都是我們在實際生產中摸索出來的一些方案,當然這些都是需要后續進行進一步優化的,這里我們將分別就生產者和消費者中出現的問題來進行分析和說明。

圖一 消費者方幾乎同一時刻接收到兩條同樣的Kafka消息(Grafana監控)
一 生產者方
using System;
using System.ComponentModel.DataAnnotations;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Abp.Dependency;
using Confluent.Kafka;
using JetBrains.Annotations;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Sunlight.Kafka.Abstractions;
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
namespace Sunlight.Kafka {
/// <summary>
/// Kafka 生產者的 Domain Service
/// </summary>
public class KafkaProducer : ISingletonDependency, IDisposableDependencyObjectWrapper<IProducer<string, string>>, IMessageProducer {
private readonly IConfiguration _config;
private readonly ILogger<KafkaProducer> _logger;
private readonly IProducer<string, string> _producer;
/// <summary>
/// 構造 <see cref="KafkaProducer"/>
/// </summary>
/// <param name="config"></param>
/// <param name="logger"></param>
public KafkaProducer(IConfiguration config,
ILogger<KafkaProducer> logger) {
_config = config;
_logger = logger;
var producerConfig = new ProducerConfig {
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
MessageTimeoutMs = _config.GetValue<int>("Kafka:MessageTimeoutMs")
};
var builder = new ProducerBuilder<string, string>(producerConfig);
_producer = builder.Build();
Object = _producer;
}
/// <summary>
/// 發送事件
/// </summary>
/// <param name="event"></param>
public void Produce(IIntegrationEvent @event) {
ProduceAsync(@event).GetAwaiter().GetResult();
}
/// <summary>
/// 發送事件
/// </summary>
/// <param name="event"></param>
public async Task ProduceAsync(IIntegrationEvent @event) {
await ProduceAsync(@event, @event.GetType().Name);
}
/// <inheritdoc />
public async Task ProduceAsync(IIntegrationEvent @event, [NotNull] string eventName) {
if (string.IsNullOrEmpty(eventName)) {
throw new ArgumentNullException(nameof(eventName));
}
var topic = _config.GetValue<string>($"Kafka:Topics:{eventName}");
if (string.IsNullOrEmpty(topic)) {
throw new NullReferenceException("topic不能為空");
}
var key = Guid.NewGuid().ToString();
try {
var json = JsonConvert.SerializeObject(@event);
var dr = await _producer.ProduceAsync(topic, new Message<string, string> { Key = key, Value = json });
_logger.LogInformation($"成功發送消息 {dr.Key}.{ @event.Key}, offSet: {dr.TopicPartitionOffset}");
} catch (ProduceException<string, string> ex) {
_logger.LogError(ex, $"發送失敗 {topic}.{key}.{ @event.Key}, 原因 {ex.Error.Reason} ");
throw new ValidationException("當前服務器繁忙,請稍后再嘗試");
}
}
/// <summary>
/// 釋放方法
/// </summary>
public void Dispose() {
_producer?.Dispose();
}
/// <summary>
/// 要釋放的對象
/// </summary>
public IProducer<string, string> Object { get; }
}
}
在這里我們來看看IMessageProducer接口定義
using System.Threading.Tasks;
using Sunlight.Kafka.Abstractions;
namespace Sunlight.Kafka
{
/// <summary>
/// 消息的生產者
/// </summary>
public interface IMessageProducer
{
/// <summary>
/// 發送事件
/// </summary>
/// <param name="event"></param>
void Produce(IIntegrationEvent @event);
/// <summary>
/// 發送事件
/// </summary>
/// <param name="event"></param>
Task ProduceAsync(IIntegrationEvent @event);
/// <summary>
/// 發送事件
/// </summary>
/// <param name="event"></param>
/// <param name="eventName">指定事件的名稱</param>
/// <returns></returns>
Task ProduceAsync(IIntegrationEvent @event, string eventName);
}
}
在接口中我們分別定義了消息發送的同步和異步及重載方法,另外我們還繼承了ABP中的IDisposableDependencyObjectWrapper接口,關於這個接口我們來看一下接口的聲明和定義(想了解更多的關於ABP的知識,也可點擊這里關注本人之前的博客)。
using System;
namespace Abp.Dependency
{
/// <summary>
/// This interface is used to wrap an object that is resolved from IOC container.
/// It inherits <see cref="IDisposable"/>, so resolved object can be easily released.
/// In <see cref="IDisposable.Dispose"/> method, <see cref="IIocResolver.Release"/> is called to dispose the object.
/// This is non-generic version of <see cref="IDisposableDependencyObjectWrapper{T}"/> interface.
/// </summary>
public interface IDisposableDependencyObjectWrapper : IDisposableDependencyObjectWrapper<object>
{
}
}
如果想了解關於這個接口更多的信息,請點擊這里。
另外在實際發送消息的時候,我們需要記錄消息的具體Partition以及Offset這樣我們就能夠快速找到這條消息,從而方便后面的重試,另外有時候由於服務器的網絡問題的時候可能拋出MessageTimeout的消息,這個時候我們需要通過Confluent.Kafka庫中的ProduceException異常來捕獲這些信息記錄拋出異常信息,另外在我們的業務層需要給出一個“當前服務器繁忙,請稍后再嘗試”這樣一個友好的提示信息。
另外在發送消息的時候,每一次都會產生一個Guid類型的Key發送到消息的消費方,這個Key將會作為接收消息的實體KafkaReceivedMessage 的主鍵Id,這個會在后文有具體的解釋。
二 消費者方
在我們這篇文章記錄的重點就是消費方,因為這里我們需要解決諸如消息重復消費以及樂觀鎖沖突的一系列問題,后面我們將會就這些問題來一一進行講解和說明。
2.1 如何解決消息重復消費
在這里我們通過KafkaReceivedMessage這樣一個實體來在數據庫中記錄收到的消息,並且在發送方每次發送時候傳遞唯一的一個Guid,這樣我們就簡單利用每次插入消息時主鍵Id不允許重復來處理重復發送的同一條消息的問題,我們首先來看看這個實體。
/// <summary>
/// Kafka消費者收到的消息記錄
/// </summary>
public class KafkaReceivedMessage : Entity<Guid> {
/// <summary>
/// 消費者組
/// </summary>
[MaxLength(50)]
[Required]
public string Group { get; set; }
/// <summary>
/// 消息主題
/// </summary>
[MaxLength(100)]
[Required]
public string Topic { get; set; }
/// <summary>
/// 消息編號, 用於記錄日志, 便於區分, 建議用編號
/// </summary>
[MaxLength(50)]
public string Code { get; set; }
/// <summary>
/// 消息內容
/// </summary>
[MaxLength(int.MaxValue)]
public string Content { get; set; }
/// <summary>
/// kafka 中的 partition
/// </summary>
public int? Partition { get; set; }
/// <summary>
/// kafka 中的 offset
/// </summary>
[MaxLength(100)]
[Required]
public string Offset { get; set; }
/// <summary>
/// 接受時間
/// </summary>
public DateTime ReceivedTime { get; set; }
/// <summary>
/// 過期時間
/// </summary>
public DateTime? ExpiresAt { get; set; }
/// <summary>
/// 重試次數
/// </summary>
public int Retries { get; set; }
/// <summary>
/// 不是用Guid做全局唯一約束的消息
/// </summary>
public bool Old { get; set; }
/// <inheritdoc />
public override string ToString() {
return $"{Group}.{Topic}.{Id}.{Code},{Partition}:{Offset}";
}
有了這個實體,我們在接收到這條消息的時候我們首先會嘗試將這條消息存入到數據庫,如果存入成功就說明不是重復消息,如果存入失敗,就記錄Kafka收到重復消息,我們先來看一下具體的實現。
using System;
using System.ComponentModel.DataAnnotations;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Data.SqlClient;
using Abp.Domain.Uow;
using Abp.Runtime.Validation;
using Confluent.Kafka;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Sunlight.Kafka.Abstractions;
using Sunlight.Kafka.Models;
namespace Sunlight.Kafka {
/// <summary>
/// Kafka 消費者的后台服務基礎類
/// </summary>
/// <typeparam name="T">事件類型</typeparam>
public abstract class KafkaConsumerHostedService<T> : BackgroundService where T : IIntegrationEvent {
/// <summary>
/// IOC服務提供方
/// </summary>
protected IServiceProvider Services { get; }
/// <summary>
/// 配置文件
/// </summary>
protected IConfiguration Config { get; }
/// <summary>
/// 主題
/// </summary>
protected string Topic { get; }
/// <summary>
/// 日志
/// </summary>
protected ILogger<KafkaConsumerHostedService<T>> Logger { get; }
/// <summary>
/// DbContext的類型, 必須是業務中實際的類型
/// </summary>
protected Type DbContextType { get; }
/// <summary>
/// 消費者的配置
/// </summary>
protected ConsumerConfig ConsumerConfig { get; }
/// <summary>
/// 保存失敗時的重復次數, 一般用於 DbUpdateConcurrencyException
/// </summary>
protected int SaveDataRetries { get; }
/// <summary>
/// 構造 <see cref="KafkaConsumerHostedService{T}"/>
/// </summary>
/// <param name="services"></param>
/// <param name="config"></param>
/// <param name="logger"></param>
/// <param name="dbContext">DbContext的類型, 必須是業務中實際的類型</param>
protected KafkaConsumerHostedService(IServiceProvider services,
IConfiguration config,
ILogger<KafkaConsumerHostedService<T>> logger, DbContext dbContext) {
Services = services;
Config = config;
Logger = logger;
DbContextType = dbContext.GetType();
Topic = Config.GetValue<string>($"Kafka:Topics:{typeof(T).Name}");
if (string.IsNullOrWhiteSpace(Topic)) {
Logger.LogCritical($"未能找到{typeof(T).Name}所對應的Topic");
Environment.Exit(0);
}
const int MaxRetries = 5;
const int DefaultRetries = 2;
SaveDataRetries = Config.GetValue<int?>("Kafka:SaveDataRetries") ?? DefaultRetries;
SaveDataRetries = Math.Min(SaveDataRetries, MaxRetries);
ConsumerConfig = new ConsumerConfig {
BootstrapServers = Config.GetValue<string>("Kafka:BootstrapServers"),
AutoOffsetReset = AutoOffsetReset.Earliest,
GroupId = Config.GetValue<string>("Application:Name"),
EnableAutoCommit = true
};
}
/// <summary>
/// 消費該事件,比如調用 Application Service 持久化數據等
/// </summary>
/// <param name="event">事件內容</param>
protected abstract void DoWork(T @event);
/// <summary>
/// 保存收到的消息到數據庫, 防止重復消費
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
private async Task<bool> SaveMessageAsync(KafkaReceivedMessage message) {
using var scope = Services.CreateScope();
var service = (DbContext)scope.ServiceProvider.GetRequiredService(DbContextType);
service.Set<KafkaReceivedMessage>().Add(message);
try {
await service.SaveChangesAsync();
Logger.LogInformation($"Kafka 收到消息 {message}");
return true;
} catch (DbUpdateException ex) when (ex.InnerException?.Message.Contains("PRIMARY KEY") == true) {
Logger.LogError($"Kafka 收到重復消息 {message}");
} finally {
service.Entry(message).State = EntityState.Detached;
}
return false;
}
/// <summary>
/// 反序列化消息
/// </summary>
/// <param name="result"></param>
/// <param name="message"></param>
/// <returns></returns>
protected virtual async Task<T> DeserializeEvent(ConsumeResult<string, string> result, KafkaReceivedMessage message) {
T @event;
try {
@event = JsonConvert.DeserializeObject<T>(result.Value);
} catch (Exception e) when(e is JsonReaderException || e is JsonSerializationException || e is JsonException) {
@event = default;
if (!await SaveMessageAsync(message))
Logger.LogError(e, ErrorMessageTemp, message, e.InnerException?.Message ?? e.Message);
}
if (Guid.TryParse(result.Key, out var key) && result.Key != @event?.Key) {
message.Code = @event?.Key;
message.Id = key;
} else {
message.Id = Guid.NewGuid();
message.Code = result.Key;
message.Old = true;
}
return await SaveMessageAsync(message) ? @event : default;
}
private async Task TryDoWork(T @event, KafkaReceivedMessage message, int saveRetries) {
if (saveRetries <= 0) {
Logger.LogError(ErrorMessageTemp, message, "樂觀鎖沖突");
return;
}
try {
DoWork(@event);
// 在遇到 樂觀鎖沖突的時候, 需要重試幾次, 因為這很容易就發生了.
} catch (DbUpdateConcurrencyException) {
#pragma warning disable SCS0005 // Weak random generator
// 這樣在收到重復消息的時候, 能降低沖突的概率
await Task.Delay(new Random(DateTime.Now.Millisecond).Next(10,100));
await TryDoWork(@event, message, --saveRetries);
} catch (AbpDbConcurrencyException) {
await Task.Delay(new Random(DateTime.Now.Millisecond).Next(10,100));
await TryDoWork(@event, message, --saveRetries);
}
#pragma warning restore SCS0005 // Weak random generator
}
const string ErrorMessageTemp = "Kafka 消息 {0} 消費失敗, 原因: {1}";
/// <summary>
/// 構造 Kafka 消費者實例,監聽指定 Topic,獲得最新的事件
/// </summary>
/// <param name="stoppingToken">終止標識</param>
/// <returns></returns>
protected override async Task ExecuteAsync(CancellationToken stoppingToken) {
await Task.Factory.StartNew(async () => {
var builder = new ConsumerBuilder<string, string>(ConsumerConfig);
using var consumer = builder.Build();
consumer.Subscribe(Topic);
//當前事件的Key
Logger.LogInformation($"Kafka 消費者訂閱 {Topic}");
while (!stoppingToken.IsCancellationRequested) {
try {
var result = consumer.Consume(stoppingToken);
//包含分區和OffSet的詳細信息
var message = new KafkaReceivedMessage {
Group = ConsumerConfig.GroupId,
Topic = result.Topic,
Content = result.Value,
Partition = result.Partition,
Offset = result.Offset.ToString(),
ReceivedTime = DateTime.Now
};
try {
var @event = await DeserializeEvent(result, message);
if (@event == null)
continue;
TryDoWork(@event, message, SaveDataRetries);
} catch (ValidationException ex) {
Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
} catch (AbpValidationException ex) {
Logger.LogError(ex, ErrorMessageTemp, message, GetValidationErrorNarrative(ex));
} catch (SqlException ex) {
Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
} catch (Exception ex) {
Logger.LogError(ex, ErrorMessageTemp, message, ex.InnerException?.Message ?? ex.Message);
}
} catch (OperationCanceledException ex) {
consumer.Close();
Logger.LogInformation(ex, "Kafka 消費者結束,退出后台線程");
} catch (ConsumeException ex) {
Logger.LogError(ex, "Kafka 消費者產生異常,");
} catch (KafkaException ex) {
Logger.LogError(ex, "Kafka 產生異常,");
}
}
}, stoppingToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
private static string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("驗證過程中檢測到以下錯誤");
foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
}
return detailBuilder.ToString();
}
}
}
這里我們通過SaveMessageAsync這個異步方法來保存數據到數據庫,檢測的時候我們通過捕獲InnerException里面的Message中是否包含"PRIMARY KEY"來判斷是不是主鍵沖突的。
3.2 樂觀鎖沖突校驗
在做了第一步消息重復消費校驗后,我們需要利用數據庫中的DbUpdateConcurrencyException來捕獲樂觀鎖的沖突,因為我們的業務處理都是通過繼承KafkaConsumerHostedService這個基類,然后重載里面的DoWork方法來實現對業務代碼的調用的,當然由於Kafka消息的異步特性,所以不可避免多個消息同時修改同一個實體,而由於這些異步消息產生的DbUpdateConcurrencyException就不可避免,在這里我們采用的默認次數是2次,最多可以重試5次的機制,通過這種方式來保證樂觀鎖沖突,如果5次重試還是失敗則會提示樂觀鎖沖突,並且日志記錄當前錯誤內容,通過這種方式能夠在一定程度上減少由於並發問題導致的消費者消費失敗的概率,當然關於這方面的探索還在隨着業務的不斷深入而不斷去優化,期待后續的持續關注。
3.3 異常的捕獲與處理
在我們的接收到消息以后會產生各種異常,如果處理這些異常也是非常重要的,當然根據這些異常的級別分別記錄不同級別的日志是非常重要的,這里僅選擇一種AbpValidationException這一種特例來進行說明,如果你對ABP中的AbpValidationException還不是很熟悉的話,請先閱讀這篇文章。
private static string GetValidationErrorNarrative(AbpValidationException validationException) {
var detailBuilder = new StringBuilder();
detailBuilder.AppendLine("驗證過程中檢測到以下錯誤");
foreach (var validationResult in validationException.ValidationErrors) {
detailBuilder.AppendFormat(" - {0}", validationResult.ErrorMessage);
detailBuilder.AppendLine();
}
return detailBuilder.ToString();
}
由於在這里ABP中ValidationException中ValidationErrors會記錄一組之前驗證的錯誤信息,所以這里需要特別注意,這里在閱讀的時候需要特別注意。
