基於kafka-net實現的可以長鏈接的消息生產者


      今天有點時間,我就來說兩句。最近接觸的Kafka相關的東西要多一些,其實以前也接觸過,但是在項目使用中的經驗不是很多。最近公司的項目里面使用了Kafka消息中間件,由於以前的人員編寫的客戶端的類不是很好,沒有設計的概念,就是一個簡單類的功能羅列,沒有考慮到后期的擴展和維護(以后可能會兼容其他形式的消息隊列,要做到無縫銜接),所以這個重構的任務就落到我的身上。

      先說說我的感受,然后再貼出代碼的實現吧。我第一次是基於Confluent.Kafka編寫的Kafka消息生產者,后來經過測試,同步操作的時間比較長,要完成20萬數據發送消息並更新到數據庫的時間大概是16-18分鍾,這個結果有點讓人不能接受。為了提高性能,也做了很多測試,都沒有辦法解決這個問題。后來抱着試試看的想法,我又基於kafka-net重新實現了Kafka消息的生產者。經過測試,完成同樣的任務,時間大概需要3分鍾左右。兩種實現方法完成同樣的任務,都是以同步的方式生產消息,並將消息成功發送到Broker后,再將數據插入到數據庫做記錄。大家不要糾結為什么這樣使用消息隊列,這是上頭的做法,我還不能做大的改動,我也無奈。

      目前看,基於kafka-net實現的消息生產者在生產消息並發送成功所需要的時間要比基於Confluent.Kafka實現的消息生產者的所需要的時間要少,尤其是發送的數據越多,這個時間的差距越大。具體的原因還不清楚,如果有高手可以不吝賜教。好了,我該上代碼了。

      開始代碼之前,要說明一點:Confluent.Kafka的Broker是不需要帶Http://這個前綴的,但是 kafka-net 的Broker是有http://這個前綴的,大家要注意這個,剛開始的時候我也被坑了一下子。
   

using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Enterprise.Framework.MessageQueue
{
    /// <summary>
    /// 消息生產者的接口定義,所有消息生產者的實現必須繼承該接口
    /// </summary>
    public interface IMessageProducer
    {
        /// <summary>
        /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里
        /// </summary>
        /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param>
        /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param>        
        void Produce(string topic, string message);
    }
}
 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace Enterprise.Framework.MessageQueue
 8 {
 9     /// <summary>
10     /// Kafka消息生產者的接口定義,所有Kafka消息生產者的實現必須繼承該接口
11     /// </summary>
12     public interface IKafkaMessageProducer : IMessageProducer
13     {
14         /// <summary>
15         /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里
16         /// </summary>
17         /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param>
18         /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param>
19         /// <param name="producedAction">當消息生產完成並成功發送到服務器后,可以對成功生產並發送的消息執行代理所封裝方法的操作,默認值為空</param>
20         void Produce(string topic, string message, Action<MessageResult> producedAction = null);
21     }
22 }
 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Runtime.InteropServices;
 5 using System.Text;
 6 
 7 namespace Enterprise.Framework.AbstractInterface
 8 {
 9     /// <summary>
10     /// 該抽象類定義了所有需要釋放資源類型的抽象類
11     /// </summary>
12     public abstract class DisposableBase : IDisposable
13     {
14         private bool disposed = false;
15 
16         /// <summary>
17         /// 實現IDisposable中的Dispose方法
18         /// </summary>
19         public void Dispose()
20         {
21             //必須為true
22             Dispose(true);
23             //通知垃圾回收機制不再調用終結器(析構器)
24             GC.SuppressFinalize(this);
25         }
26 
27         /// <summary>
28         /// 不是必要的,提供一個Close方法僅僅是為了更符合其他語言(如C++)的規范
29         /// </summary>
30         public void Close()
31         {
32             Dispose();
33         }
34 
35         /// <summary>
36         /// 必須,以備程序員忘記了顯式調用Dispose方法
37         /// </summary>
38         ~DisposableBase()
39         {
40             //必須為false
41             Dispose(false);
42         }
43 
44         /// <summary>
45         /// 非密封類修飾用protected virtual
46         /// 密封類修飾用private
47         /// </summary>
48         /// <param name="disposing">是否要清理托管資源,true表示需要清理托管資源,false表示不需要清理托管資源</param>
49         protected virtual void Dispose(bool disposing)
50         {
51             if (disposed)
52             {
53                 return;
54             }
55             if (disposing)
56             {
57                 // 清理托管資源                
58                 DisposeManagedResources();
59             }
60             // 清理非托管資源
61             DisposeUnmanagedResource();
62             //讓類型知道自己已經被釋放
63             disposed = true;
64         }
65 
66         /// <summary>
67         /// 釋放托管資源
68         /// </summary>
69         protected abstract void DisposeManagedResources();
70 
71         /// <summary>
72         /// 釋放非托管資源
73         /// </summary>
74         protected abstract void DisposeUnmanagedResource();
75     }
76 }
  1 using KafkaNet;
  2 using KafkaNet.Model;
  3 using KafkaNet.Protocol;
  4 using System;
  5 using System.Collections.Generic;
  6 using System.IO;
  7 using System.Linq;
  8 using System.Text;
  9 using System.Threading;
 10 using System.Threading.Tasks;
 11 using ThreeSoft.Framework.AbstractInterface;
 12 
 13 namespace Enterprise.Framework.MessageQueue
 14 {
 15     /// <summary>
 16     /// Kafka消息生產者具體的實現類,可以針對長鏈接進行消息發送處理,不用頻繁進行消息組件的創建和銷毀的工作
 17     /// </summary>
 18     public sealed class KafkaMessageKeepAliveProducer : DisposableBase, IDisposable, IKafkaMessageProducer
 19     {
 20         #region 私有字段
 21 
 22         private KafkaNet.Producer _producer;
 23         private BrokerRouter _brokerRouter;
 24         private string _broker;
 25 
 26         #endregion
 27 
 28         #region 構造函數
 29 
 30         /// <summary>
 31         /// 通過構造函數初始化消息隊列的服務器
 32         /// </summary>
 33         /// <param name="broker">消息隊列服務器地址,該值不能為空</param>
 34         public KafkaMessageKeepAliveProducer(string broker)
 35         {
 36             if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker))
 37             {
 38                 throw new ArgumentNullException("消息隊列服務器的地址不可以為空!");
 39             }
 40 
 41             #region kafka-net實現
 42 
 43             Uri[] brokerUriList = null;
 44 
 45             if (broker.IndexOf(',') >= 0)
 46             {
 47                 string[] brokers = broker.Split(',');
 48                 brokerUriList = new Uri[brokers.Length];
 49 
 50                 for (int i = 0; i < brokers.Length; i++)
 51                 {
 52                     brokerUriList[i] = new Uri(brokers[i]);
 53                 }
 54             }
 55             else
 56             {
 57                 brokerUriList = new Uri[] { new Uri(broker) };
 58             }
 59 
 60             var kafkaOptions = new KafkaOptions(brokerUriList);
 61             _brokerRouter = new BrokerRouter(kafkaOptions);
 62             _producer = new KafkaNet.Producer(_brokerRouter);
 63 
 64             #endregion
 65 
 66             _broker = broker;
 67         }
 68 
 69         #endregion
 70 
 71         #region 實例屬性
 72 
 73         /// <summary>
 74         /// 獲取消息服務器的地址
 75         /// </summary>
 76         public string Broker
 77         {
 78             get { return _broker; }
 79         }
 80 
 81         #endregion
 82 
 83         #region 發送消息的方法
 84 
 85         /// <summary>
 86         /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里
 87         /// </summary>        
 88         /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param>
 89         /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param>
 90         /// <param name="producedAction">當消息生產完成並成功發送到服務器后,可以對成功生產並發送的消息執行代理所封裝方法的操作</param>
 91         public void Produce(string topic, string message, Action<MessageResult> producedAction = null)
 92         {
 93             #region 同步實現
 94 
 95             var currentDatetime = DateTime.Now;
 96             var key = currentDatetime.Second.ToString();
 97             var events = new[] { new KafkaNet.Protocol.Message(message, key) };
 98             List<ProduceResponse> result = _producer.SendMessageAsync(topic, events).Result;
 99 
100             if (producedAction != null && result != null && result.Count > 0)
101             {
102                 MessageResult messageResult = new MessageResult { Broker = Broker, GroupID = null, Message = message, Offset = result[0].Offset, Partition = result[0].PartitionId, Topic = result[0].Topic };
103                 producedAction(messageResult);
104             }
105 
106             #endregion
107         }
108 
109         /// <summary>
110         /// 將指定的消息內容發送到消息服務器並存放在指定的主題名稱里
111         /// </summary>        
112         /// <param name="topic">發送消息的主題名稱,這個主題就是對消息的分類,不同的主題存放不同的消息,該參數不能為空,空值會拋出異常</param>
113         /// <param name="message">需要發送的消息內容,該參數不能為空,空值會拋出異常</param>        
114         public void Produce(string topic, string message)
115         {
116             Produce(topic, message, null);
117         }
118 
119         #endregion
120 
121         #region 實現消息隊列資源的釋放
122 
123         /// <summary>
124         /// 析構函數釋放資源
125         /// </summary>
126         ~KafkaMessageKeepAliveProducer()
127         {
128             Dispose(false);
129         }
130 
131         /// <summary>
132         /// 釋放托管資源
133         /// </summary>
134         protected override void DisposeManagedResources()
135         {
136             if (_producer != null)
137             {
138                 _producer.Dispose();
139             }
140             if (_brokerRouter != null)
141             {
142                 _brokerRouter.Dispose();
143             }
144         }
145 
146         /// <summary>
147         /// 釋放非托管資源
148         /// </summary>
149         protected override void DisposeUnmanagedResource(){}
150 
151         #endregion
152     }
153 }


    好了,今天就寫到這里了,每天進步一點點,努力堅持。不忘初心,繼續努力吧,歡迎大家前來討論。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM