C#訂閱Kafka消息一直不能消費的情況怎么處理?


最近跟數據部門對接時對方提供的kafka訂閱服務,於是找了資料,寫了個C#控制台程序消費了這個服務。

本文主要記錄的內容是C#消費Kafka消息時選用kafka-net組件,遇到offset不是從0開始的情況時處理方法。

按照入門教程搭建測試環境並調試一切正常。

在生產環境中部署后遇到一直閃爍卻無法消費的問題,看日志是可以連接成功的,后來跟生產方溝通后得知運維在部署kafka的時候設置了消息保留3天。

那么三天前的消息自動清除后,前面的offset在kafka里已經不存在了,這時候我們把offset設置成0時就會出現上述情況,那怎么獲得起始的offset呢?

一、首先在本地安裝jdk、Kafka和zookeeper。

1、jdk建議使用1.7以上版本。

2、下載安裝並運行zookeeper (下載地址 http://zookeeper.apache.org/releases.html)

3、安裝並運行Kafka (下載地址 http://kafka.apache.org/downloads.html

如上安裝步驟在車江毅的博客《.net windows Kafka 安裝與使用入門(入門筆記)》中寫的很詳細,我就不重復了。

二、在kafka中創建topic、生產者、消費者,上述博客中有詳細命令,這里跳過。

三、根據上述博客中推薦,選用了kafka-net。源碼地址: https://github.com/Jroland/kafka-net

1、生產者代碼

private static void Produce(string broker, string topic,string message)
        {
            var options = new KafkaOptions(new Uri(broker));
            var router = new BrokerRouter(options);
            var client = new Producer(router);

            var currentDatetime = DateTime.Now;
            var key = currentDatetime.Second.ToString();
            var events = new[] { new Message(message) };
            client.SendMessageAsync(topic, events).Wait(1500);
            Console.WriteLine("Produced: Key: {0}. Message: {1}", key, events[0].Value.ToUtf8String());

            using (client) { }
        }

//調用示例:
Produce("http://地址:端口", "TopicName", "Hello World");

2、消費者代碼,上面都是為了搭建測試環境,這次的任務是消費其他組提供的kafka消息

private static void Consume(string broker, string topic)
       {
            Uri[] UriArr = Array.ConvertAll<string, Uri>(broker.Split(','), delegate (string s) { return new Uri(s); });
            var options = new KafkaOptions(UriArr);
            var router = new BrokerRouter(options);
            var consumer = new Consumer(new ConsumerOptions(topic, router));

       //實際生產環境中消費不是每次都從0 offset開始的,查了資料發現java可以設置自動參數配置從最小offset開始或者最大offset開始
       //但kafka-net這個sdk中沒有找到,所以就下面這段代碼配置了offset的起始位置 Task<List<OffsetResponse>> offTask = consumer.GetTopicOffsetAsync(topic); var offsetResponseList = offTask.Result; var positionArr = new OffsetPosition[offsetResponseList.Count]; for (int i = 0; i < offsetResponseList.Count; i++) { //LogHelper.WriteLog(string.Format("獲取到Kafka OffsetPosition信息 Partition:{0} maxOffset:{1} minOffset:{2}", offsetResponseList[i].PartitionId, offsetResponseList[i].Offsets[0], offsetResponseList[i].Offsets[1])); positionArr[i] = new OffsetPosition(offsetResponseList[i].PartitionId, offsetResponseList[i].Offsets[1]);//這里Offsets[0]是最大值,我們從頭消費用的最小值Offsets[1] } Console.WriteLine("開始執行"); //如果作業執行一段時間后重啟,下面可以從數據庫中讀取后再配置開始讀取的位置 /*  try { using (IDbConnection dbConn = SingleDataBase.DBFactory.OpenRead<MessageQueue>()) { string sql = string.Format("SELECT [Partition], MAX(Offset)Offset FROM [表名] WITH(NOLOCK) WHERE topic='{0}' GROUP BY [Partition]", topic); var r = dbConn.SqlList<OffsetPositionEntity>(sql); if (r.Count > 0) { positionArr = Array.ConvertAll<OffsetPositionEntity, OffsetPosition>(r.ToArray(), delegate (OffsetPositionEntity p) { return new OffsetPosition(p.Partition, p.Offset + 1); }); } } } catch { //讀取上次獲取位置失敗,從頭開始獲取,可能會導致重復獲取 //LogHelper.WriteLog("沒有獲取到上次截至節點,將從頭開始獲取"); }        */ consumer.SetOffsetPosition(positionArr); //Consume returns a blocking IEnumerable (ie: never ending stream) foreach (var message in consumer.Consume()) { try { string messageStr = message.Value.ToUtf8String(); Console.WriteLine(messageStr); long r = 0; /*這里已經刪掉了寫入數據庫部分*/ //LogHelper.WriteLog(string.Format("Response: Partition {0},Offset {1} : {2} ,Insert : {3}", message.Meta.PartitionId, message.Meta.Offset, messageStr, r)); } catch (Exception ex) { //LogHelper.WriteLog("消費Kafka消息保存到數據庫時報錯:" + ex.Message); } } } //調用示例: Consume("http://100.162.136.70:9092,http://100.162.136.71:9092,http://100.162.136.72:9092", "YourTopicName");

  以上代碼中LogHelper.WriteLog部分用Console.WriteLine替換后可以直接輸出到屏幕上,這里沒有提供寫日志相關方法。

 

  這篇文章寫出來,就是為了記錄高亮顯示的那部分片段,其他都是為了讓文章盡可能的完整。如果有朋友對kafka-net比較熟悉,有更方便的配置方式請留言分享,多謝~~~

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM