系統重構解耦的過程涉及不同領域服務分拆,或同一服務下實時響應部分和非響應部分分拆,分解后的各部分通過異步消息的流轉傳遞,完成整體的業務邏輯,但是頻繁的在業務層面直接調用不同消息隊列的SDK,個人感覺不夠簡潔,最近開源一個中間件OSS.Dataflow,希望能幫到看到的同學。
OSS.Dataflow主要實現異步消息傳遞的過程抽象,在業務層面提供消息發布訂閱的統一抽象接口,在業務邏輯分支之間,以簡單的調用完成消息的傳遞,和具體的消息存儲觸發實現無關。同時,在底層的存儲和觸發層面提取接口,能夠在系統的全局適配具體的消息基礎設施。(在這些接口之上,還實現了事件處理器,通過消息的重復投放,實現事件執行的容錯補充機制,這個后邊文章再介紹,源代碼單元測試有示例。)
一. 消息業務側使用
OSS.Dataflow 的代碼可以通過Gitee和GitHub獲取,使用時可以通過Nuget直接安裝,也可以通過命令行:Install-Package OSS.DataFlow
組件的使用非常簡單,只需要關注:
- 消息發布者接口,由組件注冊時返回,供業務方法調用傳入消息體。
- 消息訂閱(消費)者接口實現或委托方法,在組件注冊時傳入。
具體示例:
- 消息的發布訂閱獨立調用示例
// 全局初始化,注入訂閱者實現 const string msgPSKey = "Publisher-Subscriber-MsgKey"; DataFlowFactory.RegisterSubscriber<MsgData>(msgPSKey, async (data) => { // 當前通過注入消費的委托方法,也可通過接口實現 // DoSomething(data); return true; }); // 獲取發布者接口 private static readonly IDataPublisher publisher = DataFlowFactory.CreatePublisher(); // 業務方法中發布消息 await publisher.Publish(msgPSKey,new MsgData() {name = "test"});
2. 消息的流式調用示例
// 直接注冊消費實現並獲取消息發布接口 private static readonly IDataPublisher _delegateFlowpusher = DataFlowFactory.RegisterFlow<MsgData>("delegate_flow",async (data) => { // 當前通過注入消費的委托方法,也可通過接口實現 // DoSomething(data); return true; }); // 業務方法中發布消息 await _delegateFlowpusher.Publish("normal_flow",new MsgData() {name = "test"});
如上,只需要獲取發布者,並注入消費實現,即可完成整個消息的異步消費處理,同一個消息key可以注冊多個消費實現,當有消息進入消費時,會並發處理。
二. 消息底層存儲適配擴展
前邊介紹了業務接口的使用,和具體消息隊列或數據庫等隔離,這是對接業務層面的使用。因為業務場景不同,不同的項目對消息的響應速度和處理機制又各有需求,所以 OSS.DataFlow 同樣提供了對接消息產品的擴展接口,方便使用者適配已有消息基礎設施。
1. 消息存儲適配接口
對於事件消息處理,需要關注兩件事情:接收存儲 和 消費觸發。在類庫中提供了 DataFlowManager 消息流管理類,用戶可以通過實現IDataPublisherProvider接口,完成具體的存儲實現。
同時在不同的消息產品觸發消費時(比如數據庫定時任務或者RabbitMQ消費), 調用通知方法(NotifySubscriber ),來觸發通過類庫注冊的具體的業務訂閱處理。
// 消息流核心部件管理者 public static class DataFlowManager { /// <summary> /// 自定義 數據流發布(存儲)實現的 提供者 /// </summary> public static IDataPublisherProvider PublisherProvider { get; set; } /// <summary> /// 通過自定義消息觸發機制通知訂閱者 /// 調用時請做異常攔截,防止臟數據導致 msgData 類型錯誤 /// </summary> /// <param name="msgDataKey"></param> /// <param name="msgData">消息內容,自定義觸發時,請注意和注冊訂閱者的消費數據類型轉換安全</param> /// <returns></returns> public static Task<bool> NotifySubscriber(string msgDataKey, object msgData) { .... } }
關於 IDataPublisherProvider
public interface IDataPublisherProvider { /// <summary> /// 數據發布者 /// </summary> /// <param name="option"></param> /// <returns> 返回消息發布接口實現 </returns> IDataPublisher CreatePublisher(DataPublisherOption option); } /// <summary> /// 數據的發布者 /// </summary> public interface IDataPublisher { /// <summary> /// 推進數據(存儲具體消息隊列或者數據庫實現) /// </summary> /// <param name="dataKey"></param> /// <param name="data"></param> /// <returns>是否推入成功</returns> Task<bool> Publish<TData>(string dataKey,TData data); }
可以看到 IDataPublisher 接口負責具體的存儲實現,可以根據 DataPublisherOption 的 source_name 業務屬性實現對不同業務需求返回不同的具體實現。
2. 默認實現介紹
借助.Net 自身的內存消息隊列,在類庫中提供了默認的內部消息存儲轉發實現(內存級別),使用者可以自行實現擴展相關接口並進行全局配置。
內置的.Net Core消息隊列, 設置了默認1個隊列,最大並發為32線程。 如果需要可以通過設置DataPublisherOption的source_name,類庫將會為每個source_name 創建獨立的內存隊列。
如果你已經看到這里,並且感覺還行的話可以在下方點個贊,或者也可以關注我的公總號(見二維碼)