http://www.cnblogs.com/Jusfr/p/5256791.html
事件總線(EventBus)及其演進過程必須提到內存模型、傳統的隊列模型、發布-訂閱模型。
- 內存模型:進程內模型,事件總線(EventBus)在內部遍歷消費者(Consumer)列表傳遞數據;
- 隊列模型:消息或事件持久化到傳統消息隊列(Queue)即返回,以實時性降低換取吞吐能力提升;
- 發布-訂閱模型:事件源(EventSource)得到強化,出現如分布式、持久化、消費復制/分區等特性;
文中使用了“術語(單詞)”的形式引入概念,用詞可能有差異,只是力求表義清楚,下文描述將直接使用單詞。
內存模型
內存模型可以很好地解耦,舉例來說,版本初期我們有 IUserService 負責用戶創建,邏輯如下:
interface IUserService { void CreateNewUser(String name); } class UserService1 : IUserService { public void CreateNewUser(String name) { Console.WriteLine("User \"{0}\" created", name); } }
現在希望在用戶創建后,進行一次消息服務調用,發送歡迎辭。為了解決這個需求,需要添加和實現新的 MessageService , 並添加依賴,在 CreateNewUser() 方法某入插入調用邏輯,於是代碼變這樣:
interface IMessageService { void NotifyWelcome(User user); } class UserService2 : IUserService { private readonly IMessageService _messageService; public UserService2(IMessageService messageService) { _messageService = messageService; } public void CreateNewUser(String name) { var user = new User { Name = name }; Console.WriteLine("User \"{0}\" created", user.Name); _messageService.NotifyWelcome(user); //添加消息服務調用 } }
目前看起來好像沒啥問題,因為代碼簡單,但是當邏輯越來越復雜時情況就變得不一樣了,比如我們希望用戶創建后將數據寫入索引,需要依賴 ISearchService;比如希望調用報表服務 IReportService 添加每日新增用戶數;
public void CreateNewUser(String name) { var user = new User { Name = name }; Console.WriteLine("User \"{0}\" created", user.Name); _messageService.NotifyWelcome(user); //添加消息服務調用 _searchService.SaveIndex(user) //添加搜索服務調用 _reportService.CounterNewUser(user); //添加報表服務調用; }
如此多的依賴實在時重負難堪,當然你可以說這些應該異步處理、應該放到后端隊列,沒錯。現實中需要同步處理的邏輯並不少見,而規模尚小時引入隊列將帶來額外的開發測試、部署監控成本。使用 EventBus 的內存模型可以比較優雅地處理此問題,以下是實現思路。
場景和實現思路
引入 EventBus 作為共同依賴,IUserService 視為生產者,IMessageService 視為對用戶創建事件感興趣的 Consumer ,其消費邏輯調用 NotifyWelcome() 方法。EventBus 內部維護了一份 EventType-Consumer 列表,遍歷列表分發 Event 實例;ISearchService 、IReportService 等類似,同樣注冊到 EventBus 內即可。
abstract class Event { } interface IConsumer { void Proceed(Event @event); } class EventBus { private readonly HashSet<IConsumer> _consumers = new HashSet<IConsumer>(); //... 更多細節 public void Publish(Event @event) { foreach (var consumer in _consumers) { consumer.Proceed(@event); } } } class UserService3 : IUserService { private readonly EventBus _eventBus; public UserService3(EventBus eventBus) { _eventBus = eventBus; } public void CreateNewUser(String name) { var user = new User { Name = name }; Console.WriteLine("User \"{0}\" created", user.Name); var @event = ... //創建消息 _eventBus.Publish(@event); //交由 EventBus 發布 } }
在此過程中,Consumer 並不知道誰創建了 Event,不同的 Producer 對各 Consumer 的依賴統一變更為對 EventBus 的依賴,內存模型達到了解耦目的。
隊列模型
在內存模型的場景中,我們確認這些業務需要由異步進程處理。從 MSMQ 到各種第3方實現方案眾多,但真實業務中 while(true) 循環有太多問題,比較棘手的像
- 異常處理:消息處理中發生異常,但短時間內重試可能解決不了問題;
- 多消費者:大家都有消費程序,可能監聽相同隊列;
對於異常,常規做法是使用監聽時間依次延長的多個異常隊列,定時檢查並出隊處理;
多消費者麻煩一點,由於傳統隊列出隊即消息的特性,這意味着要么數據寫多份大家各自消費,要么消費者集中管理遍歷調用。
- 異常隊列誰來監聽和分發?
- 如果數據寫多份,生產者如何得知消費者數量?寫入性能損失怎樣?動態添加消費者時怎么辦?消費者又如何路由到自己的隊列上?
- 如果數據寫一份,消費者同步調用還是異步調用?等待所有的消費邏輯完成既可能存在短板,某消費者出現異常時又如何進行進度區分?
發布-訂閱模型及各 EventSource 的諸多特性提供了解決思路。
發布-訂閱模型
本文是 Kafka 系列文章之一,故使用 Kafka 作為 EventSource 描述和參考,其他隊列並未過多涉及請有限參考。
隊列模型雖然存在許多問題,但應用與業務規模並不龐大時仍可一用。我們可以使用宿主代為監聽列隊和消息分發、插件式寄宿消費程序,使消費者可以專注於業務;由於消費者短板效應無法避免,可以在業務層面妥協,盡量聚合高效、有限的消費者等等。
在應用與業務繼續擴展時,發布訂閱模型的事件總線變得不可或缺,甚至流式處理框架也不可避免地提上日程,使用 Kafka 對前文問題作出解答。
- Kafka 基於文件系統,消息移除是基於時間和磁盤的策略,並不會輕易丟失數據,消費者出現異常也不用擔心;
- Kafka 將 Consumer 的當前位置的管理職責交由消費者負責,只是提供了可選的 OffsetCommit 和 OffsetFetch API,這帶來了極大的便利性和一定的復雜度;你可以從任何位置開始消費,也沒有重復消費限制,附加的是需要合適的 Offset 策略;
- Kafka 提供了 Topic Partition + Consumer Group 並定義了發布-訂閱語義,可以配合堵塞式 API 保障消息處理的低延遲。
關於推與拉
Kafka 遵循傳統的 Pull 模式,由消費者決定數據流速,畢竟寫入速率遠高於消費的情況下,消費者實際是處於過載狀態。個人的理解的推拉(Push/Pull 或 Publish/Subscribe)並不是主要差異而只是受制於事件源(EventSource)的實現細節。
關於 Chuye.Kafka
Chuye.Kafka 是 Kafka 0.9版本 API 的 .NET 實現,其 Consumer、Producer 是 low levl API 的輕度封裝,使用它實現 EventBus 並沒有過多障礙,消費者分組管理、狀態監控和異常策略才是重點。