NET中解決KafKa多線程發送多主題的問題


  一般在KafKa消費程序中消費可以設置多個主題,那在同一程序中需要向KafKa發送不同主題的消息,如異常需要發到異常主題,正常的發送到正常的主題,這時候就需要實例化多個主題,然后逐個發送。

  在NET中用RdKafka組件來做消息處理,在Nuget中引用。

  在程序中初始化Producer,並創建多個Topic

        private string comtopic = "topic1";
        private string errtopic = "topic2";
        private string kfkip = "192.168.80.32:9092";
        Topic topic = null;
        Topic errTopic = null;

        public ExcuteFlow()
        {
            try
            {
                Producer producer = new Producer(kfkip);
                topic = producer.Topic(comtopic);
                errTopic = producer.Topic(errtopic);
            }
            catch (RdKafkaException ex)
            {
                LogHelper.Error("KafKa初始化KafKa異常 ", ex);
            }
            catch (Exception ex)
            {
                LogHelper.Error("KafKa初始化異常", ex);
            }

        }

  在程序中發送其中一個主題:

          try
            {

                if (topic != null)
                {
                    byte[] datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond));
                    Task<DeliveryReport> deliveryReport = topic.Produce(datas);
                    var unused = deliveryReport.ContinueWith(task =>
                    {
                        LogHelper.Info("內容:{flowCommond.ID} 發送到分區:{task.Result.Partition}, Offset 為: {task.Result.Offset}");
                    });
                }
                else
                {
                    throw new Exception("發送消息到KafKa topic 為空");
                }
            }
            catch (RdKafkaException ex)
            {
                LogHelper.Error("發送消息到KafKa  KafKa異常", ex);
            }
            catch (Exception ex)
            {
                LogHelper.Error("發送消息到KafKa異常", ex);
            }

  flowCommond為要發送的對象內容,格式化為Json字符串再發送。

  另一個主題一樣處理。

   這里實現一個線程里面發送多個主題,那下面實現多個線程中如何發送多個主題。

  多線程中如果每個線程都new Producer(kfkip) 一次,那KafKa的連接很快會被占滿。

  那這里就用單例模式來解決這個問題,每次要用到Producer時檢查一下是否已經存在Producer實例,若存在則直接用不用再生成。

    /// <summary>
    /// 單例模式的實現
    /// </summary>
    public class SingleProduct : Producer
    {
        // 定義一個靜態變量來保存類的實例
        private static SingleProduct uniqueInstance;
        // 定義一個標識確保線程同步
        private static readonly object locker = new object();
        // 定義私有構造函數,使外界不能創建該類實例
        private SingleProduct(string brokerList) : base(brokerList)
        {
        }

        /// <summary>
        /// 定義公有方法提供一個全局訪問點,同時你也可以定義公有屬性來提供全局訪問點
        /// </summary>
        /// <returns></returns>
        public static SingleProduct GetInstance()
        {
            // 當第一個線程運行到這里時,此時會對locker對象 "加鎖",
            // 當第二個線程運行該方法時,首先檢測到locker對象為"加鎖"狀態,該線程就會掛起等待第一個線程解鎖
            // lock語句運行完之后(即線程運行完之后)會對該對象"解鎖"
            if (uniqueInstance == null)
            {
                lock (locker)
                {
                    // 如果類的實例不存在則創建,否則直接返回
                    if (uniqueInstance == null)
                    {
                        string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"];

                        try
                        {
                            uniqueInstance = new SingleProduct(kfkip);
                            LogHelper.Error("單例模式 實例化 SingleProduct");
                        }
                        catch (RdKafkaException ex)
                        {
                            LogHelper.Error("單例模式 KafKa初始化KafKa異常 ", ex);
                        }
                        catch (Exception ex)
                        {
                            LogHelper.Error("單例模式 KafKa初始化異常", ex);
                        }
                    }
                }
            }

            return uniqueInstance;
        }
    }

   然后在初始化的代碼中替換Producer producer = new Producer(kfkip);為 Producer producer = SingleProduct.GetInstance();

  OK!以上就完成了多線程多主題的消息發送。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM