RabbitMQ的事件總線
在上文中,我們討論了事件處理器中對象生命周期的問題,在進入新的討論之前,首先讓我們總結一下,我們已經實現了哪些內容。下面的類圖描述了我們已經實現的組件及其之間的關系,貌似系統已經變得越來越復雜了。
其中綠色的部分就是上文中新實現的部分,包括一個簡單的Event Store,一個事件處理器執行上下文的接口,以及一個基於ASP.NET Core依賴注入框架的執行上下文的實現。接下來,我們打算淘汰PassThroughEventBus,然后基於RabbitMQ實現一套新的事件總線。
事件總線的重構
根據前面的結論,事件總線的執行需要依賴於事件處理器執行上下文,也就是上面類圖中PassThroughEventBus對於IEventHandlerExecutionContext的引用。更具體些,是在事件總線訂閱某種類型的事件時,需要將事件處理器注冊到IEventHandlerExecutionContext中。那么在實現RabbitMQ時,也會有着類似的設計需求,即RabbitMQEventBus也需要依賴IEventHandlerExecutionContext接口,以保證事件處理器生命周期的合理性。
為此,我們新建一個基類:BaseEventBus,並將這部分公共的代碼提取出來,需要注意以下幾點:
- 通過BaseEventBus的構造函數傳入IEventHandlerExecutionContext實例,也就限定了所有子類的實現中,必須在構造函數中傳入IEventHandlerExecutionContext實例,這對於框架的設計非常有利:在實現新的事件總線時,框架的使用者無需查看API文檔,即可知道事件總線與IEventHandlerExecutionContext之間的關系,這符合SOLID原則中的Open/Closed Principle
- BaseEventBus的實現應該放在EdaSample.Common程序集中,更確切地說,它應該放在EdaSample.Common.Events命名空間下,因為它是屬於框架級別的組件,並且不會依賴任何基礎結構層的組件
BaseEventBus的代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public
abstract
class
BaseEventBus : IEventBus
{
protected
readonly
IEventHandlerExecutionContext eventHandlerExecutionContext;
protected
BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
{
this
.eventHandlerExecutionContext = eventHandlerExecutionContext;
}
public
abstract
Task PublishAsync<TEvent>(TEvent @
event
, CancellationToken cancellationToken =
default
)
where
TEvent : IEvent;
public
abstract
void
Subscribe<TEvent, TEventHandler>()
where
TEvent : IEvent
where
TEventHandler : IEventHandler<TEvent>;
// Disposable接口實現代碼省略
}
|
在上面的代碼中,PublishAsync和Subscribe方法是抽象方法,以便子類根據不同的需要來實現。
接下來就是調整PassThroughEventBus,使其繼承於BaseEventBus:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public
sealed
class
PassThroughEventBus : BaseEventBus
{
private
readonly
EventQueue eventQueue =
new
EventQueue();
private
readonly
ILogger logger;
public
PassThroughEventBus(IEventHandlerExecutionContext context,
ILogger<PassThroughEventBus> logger)
:
base
(context)
{
this
.logger = logger;
logger.LogInformation($
"PassThroughEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}."
);
eventQueue.EventPushed += EventQueue_EventPushed;
}
private
async
void
EventQueue_EventPushed(
object
sender, EventProcessedEventArgs e)
=> await
this
.eventHandlerExecutionContext.HandleEventAsync(e.Event);
public
override
Task PublishAsync<TEvent>(TEvent @
event
, CancellationToken cancellationToken =
default
)
{
return
Task.Factory.StartNew(() => eventQueue.Push(@
event
));
}
public
override
void
Subscribe<TEvent, TEventHandler>()
{
if
(!
this
.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
{
this
.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
}
}
// Disposable接口實現代碼省略
}
|
代碼都很簡單,也就不多做說明了,接下來,我們開始實現RabbitMQEventBus。
RabbitMQEventBus的實現
首先需要新建一個.NET Standard 2.0的項目,使用.NET Standard 2.0的項目模板所創建的項目,可以同時被.NET Framework 4.6.1或者.NET Core 2.0的應用程序所引用。創建新的類庫項目的目的,是因為RabbitMQEventBus的實現需要依賴RabbitMQ C#開發庫這個外部引用。因此,為了保證框架核心的純凈和穩定,需要在新的類庫項目中實現RabbitMQEventBus。
Note:對於RabbitMQ及其C#庫的介紹,本文就不再涉及了,網上有很多資料和文檔,博客園有很多朋友在這方面都有使用經驗分享,RabbitMQ官方文檔也寫得非常詳細,當然是英文版的,如果英語比較好的話,建議參考官方文檔。
以下就是在EdaSample案例中,RabbitMQEventBus的實現,我們先讀一讀代碼,再對這部分代碼做些分析。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
public
class
RabbitMQEventBus : BaseEventBus
{
private
readonly
IConnectionFactory connectionFactory;
private
readonly
IConnection connection;
private
readonly
IModel channel;
private
readonly
string
exchangeName;
private
readonly
string
exchangeType;
private
readonly
string
queueName;
private
readonly
bool
autoAck;
private
readonly
ILogger logger;
private
bool
disposed;
public
RabbitMQEventBus(IConnectionFactory connectionFactory,
ILogger<RabbitMQEventBus> logger,
IEventHandlerExecutionContext context,
string
exchangeName,
string
exchangeType = ExchangeType.Fanout,
string
queueName =
null
,
bool
autoAck =
false
)
:
base
(context)
{
this
.connectionFactory = connectionFactory;
this
.logger = logger;
this
.connection =
this
.connectionFactory.CreateConnection();
this
.channel =
this
.connection.CreateModel();
this
.exchangeType = exchangeType;
this
.exchangeName = exchangeName;
this
.autoAck = autoAck;
this
.channel.ExchangeDeclare(
this
.exchangeName,
this
.exchangeType);
this
.queueName =
this
.InitializeEventConsumer(queueName);
logger.LogInformation($
"RabbitMQEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}."
);
}
public
override
Task PublishAsync<TEvent>(TEvent @
event
, CancellationToken cancellationToken =
default
(CancellationToken))
{
var
json = JsonConvert.SerializeObject(@
event
,
new
JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
var
eventBody = Encoding.UTF8.GetBytes(json);
channel.BasicPublish(
this
.exchangeName,
@
event
.GetType().FullName,
null
,
eventBody);
return
Task.CompletedTask;
}
public
override
void
Subscribe<TEvent, TEventHandler>()
{
if
(!
this
.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>())
{
this
.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>();
this
.channel.QueueBind(
this
.queueName,
this
.exchangeName,
typeof
(TEvent).FullName);
}
}
protected
override
void
Dispose(
bool
disposing)
{
if
(!disposed)
{
if
(disposing)
{
this
.channel.Dispose();
this
.connection.Dispose();
logger.LogInformation($
"RabbitMQEventBus已經被Dispose。Hash Code:{this.GetHashCode()}."
);
}
disposed =
true
;
base
.Dispose(disposing);
}
}
private
string
InitializeEventConsumer(
string
queue)
{
var
localQueueName = queue;
if
(
string
.IsNullOrEmpty(localQueueName))
{
localQueueName =
this
.channel.QueueDeclare().QueueName;
}
else
{
this
.channel.QueueDeclare(localQueueName,
true
,
false
,
false
,
null
);
}
var
consumer =
new
EventingBasicConsumer(
this
.channel);
consumer.Received += async (model, eventArgument) =>
{
var
eventBody = eventArgument.Body;
var
json = Encoding.UTF8.GetString(eventBody);
var
@
event
= (IEvent)JsonConvert.DeserializeObject(json,
new
JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All });
await
this
.eventHandlerExecutionContext.HandleEventAsync(@
event
);
if
(!autoAck)
{
channel.BasicAck(eventArgument.DeliveryTag,
false
);
}
};
this
.channel.BasicConsume(localQueueName, autoAck:
this
.autoAck, consumer: consumer);
return
localQueueName;
}
}
|
閱讀上面的代碼,需要注意以下幾點:
- 正如上面所述,構造函數需要接受IEventHandlerExecutionContext對象,並通過構造函數的base調用,將該對象傳遞給基類
- 構造函數中,queueName參數是可選參數,也就是說:
- 如果通過RabbitMQEventBus發送事件消息,則無需指定queueName參數,僅需指定exchangeName即可,因為在RabbitMQ中,消息的發布方無需知道消息是發送到哪個隊列中
- 如果通過RabbitMQEventBus接收事件消息,那么也分兩種情況:
- 如果兩個進程在使用RabbitMQEventBus時,同時指定了queueName參數,並且queueName的值相同,那么這兩個進程將會輪流處理路由至queueName隊列的消息
- 如果兩個進程在使用RabbitMQEventBus時,同時指定了queueName參數,但queueName的值不相同,或者都沒有指定queueName參數,那么這兩個進程將會同時處理路由至queueName隊列的消息
- 有關Exchange和Queue的概念,請參考RabbitMQ的官方文檔
- 在Subscribe方法中,除了將事件處理器注冊到事件處理器執行上下文之外,還通過QueueBind方法,將指定的隊列綁定到Exchange上
- 事件數據都通過Newtonsoft.Json進行序列化和反序列化,使用TypeNameHandling.All這一設定,使得序列化的JSON字符串中帶有類型名稱信息。在此處這樣做既是合理的,又是必須的,因為如果沒有帶上類型名稱的信息,JsonConvert.DeserializeObject反序列化時,將無法判定得到的對象是否可以轉換為IEvent對象,這樣就會出現異常。但如果是實現一個更為通用的消息系統,應用程序派發出去的事件消息可能還會被由Python或者Java所實現的應用程序所使用,那么對於這些應用,它們並不知道Newtonsoft.Json是什么,也無法通過Newtonsoft.Json加入的類型名稱來獲知事件消息的初衷(Intent),Newtonsoft.Json所帶的類型信息又會顯得冗余。因此,簡單地使用Newtonsoft.Json作為事件消息的序列化、反序列化工具,其實是欠妥的。更好的做法是,實現自定義的消息序列化、反序列化器,在進行序列化的時候,將.NET相關的諸如類型信息等,作為Metadata(元數據)附着在序列化的內容上。理論上說,在序列化的數據中加上一些元數據信息是合理的,只不過我們對這些元數據做一些標注,表明它是由.NET框架產生的,第三方系統如果不關心這些信息,可以對元數據不做任何處理
- 在Dispose方法中,注意將RabbitMQ所使用的資源dispose掉
使用RabbitMQEventBus
在Customer服務中,使用RabbitMQEventBus就非常簡單了,只需要引用RabbitMQEventBus的程序集,然后在Startup.cs文件的ConfigureServices方法中,替換PassThroughEventBus的使用即可:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public
void
ConfigureServices(IServiceCollection services)
{
this
.logger.LogInformation(
"正在對服務進行配置..."
);
services.AddMvc();
services.AddTransient<IEventStore>(serviceProvider =>
new
DapperEventStore(Configuration[
"mssql:connectionString"
],
serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));
var
eventHandlerExecutionContext =
new
EventHandlerExecutionContext(services,
sc => sc.BuildServiceProvider());
services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
// services.AddSingleton<IEventBus, PassThroughEventBus>();
var
connectionFactory =
new
ConnectionFactory { HostName =
"localhost"
};
services.AddSingleton<IEventBus>(sp =>
new
RabbitMQEventBus(connectionFactory,
sp.GetRequiredService<ILogger<RabbitMQEventBus>>(),
sp.GetRequiredService<IEventHandlerExecutionContext>(),
RMQ_EXCHANGE,
queueName: RMQ_QUEUE));
this
.logger.LogInformation(
"服務配置完成,已注冊到IoC容器!"
);
}
|
Note:一種更好的做法是通過配置文件來配置IoC容器,在曾經的Microsoft Patterns and Practices Enterprise Library Unity Container中,使用配置文件是很方便的。這樣只需要Customer服務能夠通過配置文件來配置IoC容器,同時只需要讓Customer服務依賴(注意,不是程序集引用)於不同的事件總線的實現即可,無需對Customer服務重新編譯。
下面來驗證一下效果。首先確保RabbitMQ已經配置並啟動妥當,我是安裝在本地機器上,使用默認安裝。首先啟動ASP.NET Core Web API,然后通過Powershell發起兩次創建Customer的請求:
查看一下數據庫是否更新正常:
並檢查一下日志信息:
RabbitMQ中Exchange的信息:
總結
本文提供了一種RabbitMQEventBus的實現,目前來說是夠用的,而且這種實現是可以使用在實際項目當中的。在實際使用中,或許也會碰到一些與RabbitMQ本身有關的問題,這就需要具體問題具體分析了。此外,本文沒有涉及事件消息丟失、重發然后保證最終一致性的問題,這些內容會在后面討論。從下文開始,我們着手逐步實現CQRS架構的領域事件和事件存儲部分。
源代碼的使用
本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo里,通過不同的release tag來區分針對不同章節的源代碼。本文的源代碼請參考chapter_3這個tag,如下:
歡迎訪問我的博客新站:http://sunnycoding.net。