Kafka Basics Tutorial (3): Using the Kafka Message Queue from C#


  Following on from the previous post on installing Kafka: the cluster I installed lives at 192.168.209.133:9092, 192.168.209.134:9092 and 192.168.209.135:9092, so that cluster is used directly for the demos below.

  First, create a project. The demos use a .NET Core 3.1 console app; then install the Confluent.Kafka package via NuGet.

  The source code for Confluent.Kafka is on GitHub if you are interested: https://github.com/confluentinc/confluent-kafka-dotnet/

  

  Publishing Messages

  Let's go straight to a demo:

    static void Main(string[] args)
    {
        ProducerConfig config = new ProducerConfig();
        config.BootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        var builder = new ProducerBuilder<string, object>(config);
        builder.SetValueSerializer(new KafkaConverter());//set the value serializer
        var producer = builder.Build();
        producer.Produce("test", new Message<string, object>() { Key = "Test", Value = "hello world" });

        Console.ReadKey();
    }

  After running this code, you can view the message with the kafkatool utility introduced in the previous post.

  1. Publishing requires a producer object, built by the ProducerBuilder<,> class. It has two generic parameters: the first is the routing key type, the second the message type. In practice we mostly use ProducerBuilder<string, object> or ProducerBuilder<string, string>.

  2. ProducerBuilder<string, object> needs a configuration argument when instantiated. That configuration is a collection (IEnumerable<KeyValuePair<string, string>>), and ProducerConfig is simply a type that implements this collection interface. Older versions of Confluent.Kafka had no ProducerConfig type; a Dictionary<string, string> was used to build the ProducerBuilder<string, object> instead. The demo above could just as well be written as:

    static void Main(string[] args)
    {
        Dictionary<string, string> config = new Dictionary<string, string>();
        config["bootstrap.servers"]= "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        var builder = new ProducerBuilder<string, object>(config);
        builder.SetValueSerializer(new KafkaConverter());//set the value serializer
        var producer = builder.Build();
        producer.Produce("test", new Message<string, object>() { Key = "Test", Value = "hello world" });

        Console.ReadKey();
    }

  Both versions behave identically: a ProducerConfig ultimately yields a KeyValuePair<string, string> collection anyway, and each ProducerConfig property maps to a key. The BootstrapServers property above, for example, maps to bootstrap.servers, the Kafka cluster address, with multiple addresses separated by commas.
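
  To make this concrete, here is a minimal sketch (assuming the usual usings for System, System.Collections.Generic and Confluent.Kafka) that enumerates a ProducerConfig to show the underlying keys; the address is just the one used in this post:

    static void Main(string[] args)
    {
        ProducerConfig config = new ProducerConfig();
        config.BootstrapServers = "192.168.209.133:9092";

        //ProducerConfig implements IEnumerable<KeyValuePair<string, string>>,
        //so enumerating it yields the librdkafka keys, e.g.
        //bootstrap.servers = 192.168.209.133:9092
        foreach (KeyValuePair<string, string> kv in config)
        {
            Console.WriteLine($"{kv.Key} = {kv.Value}");
        }
    }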

  For the other configuration options, see the official configuration documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

  3. Confluent.Kafka also requires a serializer type implementing the ISerializer<TValue> or IAsyncSerializer<TValue> interface, such as the KafkaConverter in the demo above:

    public class KafkaConverter : ISerializer<object>
    {
        /// <summary>
        /// Serializes the data into bytes
        /// </summary>
        /// <param name="data"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public byte[] Serialize(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(json);
        }
    }

  I use JSON serialization here, which requires installing Newtonsoft.Json via NuGet.
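
  For reference, the same JSON approach written against the IAsyncSerializer<TValue> interface mentioned above might look like this sketch; the class name AsyncKafkaConverter is my own, and the usual usings for Confluent.Kafka, Newtonsoft.Json, System.Text and System.Threading.Tasks are assumed:

    public class AsyncKafkaConverter : IAsyncSerializer<object>
    {
        /// <summary>
        /// Serializes the data into bytes, asynchronously
        /// </summary>
        public Task<byte[]> SerializeAsync(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Task.FromResult(Encoding.UTF8.GetBytes(json));
        }
    }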

  4. Messages are published with the Produce method, which has several overloads plus a few asynchronous variants. The first parameter is the topic; to target a specific partition, pass a TopicPartition object instead. The second parameter is the message, of type Message<TKey, TValue>: Key is the routing key and Value is our payload, which is serialized through the ISerializer<TValue> interface before being sent to Kafka. The third parameter is an Action<DeliveryReport<TKey, TValue>> delegate that is invoked asynchronously; it is effectively the delivery notification.
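
  As a sketch of those parameters working together (the topic name and partition number are placeholders, and config and KafkaConverter are the ones from the demo above), targeting a specific partition and observing the delivery report might look like this:

    var producer = new ProducerBuilder<string, object>(config)
        .SetValueSerializer(new KafkaConverter())
        .Build();

    producer.Produce(
        new TopicPartition("test", new Partition(0)),//target a specific partition
        new Message<string, object>() { Key = "Test", Value = "hello world" },
        report =>
        {
            //runs asynchronously once the broker acknowledges (or rejects) the message
            Console.WriteLine(report.Error.IsError
                ? $"delivery failed: {report.Error.Reason}"
                : $"delivered to {report.TopicPartitionOffset}");
        });

    //Produce is fire-and-forget; Flush blocks until outstanding messages are sent
    producer.Flush(TimeSpan.FromSeconds(10));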

 

  Consuming Messages

  Again, straight to a demo:

    static void Main(string[] args)
    {
        ConsumerConfig config = new ConsumerConfig();
        config.BootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        config.GroupId = "group.1";
        config.AutoOffsetReset = AutoOffsetReset.Earliest;
        config.EnableAutoCommit = false;

        var builder = new ConsumerBuilder<string, object>(config);
        builder.SetValueDeserializer(new KafkaConverter());//set the value deserializer
        var consumer = builder.Build();
        consumer.Subscribe("test");//use Subscribe to subscribe to topics
        //consumer.Assign(new TopicPartition("test", new Partition(1)));//use Assign to subscribe to a specific partition

        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine($"receive message:{result.Message.Value}");
            consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
        }
    }

  1. As with publishing, the consumer is built via ConsumerBuilder<,>, and there is likewise a ConsumerConfig configuration object. It did not exist in older versions either, where a Dictionary<string, string> was used instead; the example above is equivalent to:

    static void Main(string[] args)
    {
        Dictionary<string, string> config = new Dictionary<string, string>();
        config["bootstrap.servers"] = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        config["group.id"] = "group.1";
        config["auto.offset.reset"] = "earliest";
        config["enable.auto.commit"] = "false";

        var builder = new ConsumerBuilder<string, object>(config);
        builder.SetValueDeserializer(new KafkaConverter());//set the value deserializer
        var consumer = builder.Build();
        consumer.Subscribe("test");//use Subscribe to subscribe to topics
        //consumer.Assign(new TopicPartition("test", new Partition(1)));//use Assign to subscribe to a specific partition

        while (true)
        {
            var result = consumer.Consume();
            Console.WriteLine($"receive message:{result.Message.Value}");
            consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
        }
    }

  In fact, just like ProducerConfig, it is a KeyValuePair<string, string> collection, and each of its properties maps to a key. For other configuration options, see the official configuration documentation: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

  A quick note on the settings used in this example:

  BootstrapServers: the Kafka cluster addresses, comma-separated.
  GroupId: the consumer's group. Note that Kafka consumes by group: a message is consumed by only one consumer within a given group. Also, as a rule, consumers in the same group should implement the same logic.
  EnableAutoCommit: whether to commit automatically. If true, a message counts as consumed as soon as the consumer receives it; we can set it to false and commit manually once our processing logic has finished.
  AutoOffsetReset: how the offset is reset. The default is Latest. This is Kafka's read strategy, with three options: Latest, Earliest, and Error. I personally recommend Earliest.

  A bit more on the AutoOffsetReset setting:

  Latest: if a partition has a committed offset, consume from that offset; if not, consume only data newly produced to that partition.
  Earliest: if a partition has a committed offset, consume from that offset; if not, consume from the beginning.
  Error: if every partition of the topic has a committed offset, consume from those offsets onward; if any partition lacks a committed offset, throw an exception.

  If that sounds confusing, here is an example:

  When a consumer connects to Kafka, where should it start consuming?

  First, recall that Kafka consumers consume as groups, and Kafka records how far each group has read in each partition, i.e. the offset.

  When a consumer connects to consume messages and its group has consumed and committed an offset before (an offset record exists), the consumer resumes from that offset. In this case Latest, Earliest, and Error all behave the same.

  But if the consumer belongs to a brand-new group (no offset record exists), the three settings behave differently:

  Latest: consume only messages produced after the moment of connecting; previously published messages are not consumed. This is the default behavior.

  Earliest: consume from the smallest offset (the very beginning, if all messages are still retained), which means messages published before connecting are consumed too.

  Error: simply and brutally throw an exception.

  2. Producing requires serialization, so consuming requires deserialization: we must supply a type implementing the IDeserializer<TValue> interface. The example above uses JSON:

    public class KafkaConverter : IDeserializer<object>
    {
        /// <summary>
        /// Deserializes the bytes back into an object
        /// </summary>
        /// <param name="data"></param>
        /// <param name="isNull"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull) return null;

            var json = Encoding.UTF8.GetString(data.ToArray());
            try
            {
                return JsonConvert.DeserializeObject(json);
            }
            catch
            {
                return json;
            }
        }
    }

  3. Kafka is a publish/subscribe message queue, and Confluent.Kafka provides two subscription methods: Subscribe and Assign.

  Subscribe: subscribe to one or more topics.

  Assign: subscribe to specified partitions of one or more topics.

  Confluent.Kafka also provides two methods for cancelling a subscription: Unsubscribe and Unassign, as shown in the sketch below.
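
  A minimal sketch of the four calls; the topic names and partition numbers are placeholders, and consumer is a consumer built as above:

    //group-managed subscription to one or more topics
    consumer.Subscribe(new[] { "topic-a", "topic-b" });
    consumer.Unsubscribe();//cancel the subscription

    //manual assignment to specific partitions, bypassing group balancing
    consumer.Assign(new[]
    {
        new TopicPartition("topic-a", new Partition(0)),
        new TopicPartition("topic-b", new Partition(1)),
    });
    consumer.Unassign();//cancel the assignment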

  4. Messages are fetched with the Consume method, which returns a ConsumeResult<TKey, TValue>; the message we want is inside it, along with the offset and other metadata.

  Note that Consume blocks the current thread until a message is available to consume, or the call times out.

  5. If the consumer was created with EnableAutoCommit=false, we must call the Commit method ourselves to commit the message. Don't forget.
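
  Putting points 4 and 5 together, here is a sketch of a consume loop that uses a timeout (so the thread is not blocked forever) and commits manually, assuming EnableAutoCommit=false as above:

    while (true)
    {
        //returns null if no message arrives within one second
        var result = consumer.Consume(TimeSpan.FromSeconds(1));
        if (result == null) continue;

        Console.WriteLine($"receive message:{result.Message.Value}");

        //only required because EnableAutoCommit = false
        consumer.Commit(result);
    }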

   

  A Complete Demo

  As mentioned above, producing needs a class implementing the serialization interface and consuming needs one implementing the deserialization interface; it is best to implement both in the same class, giving this complete implementation:

    public class KafkaConverter : ISerializer<object>, IDeserializer<object>
    {
        /// <summary>
        /// Serializes the data into bytes
        /// </summary>
        /// <param name="data"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public byte[] Serialize(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(json);
        }
        /// <summary>
        /// Deserializes the bytes back into an object
        /// </summary>
        /// <param name="data"></param>
        /// <param name="isNull"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull) return null;

            var json = Encoding.UTF8.GetString(data.ToArray());
            try
            {
                return JsonConvert.DeserializeObject(json);
            }
            catch
            {
                return json;
            }
        }
    }

  And a complete demo:

    static void Main(string[] args)
    {
        var bootstrapServers = "192.168.209.133:9092,192.168.209.134:9092,192.168.209.135:9092";
        var group1 = "group.1";
        var group2 = "group.2";
        var topic = "test";

        new Thread(() =>
        {
            ConsumerConfig config = new ConsumerConfig();
            config.BootstrapServers = bootstrapServers;
            config.GroupId = group1;
            config.AutoOffsetReset = AutoOffsetReset.Earliest;
            config.EnableAutoCommit = false;

            var builder = new ConsumerBuilder<string, object>(config);
            builder.SetValueDeserializer(new KafkaConverter());//set the value deserializer
            var consumer = builder.Build();
            //consumer.Subscribe(topic);//use Subscribe to subscribe to topics
            consumer.Assign(new TopicPartition(topic, new Partition(0)));//use Assign to subscribe to a specific partition

            while (true)
            {
                var result = consumer.Consume();
                Console.WriteLine($"{group1} receive message:{result.Message.Value}");
                consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
            }

        }).Start();

        new Thread(() =>
        {
            ConsumerConfig config = new ConsumerConfig();
            config.BootstrapServers = bootstrapServers;
            config.GroupId = group2;
            config.AutoOffsetReset = AutoOffsetReset.Earliest;
            config.EnableAutoCommit = false;

            var builder = new ConsumerBuilder<string, object>(config);
            builder.SetValueDeserializer(new KafkaConverter());//set the value deserializer
            var consumer = builder.Build();
            //consumer.Subscribe(topic);//use Subscribe to subscribe to topics
            consumer.Assign(new TopicPartition(topic, new Partition(1)));//use Assign to subscribe to a specific partition

            while (true)
            {
                var result = consumer.Consume();
                Console.WriteLine($"{group2} receive message:{result.Message.Value}");
                consumer.Commit(result);//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
            }

        }).Start();

        int index = 0;
        ProducerConfig config = new ProducerConfig();
        config.BootstrapServers = bootstrapServers;
        var builder = new ProducerBuilder<string, object>(config);
        builder.SetValueSerializer(new KafkaConverter());//set the value serializer
        var producer = builder.Build();
        while (true)
        {
            Console.Write("Enter a message: ");
            var line = Console.ReadLine();

            int partition = index % 3;
            var topicPartition = new TopicPartition(topic, new Partition(partition));
            producer.Produce(topicPartition, new Message<string, object>() { Key = "Test", Value = line });

            index++;
        }
    }

 

  A Simple Wrapper

  Here is a simple wrapper exposing a few common settings for convenience. As before, install the Confluent.Kafka and Newtonsoft.Json packages via NuGet. The classes are as follows:

  
    public abstract class KafkaBaseOptions
    {
        /// <summary>
        /// Server addresses
        /// </summary>
        public string[] BootstrapServers { get; set; }
    }
  
    public class KafkaConsumer : IDisposable
    {
        ConsumerBuilder<string, object> builder;
        List<IConsumer<string, object>> consumers;
        bool disposed = false;

        /// <summary>
        /// Kafka server nodes
        /// </summary>
        public string BootstrapServers { get; private set; }
        /// <summary>
        /// Consumer group
        /// </summary>
        public string GroupId { get; private set; }
        /// <summary>
        /// Whether auto commit is enabled (enable.auto.commit)
        /// </summary>
        public bool EnableAutoCommit { get; set; } = false;

        /// <summary>
        /// 異常事件
        /// </summary>
        public event Action<object, Exception> ErrorHandler;
        /// <summary>
        /// 統計事件
        /// </summary>
        public event Action<object, string> StatisticsHandler;
        /// <summary>
        /// 日志事件
        /// </summary>
        public event Action<object, KafkaLogMessage> LogHandler;

        public KafkaConsumer(string groupId, params string[] bootstrapServers)
        {
            if (bootstrapServers == null || bootstrapServers.Length == 0)
            {
                throw new Exception("at least one server must be assigned");
            }

            this.GroupId = groupId;
            this.BootstrapServers = string.Join(",", bootstrapServers);
            this.consumers = new List<IConsumer<string, object>>();
        }

        #region Private
        /// <summary>
        /// Creates the consumer builder
        /// </summary>
        private void CreateConsumerBuilder()
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(KafkaConsumer));
            }

            if (builder == null)
            {
                lock (this)
                {
                    if (builder == null)
                    {
                        ConsumerConfig config = new ConsumerConfig();
                        config.BootstrapServers = BootstrapServers;
                        config.GroupId = GroupId;
                        config.AutoOffsetReset = AutoOffsetReset.Earliest;
                        config.EnableAutoCommit = EnableAutoCommit;
                        //config.EnableAutoOffsetStore = true;
                        //config.IsolationLevel = IsolationLevel.ReadCommitted;
                        //config.MaxPollIntervalMs = 10000;


                        //List<KeyValuePair<string, string>> config = new List<KeyValuePair<string, string>>();
                        //config.Add(new KeyValuePair<string, string>("bootstrap.servers", BootstrapServers));
                        //config.Add(new KeyValuePair<string, string>("group.id", GroupId));
                        //config.Add(new KeyValuePair<string, string>("auto.offset.reset", "earliest"));
                        //config.Add(new KeyValuePair<string, string>("enable.auto.commit", EnableAutoCommit.ToString().ToLower()));
                        //config.Add(new KeyValuePair<string, string>("max.poll.interval.ms", "10000"));
                        //config.Add(new KeyValuePair<string, string>("session.timeout.ms", "10000"));
                        //config.Add(new KeyValuePair<string, string>("isolation.level", "read_uncommitted"));

                        builder = new ConsumerBuilder<string, object>(config);

                        Action<Delegate, object> tryCatchWrap = (@delegate, arg) =>
                        {
                            try
                            {
                                @delegate?.DynamicInvoke(arg);
                            }
                            catch { }
                        };
                        builder.SetErrorHandler((p, e) => tryCatchWrap(ErrorHandler, new Exception(e.Reason)));
                        builder.SetStatisticsHandler((p, e) => tryCatchWrap(StatisticsHandler, e));
                        builder.SetLogHandler((p, e) => tryCatchWrap(LogHandler, new KafkaLogMessage(e)));
                        builder.SetValueDeserializer(new KafkaConverter());
                    }
                }
            }
        }
        /// <summary>
        /// Internal message handling
        /// </summary>
        /// <param name="consumer"></param>
        /// <param name="cancellationToken"></param>
        /// <param name="action"></param>
        private void InternalListen(IConsumer<string, object> consumer, CancellationToken cancellationToken, Action<RecieveResult> action)
        {
            try
            {
                var result = consumer.Consume(cancellationToken);
                if (!cancellationToken.IsCancellationRequested)
                {
                    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                    if (!EnableAutoCommit && result != null)
                    {
                        cancellationTokenSource.Token.Register(() =>
                        {
                            consumer.Commit(result);
                        });
                    }
                    action?.Invoke(result == null ? null : new RecieveResult(result, cancellationTokenSource));
                }
            }
            catch { }
        }
        /// <summary>
        /// Validates the topics and partitions to consume
        /// </summary>
        /// <param name="subscribers"></param>
        private void CheckSubscribers(params KafkaSubscriber[] subscribers)
        {
            if (subscribers == null || subscribers.Length == 0)
            {
                throw new InvalidOperationException("subscriber cann't be empty");
            }

            if (subscribers.Any(f => string.IsNullOrEmpty(f.Topic)))
            {
                throw new InvalidOperationException("topic cann't be empty");
            }
        }
        /// <summary>
        /// Sets the topics and partitions to listen on
        /// </summary>
        /// <param name="consumer"></param>
        private void SetSubscribers(IConsumer<string, object> consumer, params KafkaSubscriber[] subscribers)
        {
            var topics = subscribers.Where(f => f.Partition == null).Select(f => f.Topic).ToArray();
            var topicPartitions = subscribers.Where(f => f.Partition != null).Select(f => new TopicPartition(f.Topic, new Partition(f.Partition.Value))).ToArray();

            if (topics.Length > 0)
            {
                consumer.Subscribe(topics);
            }

            if (topicPartitions.Length > 0)
            {
                consumer.Assign(topicPartitions);
            }
        }
        /// <summary>
        /// Creates a consumer
        /// </summary>
        /// <param name="listenResult"></param>
        /// <param name="subscribers"></param>
        /// <returns></returns>
        private IConsumer<string, object> CreateConsumer(ListenResult listenResult, params KafkaSubscriber[] subscribers)
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(KafkaConsumer));
            }

            CheckSubscribers(subscribers);

            CreateConsumerBuilder();

            var consumer = builder.Build();
            listenResult.Token.Register(() =>
            {
                consumer.Dispose();
            });

            SetSubscribers(consumer, subscribers);

            consumers.Add(consumer);

            return consumer;
        }
        #endregion

        #region Listen
        /// <summary>
        /// Listens once, blocking the current thread until a message is received or listening is cancelled
        /// </summary>
        /// <param name="topics"></param>
        public RecieveResult ListenOnce(params string[] topics)
        {
            return ListenOnce(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray());
        }
        /// <summary>
        /// Listens once, blocking the current thread until a message is received or listening is cancelled
        /// </summary>
        /// <param name="subscribers"></param>
        public RecieveResult ListenOnce(params KafkaSubscriber[] subscribers)
        {
            ListenResult listenResult = new ListenResult();
            var consumer = CreateConsumer(listenResult, subscribers);

            RecieveResult result = null;
            InternalListen(consumer, listenResult.Token, rr =>
            {
                result = rr;
            });
            return result;
        }
        /// <summary>
        /// Listens once asynchronously
        /// </summary>
        /// <param name="topics"></param>
        /// <returns></returns>
        public async Task<RecieveResult> ListenOnceAsync(params string[] topics)
        {
            return await ListenOnceAsync(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray());
        }
        /// <summary>
        /// Listens once asynchronously
        /// </summary>
        /// <param name="subscribers"></param>
        /// <returns></returns>
        public async Task<RecieveResult> ListenOnceAsync(params KafkaSubscriber[] subscribers)
        {
            return await Task.Run(() =>
            {
                return ListenOnce(subscribers);
            });
        }
        /// <summary>
        /// Listens
        /// </summary>
        /// <param name="topics"></param>
        /// <param name="action"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public void Listen(string[] topics, Action<RecieveResult> action = null, CancellationToken cancellationToken = default)
        {
            Listen(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray(), action, cancellationToken);
        }
        /// <summary>
        /// Listens
        /// </summary>
        /// <param name="subscribers"></param>
        /// <param name="action"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public void Listen(KafkaSubscriber[] subscribers, Action<RecieveResult> action = null, CancellationToken cancellationToken = default)
        {
            ListenResult result = new ListenResult();
            var consumer = CreateConsumer(result, subscribers);
            cancellationToken.Register(() =>
            {
                result.Stop();
            });
            while (!result.Stoped)
            {
                InternalListen(consumer, result.Token, action);
            }
        }
        /// <summary>
        /// Listens asynchronously
        /// </summary>
        /// <param name="topics"></param>
        /// <param name="action"></param>
        /// <returns></returns>
        public async Task<ListenResult> ListenAsync(string[] topics, Action<RecieveResult> action = null)
        {
            return await ListenAsync(topics.Select(f => new KafkaSubscriber() { Partition = null, Topic = f }).ToArray(), action);
        }
        /// <summary>
        /// Listens asynchronously
        /// </summary>
        /// <param name="subscribers"></param>
        /// <param name="action"></param>
        /// <returns></returns>
        public async Task<ListenResult> ListenAsync(KafkaSubscriber[] subscribers, Action<RecieveResult> action = null)
        {
            ListenResult result = new ListenResult();
            new Task(() =>
            {
                var consumer = CreateConsumer(result, subscribers);
                while (!result.Stoped)
                {
                    InternalListen(consumer, result.Token, action);
                }
            }).Start();
            return await Task.FromResult(result);
        }
        #endregion
        /// <summary>
        /// Releases resources
        /// </summary>
        public void Dispose()
        {
            disposed = true;
            builder = null;

            foreach (var consumer in consumers)
            {
                consumer?.Close();
                consumer?.Dispose();
            }

            GC.Collect();
        }

        public static KafkaConsumer Create(string groupId, KafkaBaseOptions kafkaBaseOptions)
        {
            return new KafkaConsumer(groupId, kafkaBaseOptions.BootstrapServers);
        }
        public override string ToString()
        {
            return BootstrapServers;
        }
    }

    public class RecieveResult
    {
        CancellationTokenSource cancellationTokenSource;

        internal RecieveResult(ConsumeResult<string, object> consumeResult, CancellationTokenSource cancellationTokenSource)
        {
            this.Topic = consumeResult.Topic;
            this.Message = consumeResult.Message.Value?.ToString();
            this.Offset = consumeResult.Offset.Value;
            this.Partition = consumeResult.Partition.Value;
            this.Key = consumeResult.Message.Key;

            this.cancellationTokenSource = cancellationTokenSource;
        }

        /// <summary>
        /// The topic the message belongs to
        /// </summary>
        public string Topic { get; private set; }
        /// <summary>
        /// The message key
        /// </summary>
        public string Key { get; private set; }
        /// <summary>
        /// The actual message content to process
        /// </summary>
        public string Message { get; private set; }
        /// <summary>
        /// The current read position (offset)
        /// </summary>
        public long Offset { get; private set; }
        /// <summary>
        /// The physical partition the message came from
        /// </summary>
        public int Partition { get; private set; }

        /// <summary>
        /// Commits the message
        /// </summary>
        public void Commit()
        {
            if (cancellationTokenSource == null || cancellationTokenSource.IsCancellationRequested) return;

            cancellationTokenSource.Cancel();
            cancellationTokenSource.Dispose();
            cancellationTokenSource = null;
        }
    }
    public class ListenResult : IDisposable
    {
        CancellationTokenSource cancellationTokenSource;

        /// <summary>
        /// CancellationToken
        /// </summary>
        public CancellationToken Token { get { return cancellationTokenSource.Token; } }
        /// <summary>
        /// Whether listening has stopped
        /// </summary>
        public bool Stoped { get { return cancellationTokenSource.IsCancellationRequested; } }

        public ListenResult()
        {
            cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// Stops listening
        /// </summary>
        public void Stop()
        {
            cancellationTokenSource.Cancel();
        }

        public void Dispose()
        {
            Stop();
        }
    }
  
    public class KafkaConverter : ISerializer<object>, IDeserializer<object>
    {
        /// <summary>
        /// Serializes the data into bytes
        /// </summary>
        /// <param name="data"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public byte[] Serialize(object data, SerializationContext context)
        {
            var json = JsonConvert.SerializeObject(data);
            return Encoding.UTF8.GetBytes(json);
        }
        /// <summary>
        /// Deserializes the bytes back into an object
        /// </summary>
        /// <param name="data"></param>
        /// <param name="isNull"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public object Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull) return null;

            var json = Encoding.UTF8.GetString(data.ToArray());
            try
            {
                return JsonConvert.DeserializeObject(json);
            }
            catch
            {
                return json;
            }
        }
    }
  
    public class KafkaMessage
    {
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; set; }
        /// <summary>
        /// Partition; when not specified, Kafka chooses the partition
        /// </summary>
        public int? Partition { get; set; }
        /// <summary>
        /// Key
        /// </summary>
        public string Key { get; set; }
        /// <summary>
        /// Message
        /// </summary>
        public object Message { get; set; }
    }
  
    public class KafkaProducer : IDisposable
    {
        /// <summary>
        /// Responsible for building producers
        /// </summary>
        ProducerBuilder<string, object> builder;
        ConcurrentQueue<IProducer<string, object>> producers;
        bool disposed = false;

        /// <summary>
        /// Kafka server nodes
        /// </summary>
        public string BootstrapServers { get; private set; }
        /// <summary>
        /// Flush timeout (ms)
        /// </summary>
        public int FlushTimeOut { get; set; } = 10000;
        /// <summary>
        /// Number of producers kept in the pool
        /// </summary>
        public int InitializeCount { get; set; } = 5;
        /// <summary>
        /// Default message key
        /// </summary>
        public string DefaultKey { get; set; }
        /// <summary>
        /// Default topic
        /// </summary>
        public string DefaultTopic { get; set; }
        /// <summary>
        /// Error event
        /// </summary>
        public event Action<object, Exception> ErrorHandler;
        /// <summary>
        /// Statistics event
        /// </summary>
        public event Action<object, string> StatisticsHandler;
        /// <summary>
        /// Log event
        /// </summary>
        public event Action<object, KafkaLogMessage> LogHandler;

        public KafkaProducer(params string[] bootstrapServers)
        {
            if (bootstrapServers == null || bootstrapServers.Length == 0)
            {
                throw new Exception("at least one server must be assigned");
            }

            this.BootstrapServers = string.Join(",", bootstrapServers);
            producers = new ConcurrentQueue<IProducer<string, object>>();
        }

        #region Private
        /// <summary>
        /// Producer builder
        /// </summary>
        /// <returns></returns>
        private ProducerBuilder<string, object> CreateProducerBuilder()
        {
            if (builder == null)
            {
                lock (this)
                {
                    if (builder == null)
                    {
                        ProducerConfig config = new ProducerConfig();
                        config.BootstrapServers = BootstrapServers;

                        //var config = new KeyValuePair<string, string>[] { new KeyValuePair<string, string>("bootstrap.servers", BootstrapServers) };

                        builder = new ProducerBuilder<string, object>(config);
                        Action<Delegate, object> tryCatchWrap = (@delegate, arg) =>
                        {
                            try
                            {
                                @delegate?.DynamicInvoke(arg);
                            }
                            catch { }
                        };
                        builder.SetErrorHandler((p, e) => tryCatchWrap(ErrorHandler, new Exception(e.Reason)));
                        builder.SetStatisticsHandler((p, e) => tryCatchWrap(StatisticsHandler, e));
                        builder.SetLogHandler((p, e) => tryCatchWrap(LogHandler, new KafkaLogMessage(e)));
                        builder.SetValueSerializer(new KafkaConverter());
                    }
                }
            }

            return builder;
        }
        /// <summary>
        /// Rents a producer from the pool
        /// </summary>
        /// <returns></returns>
        private IProducer<string, object> RentProducer()
        {
            if (disposed)
            {
                throw new ObjectDisposedException(nameof(KafkaProducer));
            }

            IProducer<string, object> producer;
            lock (producers)
            {
                if (!producers.TryDequeue(out producer) || producer == null)
                {
                    CreateProducerBuilder();
                    producer = builder.Build();
                }
            }
            return producer;
        }
        /// <summary>
        /// Returns a producer to the pool
        /// </summary>
        /// <param name="producer"></param>
        private void ReturnProducer(IProducer<string, object> producer)
        {
            if (disposed) return;

            lock (producers)
            {
                if (producers.Count < InitializeCount && producer != null)
                {
                    producers.Enqueue(producer);
                }
                else
                {
                    producer?.Dispose();
                }
            }
        }
        #endregion

        #region Publish
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void PublishWithKey(string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, null, key, message, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void PublishWithKey(int? partition, string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, partition, key, message, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void PublishWithKey(string topic, string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(topic, null, key, message, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, null, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(int? partition, object message, Action<DeliveryResult> callback = null)
        {
            Publish(DefaultTopic, partition, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(string topic, object message, Action<DeliveryResult> callback = null)
        {
            Publish(topic, null, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(string topic, int? partition, object message, Action<DeliveryResult> callback = null)
        {
            Publish(topic, partition, DefaultKey, message, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <param name="callback"></param>
        public void Publish(string topic, int? partition, string key, object message, Action<DeliveryResult> callback = null)
        {
            Publish(new KafkaMessage() { Key = key, Message = message, Partition = partition, Topic = topic }, callback);
        }
        /// <summary>
        /// Publishes a message
        /// </summary>
        /// <param name="kafkaMessage"></param>
        /// <param name="callback"></param>
        public void Publish(KafkaMessage kafkaMessage, Action<DeliveryResult> callback = null)
        {
            if (string.IsNullOrEmpty(kafkaMessage.Topic))
            {
                throw new ArgumentException("topic can not be empty", nameof(kafkaMessage.Topic));
            }
            if (string.IsNullOrEmpty(kafkaMessage.Key))
            {
                throw new ArgumentException("key can not be empty", nameof(kafkaMessage.Key));
            }

            var producer = RentProducer();
            if (kafkaMessage.Partition == null)
            {
                producer.Produce(kafkaMessage.Topic, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message }, dr => callback?.Invoke(new DeliveryResult(dr)));
            }
            else
            {
                var topicPartition = new TopicPartition(kafkaMessage.Topic, new Partition(kafkaMessage.Partition.Value));
                producer.Produce(topicPartition, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message }, dr => callback?.Invoke(new DeliveryResult(dr)));
            }

            producer.Flush(TimeSpan.FromMilliseconds(FlushTimeOut));

            ReturnProducer(producer);
        }
        #endregion

        #region PublishAsync
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="key"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishWithKeyAsync(string key, object message)
        {
            return await PublishAsync(DefaultTopic, null, key, message);
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishWithKeyAsync(int? partition, string key, object message)
        {
            return await PublishAsync(DefaultTopic, partition, key, message);
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishWithKeyAsync(string topic, string key, object message)
        {
            return await PublishAsync(topic, null, key, message);
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(object message)
        {
            return await PublishAsync(DefaultTopic, null, DefaultKey, message);
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(int? partition, object message)
        {
            return await PublishAsync(DefaultTopic, partition, DefaultKey, message);
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(string topic, object message)
        {
            return await PublishAsync(topic, null, DefaultKey, message);
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="message"></param>
        public async Task<DeliveryResult> PublishAsync(string topic, int? partition, object message)
        {
            return await PublishAsync(topic, partition, DefaultKey, message);
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="topic"></param>
        /// <param name="partition"></param>
        /// <param name="key"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        public async Task<DeliveryResult> PublishAsync(string topic, int? partition, string key, object message)
        {
            return await PublishAsync(new KafkaMessage() { Key = key, Message = message, Partition = partition, Topic = topic });
        }
        /// <summary>
        /// Publishes a message asynchronously
        /// </summary>
        /// <param name="kafkaMessage"></param>
        /// <returns></returns>
        public async Task<DeliveryResult> PublishAsync(KafkaMessage kafkaMessage)
        {
            if (string.IsNullOrEmpty(kafkaMessage.Topic))
            {
                throw new ArgumentException("topic can not be empty", nameof(kafkaMessage.Topic));
            }
            if (string.IsNullOrEmpty(kafkaMessage.Key))
            {
                throw new ArgumentException("key can not be empty", nameof(kafkaMessage.Key));
            }

            var producer = RentProducer();
            DeliveryResult<string, object> deliveryResult;
            if (kafkaMessage.Partition == null)
            {
                deliveryResult = await producer.ProduceAsync(kafkaMessage.Topic, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message });
            }
            else
            {
                var topicPartition = new TopicPartition(kafkaMessage.Topic, new Partition(kafkaMessage.Partition.Value));
                deliveryResult = await producer.ProduceAsync(topicPartition, new Message<string, object>() { Key = kafkaMessage.Key, Value = kafkaMessage.Message });
            }

            producer.Flush(new TimeSpan(0, 0, 0, 0, FlushTimeOut));

            ReturnProducer(producer);

            return new DeliveryResult(deliveryResult);
        }

        #endregion

        /// <summary>
        /// Releases resources
        /// </summary>
        public void Dispose()
        {
            disposed = true;
            while (producers.Count > 0)
            {
                IProducer<string, object> producer;
                producers.TryDequeue(out producer);
                producer?.Dispose();
            }
            GC.Collect();
        }

        public static KafkaProducer Create(KafkaBaseOptions kafkaBaseOptions)
        {
            return new KafkaProducer(kafkaBaseOptions.BootstrapServers);
        }
        public override string ToString()
        {
            return BootstrapServers;
        }
    }

    public class DeliveryResult
    {
        internal DeliveryResult(DeliveryResult<string, object> deliveryResult)
        {
            this.Topic = deliveryResult.Topic;
            this.Partition = deliveryResult.Partition.Value;
            this.Offset = deliveryResult.Offset.Value;
            switch (deliveryResult.Status)
            {
                case PersistenceStatus.NotPersisted: this.Status = DeliveryResultStatus.NotPersisted; break;
                case PersistenceStatus.Persisted: this.Status = DeliveryResultStatus.Persisted; break;
                case PersistenceStatus.PossiblyPersisted: this.Status = DeliveryResultStatus.PossiblyPersisted; break;
            }
            this.Key = deliveryResult.Key;
            this.Message = deliveryResult.Value;

            if (deliveryResult is DeliveryReport<string, object>)
            {
                var dr = deliveryResult as DeliveryReport<string, object>;
                this.IsError = dr.Error.IsError;
                this.Reason = dr.Error.Reason;
            }
        }

        /// <summary>
        /// Whether an error occurred
        /// </summary>
        public bool IsError { get; private set; }
        /// <summary>
        /// The error reason
        /// </summary>
        public string Reason { get; private set; }
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; private set; }
        /// <summary>
        /// Partition
        /// </summary>
        public int Partition { get; private set; }
        /// <summary>
        /// Offset
        /// </summary>
        public long Offset { get; private set; }
        /// <summary>
        /// Status
        /// </summary>
        public DeliveryResultStatus Status { get; private set; }
        /// <summary>
        /// Message key
        /// </summary>
        public string Key { get; private set; }
        /// <summary>
        /// Message
        /// </summary>
        public object Message { get; private set; }
    }
    public enum DeliveryResultStatus
    {
        /// <summary>
        /// The message failed to be persisted
        /// </summary>
        NotPersisted = 0,
        /// <summary>
        /// The message was submitted; whether it was persisted is unknown
        /// </summary>
        PossiblyPersisted = 1,
        /// <summary>
        /// The message was persisted successfully
        /// </summary>
        Persisted = 2
    }
    public class KafkaLogMessage
    {
        internal KafkaLogMessage(LogMessage logMessage)
        {
            this.Name = logMessage.Name;
            this.Facility = logMessage.Facility;
            this.Message = logMessage.Message;

            switch (logMessage.Level)
            {
                case SyslogLevel.Emergency: this.Level = LogLevel.Emergency; break;
                case SyslogLevel.Alert: this.Level = LogLevel.Alert; break;
                case SyslogLevel.Critical: this.Level = LogLevel.Critical; break;
                case SyslogLevel.Error: this.Level = LogLevel.Error; break;
                case SyslogLevel.Warning: this.Level = LogLevel.Warning; break;
                case SyslogLevel.Notice: this.Level = LogLevel.Notice; break;
                case SyslogLevel.Info: this.Level = LogLevel.Info; break;
                case SyslogLevel.Debug: this.Level = LogLevel.Debug; break;
            }
        }
        /// <summary>
        /// Name
        /// </summary>
        public string Name { get; private set; }
        /// <summary>
        /// Level
        /// </summary>
        public LogLevel Level { get; private set; }
        /// <summary>
        /// Facility
        /// </summary>
        public string Facility { get; private set; }
        /// <summary>
        /// Message
        /// </summary>
        public string Message { get; private set; }

        public enum LogLevel
        {
            Emergency = 0,
            Alert = 1,
            Critical = 2,
            Error = 3,
            Warning = 4,
            Notice = 5,
            Info = 6,
            Debug = 7
        }
    }
  
    public class KafkaSubscriber
    {
        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; set; }
        /// <summary>
        /// Partition
        /// </summary>
        public int? Partition { get; set; }
    }

  Usage, mirroring the demo above:

    class Program
    {
        static void Main(string[] args)
        {
            var bootstrapServers = new string[] { "192.168.209.133:9092", "192.168.209.134:9092", "192.168.209.135:9092" };
            var group1 = "group.1";
            var group2 = "group.2";
            var topic = "test";

            {
                KafkaConsumer consumer = new KafkaConsumer(group1, bootstrapServers);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new KafkaSubscriber[] { new KafkaSubscriber() { Topic = topic, Partition = 0 } }, result =>
                {
                    Console.WriteLine($"{group1} receive message:{result.Message}");
                    result.Commit();//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
                }).Wait();
            }

            {
                KafkaConsumer consumer = new KafkaConsumer(group2, bootstrapServers);
                consumer.EnableAutoCommit = false;
                consumer.ListenAsync(new KafkaSubscriber[] { new KafkaSubscriber() { Topic = topic, Partition = 1 } }, result =>
                {
                    Console.WriteLine($"{group2} receive message:{result.Message}");
                    result.Commit();//manual commit; with EnableAutoCommit=true (auto commit) there is no need to call Commit
                }).Wait();
            }

            KafkaProducer producer = new KafkaProducer(bootstrapServers);

            int index = 0;
            while (true)
            {
                Console.Write("Enter a message: ");
                var line = Console.ReadLine();

                int partition = index % 3;
                producer.Publish(topic, partition, "Test", line);
                index++;
            }
        }
    }

 

