C# 使用kafka 消費者-生產者訂閱


一:kafka介紹

kafka(官網地址:http://kafka.apache.org)是一種高吞吐量的分布式發布訂閱的消息隊列系統,具有高性能和高吞吐率。

1.1 術語介紹

  • Broker

Kafka集群包含一個或多個服務器,這種服務器被稱為broker

  • Topic

主題:每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)

  • Partition

分區:Partition是物理上的概念,每個Topic包含一個或多個Partition.(一般為kafka節點數cpu的總核數)

  • Producer

生產者,負責發布消息到Kafka broker

  • Consumer

消費者:從Kafka broker讀取消息的客戶端。

  • Consumer Group

消費者組:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)。
1.2 基本特性
可擴展性
在不需要下線的情況下進行擴容
數據流分區(partition)存儲在多個機器上
高性能
單個broker就能服務上千客戶端
單個broker每秒種讀/寫可達每秒幾百兆字節
多個brokers組成的集群將達到非常強的吞吐能力
性能穩定,無論數據多大
Kafka在底層摒棄了Java堆緩存機制,采用了操作系統級別的頁緩存,同時將隨機寫操作改為順序寫,再結合Zero-Copy的特性極大地改善了IO性能。
1.3 消息格式
一個topic對應一種消息格式,因此消息用topic分類
一個topic代表的消息有1個或者多個patition(s)組成
一個partition應該存放在一到多個server上,如果只有一個server,就沒有冗余備份,是單機而不是集群;如果有多個server,一個server為leader(領導者),其他servers為followers(跟隨者),leader需要接受讀寫請求,followers僅作冗余備份,leader出現故障,會自動選舉一個follower作為leader,保證服務不中斷;每個server都可能扮演一些partitions的leader和其它partitions的follower角色,這樣整個集群就會達到負載均衡的效果
消息按順序存放;消息順序不可變;只能追加消息,不能插入;每個消息都有一個offset,用作消息ID, 在一個partition中唯一;offset有consumer保存和管理,因此讀取順序實際上是完全有consumer決定的,不一定是線性的;消息有超時日期,過期則刪除
1.4 原理解析
producer創建一個topic時,可以指定該topic為幾個partition(默認是1,配置num.partitions),然后會把partition分配到每個broker上,分配的算法是:a個broker,第b個partition分配到b%a的broker上,可以指定有每個partition有幾分副本Replication,副本的分配策略為:第c個副本存儲在第(b+c)%a的broker上。一個partition在每個broker上是一個文件夾,文件夾中文件的命名方式為:topic名稱+有序序號。每個partition中文件是一個個的segment,segment file由.index和.log文件組成。兩個文件的命名規則是,上一個segmentfile的最后一個offset。這樣,可以快速的刪除old文件。

producer往kafka里push數據,會自動的push到所有的分區上,消息是否push成功有幾種情況:1,接收到partition的ack就算成功,2全部副本都寫成功才算成功;數據可以存儲多久,默認是兩天;producer的數據會先存到緩存中,等大小或時間達到閾值時,flush到磁盤,consumer只能讀到磁盤中的數據。

consumer從kafka里poll數據,poll到一定配置大小的數據放到內存中處理。每個group里的consumer共同消費全部的消息,不同group里的數據不能消費同樣的數據,即每個group消費一組數據。

consumer的數量和partition的數量相等時消費的效率最高。這樣,kafka可以橫向的擴充broker數量和partitions;數據順序寫入磁盤;producer和consumer異步

二:環境搭建(windows)

這里小編上一篇文章已經講過如何在windows搭建kafka -> https://www.cnblogs.com/IT-Ramon/p/12017745.html

三:基於.net的常用類庫

基於.net實現kafka的消息隊列應用,常用的類庫有kafka-net,Confluent.Kafka,官網推薦使用Confluent.Kafka,本文也是基於該庫的實現,使用版本預發行版1.0.0-beta2,創建控制台應用程序。

 

四:應用–生產者

生產者將數據發布到指定的主題,一般生產環境下的負載均衡,服務代理會有多個,BootstrapServers屬性則為以逗號隔開的多個代理地址

using Confluent.Kafka;
using System;

namespace kafka.Producer
{
    public class KafkaProducer
    {
        /// <summary>
        /// 生產者
        /// </summary>
        public static void Produce(string message)
        {
            var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
            using (var producer = new Producer<Null, string>(config))
            {
                // 錯誤日志監視
                producer.OnError += (_, msg) => { Console.WriteLine($"Producer_Erro信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

                // 異步發送消息到主題
                producer.BeginProduce("MyTopic", new Message<Null, string> { Value = message }, r => {
                    Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}");
                });
                // 3后 Flush到磁盤
                producer.Flush(TimeSpan.FromSeconds(3));
            }
        }
    }
}

控制台調用

using System;

namespace kafka.Producer.App
{
    class Program
    {
        static void Main(string[] args)
        {
            while (true)
            {
                var s = Console.ReadLine();
                KafkaProducer.Produce(s);
            }
        }
    }
}

  

五:應用–消費者

消費者使用消費者組名稱標記自己,並且發布到主題的每個記錄被傳遞到每個訂閱消費者組中的一個消費者實例。消費者實例可以在單獨的進程中,也可以在不同的機器

如果所有消費者實例具有相同的消費者組,則記錄將有效地在消費者實例上進行負載平衡。

如果所有消費者實例具有不同的消費者組,則每個記錄將廣播到所有消費者進程

上圖為兩個服務器Kafka群集,托管四個分區(P0-P3),包含兩個消費者組。消費者組A有兩個消費者實例,B組有四個消費者實例。

默認EnableAutoCommit 是自動提交,只要從隊列取出消息,偏移量自動移到后一位,無論消息后續處理成功與否,該條消息都會消失,所以為免除處理失敗的數據丟失,消費者方可設置該屬性為false,后面進行手動commint()提交偏移

using Confluent.Kafka;
using System;

namespace kafkaTest
{
    public class KafkaConsumer
    {
        /// <summary>
        /// 消費者
        /// </summary>
        public static void Consumer()
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetResetType.Earliest,
                EnableAutoCommit = false  // 設置非自動偏移,業務邏輯完成后手動處理偏移,防止數據丟失
            };
            using (var consumer = new Consumer<Ignore, string>(conf))
            {
                // 訂閱topic
                consumer.Subscribe("MyTopic");
                // 錯誤日志監視 
                consumer.OnError += (_, msg) => { Console.WriteLine($"Consumer_Error信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };

                while (true)
                {
                    try
                    {
                        var consume = consumer.Consume();
                        string receiveMsg = consume.Value;
                        Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");
                        // 開始我的業務邏輯

                        
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consumer_Error occured: {e.Error.Reason}");
                    }
                }
            }
        }
    }
}

控制台調用

namespace kafkaTest
{
    class Program
    {
        static void Main(string[] args)
        {
            KafkaConsumer.Consumer();
        }
    }
}

效果圖

常見數據問題處理

  1. 重復消費最常見的原因:re-balance問題,通常會遇到消費的數據,處理很耗時,導致超過了Kafka的session
    timeout時間(0.10.x版本默認是30秒),那么就會re-balance重平衡,此時有一定幾率offset沒提交,會導致重平衡后重復消費。
    去重問題:消息可以使用唯一id標識
  2. 保證不丟失消息: 生產者(ack= -1 或 all 代表至少成功發送一次) 消費者
    (offset手動提交,業務邏輯成功處理后,提交offset)
  3. 保證不重復消費:落表(主鍵或者唯一索引的方式,避免重復數據)
    業務邏輯處理(選擇唯一主鍵存儲到Redis或者mongdb中,先查詢是否存在,若存在則不處理;若不存在,先插入Redis或Mongdb,再進行業務邏輯處理)

Kafka 可視化調試

借助可視化客戶端工具 kafka tool
具體使用可參考:https://www.cnblogs.com/frankdeng/p/9452982.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM