事件消息生產消費中間件-OSS.DataFlow


  系統重構解耦的過程涉及不同領域服務分拆,或同一服務下實時響應部分和非響應部分分拆,分解后的各部分通過異步消息的流轉傳遞,完成整體的業務邏輯,但是頻繁的在業務層面直接調用不同消息隊列的SDK,個人感覺不夠簡潔,最近開源一個中間件OSS.Dataflow,希望能幫到看到的同學。

  OSS.Dataflow主要實現異步消息傳遞的過程抽象,在業務層面提供消息發布訂閱的統一抽象接口,在業務邏輯分支之間,以簡單的調用完成消息的傳遞,和具體的消息存儲觸發實現無關。同時,在底層的存儲和觸發層面提取接口,能夠在系統的全局適配具體的消息基礎設施。(在這些接口之上,還實現了事件處理器,通過消息的重復投放,實現事件執行的容錯補充機制,這個后邊文章再介紹,源代碼單元測試有示例。)

一. 消息業務側使用

   OSS.Dataflow 的代碼可以通過GiteeGitHub獲取,使用時可以通過Nuget直接安裝,也可以通過命令行:Install-Package OSS.DataFlow

  組件的使用非常簡單,只需要關注:

  1. 消息發布者接口,由組件注冊時返回,供業務方法調用傳入消息體。
  2. 消息訂閱(消費)者接口實現或委托方法,在組件注冊時傳入。

  具體示例:

  1. 消息的發布訂閱獨立調用示例
       // 全局初始化,注入訂閱者實現
    const string msgPSKey = "Publisher-Subscriber-MsgKey";
    DataFlowFactory.RegisterSubscriber<MsgData>(msgPSKey, async (data) =>
            {
                // 當前通過注入消費的委托方法,也可通過接口實現
                // DoSomething(data);
                return true;
            });

    //    獲取發布者接口
    private static readonly IDataPublisher publisher = DataFlowFactory.CreatePublisher(); 

    //  業務方法中發布消息
    await publisher.Publish(msgPSKey,new MsgData() {name = "test"});    

   2. 消息的流式調用示例

    // 直接注冊消費實現並獲取消息發布接口
    private static readonly IDataPublisher _delegateFlowpusher = 
        DataFlowFactory.RegisterFlow<MsgData>("delegate_flow",async (data) =>
            {
                // 當前通過注入消費的委托方法,也可通過接口實現
                // DoSomething(data);
                return true;
            });

    // 業務方法中發布消息
    await _delegateFlowpusher.Publish("normal_flow",new MsgData() {name = "test"});

  如上,只需要獲取發布者,並注入消費實現,即可完成整個消息的異步消費處理,同一個消息key可以注冊多個消費實現,當有消息進入消費時,會並發處理。

二. 消息底層存儲適配擴展

  前邊介紹了業務接口的使用,和具體消息隊列或數據庫等隔離,這是對接業務層面的使用。因為業務場景不同,不同的項目對消息的響應速度和處理機制又各有需求,所以 OSS.DataFlow 同樣提供了對接消息產品的擴展接口,方便使用者適配已有消息基礎設施。

 1. 消息存儲適配接口

  對於事件消息處理,需要關注兩件事情:接收存儲 和 消費觸發。在類庫中提供了 DataFlowManager 消息流管理類,用戶可以通過實現IDataPublisherProvider接口,完成具體的存儲實現。

  同時在不同的消息產品觸發消費時(比如數據庫定時任務或者RabbitMQ消費), 調用通知方法(NotifySubscriber ),來觸發通過類庫注冊的具體的業務訂閱處理。

    // 消息流核心部件管理者
    public static class DataFlowManager
    {
        /// <summary>
        /// 自定義 數據流發布(存儲)實現的 提供者
        /// </summary>
        public static IDataPublisherProvider PublisherProvider { get; set; }

        /// <summary>
        ///  通過自定義消息觸發機制通知訂閱者
        ///     調用時請做異常攔截,防止臟數據導致 msgData 類型錯誤
        /// </summary>
        /// <param name="msgDataKey"></param>
        /// <param name="msgData">消息內容,自定義觸發時,請注意和注冊訂閱者的消費數據類型轉換安全</param>
        /// <returns></returns>
        public static Task<bool> NotifySubscriber(string msgDataKey, object msgData)
        {
            ....
        }
    }

  關於 IDataPublisherProvider

   public interface IDataPublisherProvider
    {
        /// <summary>
        /// 數據發布者
        /// </summary>
        /// <param name="option"></param>
        /// <returns> 返回消息發布接口實現 </returns>
        IDataPublisher CreatePublisher(DataPublisherOption option);
    }

  /// <summary>
    ///  數據的發布者
    /// </summary>
    public interface IDataPublisher
    {
        /// <summary>
        /// 推進數據(存儲具體消息隊列或者數據庫實現)
        /// </summary>
        /// <param name="dataKey"></param>
        /// <param name="data"></param>
        /// <returns>是否推入成功</returns>
        Task<bool> Publish<TData>(string dataKey,TData data);
    }

  可以看到 IDataPublisher 接口負責具體的存儲實現,可以根據 DataPublisherOption 的 source_name 業務屬性實現對不同業務需求返回不同的具體實現。

 2. 默認實現介紹

  借助.Net 自身的內存消息隊列,在類庫中提供了默認的內部消息存儲轉發實現(內存級別),使用者可以自行實現擴展相關接口並進行全局配置。

  內置的.Net Core消息隊列, 設置了默認1個隊列,最大並發為32線程。 如果需要可以通過設置DataPublisherOption的source_name,類庫將會為每個source_name 創建獨立的內存隊列。

 

如果你已經看到這里,並且感覺還行的話可以在下方點個贊,或者也可以關注我的公總號(見二維碼)


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM