一般在KafKa消費程序中消費可以設置多個主題,那在同一程序中需要向KafKa發送不同主題的消息,如異常需要發到異常主題,正常的發送到正常的主題,這時候就需要實例化多個主題,然后逐個發送。
在NET中用RdKafka組件來做消息處理,在Nuget中引用。
在程序中初始化Producer,並創建多個Topic
private string comtopic = "topic1"; private string errtopic = "topic2"; private string kfkip = "192.168.80.32:9092"; Topic topic = null; Topic errTopic = null; public ExcuteFlow() { try { Producer producer = new Producer(kfkip); topic = producer.Topic(comtopic); errTopic = producer.Topic(errtopic); } catch (RdKafkaException ex) { LogHelper.Error("KafKa初始化KafKa異常 ", ex); } catch (Exception ex) { LogHelper.Error("KafKa初始化異常", ex); } }
在程序中發送其中一個主題:
try { if (topic != null) { byte[] datas = Encoding.UTF8.GetBytes(JsonHelper.ToJson(flowCommond)); Task<DeliveryReport> deliveryReport = topic.Produce(datas); var unused = deliveryReport.ContinueWith(task => { LogHelper.Info("內容:{flowCommond.ID} 發送到分區:{task.Result.Partition}, Offset 為: {task.Result.Offset}"); }); } else { throw new Exception("發送消息到KafKa topic 為空"); } } catch (RdKafkaException ex) { LogHelper.Error("發送消息到KafKa KafKa異常", ex); } catch (Exception ex) { LogHelper.Error("發送消息到KafKa異常", ex); }
flowCommond為要發送的對象內容,格式化為Json字符串再發送。
另一個主題一樣處理。
這里實現一個線程里面發送多個主題,那下面實現多個線程中如何發送多個主題。
多線程中如果每個線程都new Producer(kfkip) 一次,那KafKa的連接很快會被占滿。
那這里就用單例模式來解決這個問題,每次要用到Producer時檢查一下是否已經存在Producer實例,若存在則直接用不用再生成。
/// <summary> /// 單例模式的實現 /// </summary> public class SingleProduct : Producer { // 定義一個靜態變量來保存類的實例 private static SingleProduct uniqueInstance; // 定義一個標識確保線程同步 private static readonly object locker = new object(); // 定義私有構造函數,使外界不能創建該類實例 private SingleProduct(string brokerList) : base(brokerList) { } /// <summary> /// 定義公有方法提供一個全局訪問點,同時你也可以定義公有屬性來提供全局訪問點 /// </summary> /// <returns></returns> public static SingleProduct GetInstance() { // 當第一個線程運行到這里時,此時會對locker對象 "加鎖", // 當第二個線程運行該方法時,首先檢測到locker對象為"加鎖"狀態,該線程就會掛起等待第一個線程解鎖 // lock語句運行完之后(即線程運行完之后)會對該對象"解鎖" if (uniqueInstance == null) { lock (locker) { // 如果類的實例不存在則創建,否則直接返回 if (uniqueInstance == null) { string kfkip = System.Configuration.ConfigurationManager.AppSettings["KfkIP"]; try { uniqueInstance = new SingleProduct(kfkip); LogHelper.Error("單例模式 實例化 SingleProduct"); } catch (RdKafkaException ex) { LogHelper.Error("單例模式 KafKa初始化KafKa異常 ", ex); } catch (Exception ex) { LogHelper.Error("單例模式 KafKa初始化異常", ex); } } } } return uniqueInstance; } }
然后在初始化的代碼中替換Producer producer = new Producer(kfkip);為 Producer producer = SingleProduct.GetInstance();
OK!以上就完成了多線程多主題的消息發送。