.net6 部署、操作kafka


.net6 操作kafka

使用docker部署kafka(單服務)

  本人使用的docker部署kafka 具體的部署流程可以參考如下地址:
  https://blog.csdn.net/steve_frank/article/details/109782212
  https://www.jianshu.com/p/e642793cd5de

 

詳解Kafka消息隊列的兩種模式

  Kafka是一種高吞吐量的分布式發布訂閱消息系統,有如下特性

    • 通過O的磁盤數據結構提供消息的持久化,這種結構對於即使數以TB的消息存儲也能夠保持長時間的穩定性能。
    • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒數百萬 [2] 的消息。
    • 支持通過Kafka服務器和消費機集群來分區消息。
    • 支持Hadoop並行數據加載。
      Kafka通過官網發布了最新版本2.3.0

  1. 點對點模式(一對一)

  1)模式特點:
      消息生產者生產消息發送到Queue中,然后消息消費者從Queue中取出並且消費消息。消息被消費以后,queue中不再存儲該條消息,所以消息消費者不可能消費到已經被消費的消息。Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者消費。

  2)數據拉取方式:消費者主動拉取。

  3)模式缺點:消息不能被重復消費。

  點對點模式下包括三個角色:

  消息隊列

  發送者 (生產者)

  接收者(消費者)

 

 

 

  2. 發布/訂閱模式(一對多)
  1)模式特點:
      消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,發布到topic的消息會被所有訂閱者消費。

  2)數據拉取方式:消費者主動拉取、消費者被動接受(類似微信公眾號)

  3)模式缺點:當數據拉取方式為消費者被動接受時,消費者的消費速度可能跟不上生產者的生產速度。

  發布/訂閱模式下包括三個角色:

  角色主題(Topic)

  發布者(Publisher)

  訂閱者(Subscriber)

 

 

   kafka更多詳細資料:https://lotabout.me/2018/kafka-introduction/

  

代碼實例(點對點模式)  

   public interface IKafkaService
    {
        /// <summary>
        /// 發生消息至指定主題
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <param name="topicName"></param>
        /// <param name="message"></param>
        /// <returns></returns>
        Task PublishAsync<TMessage>(string host,string topicName, TMessage message) where TMessage : class;


        /// <summary>
        /// 從指定主題訂閱消息
        /// </summary>
        /// <typeparam name="TMessage"></typeparam>
        /// <param name="topics"></param>
        /// <param name="messageFunc"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        Task SubscribeAsync<TMessage>(IEnumerable<string> topics,Action<TMessage> messageFunc,CancellationToken cancellationToken) where TMessage : class;
    }

//封裝

public class KafkaService : IKafkaService
    {
        public readonly string host = "120.79.77.91:9092";
        public async Task PublishAsync<TMessage>(string host, string topicName, TMessage message) where TMessage : class
        {
            var config = new ProducerConfig
            {
                BootstrapServers = host
            };
            using var producer = new ProducerBuilder<string, string>(config).Build();
            var data = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = Newtonsoft.Json.JsonConvert.SerializeObject(message) };
            await producer.ProduceAsync(topicName, data);
        }

        public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = host,
                GroupId = "consumer",
                EnableAutoCommit = false,
                StatisticsIntervalMs = 5000,
                SessionTimeoutMs = 6000,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true
            };
            //const int commitPeriod = 5;
            using var consumer = new ConsumerBuilder<Ignore, string>(config)
             .SetErrorHandler((_, e) =>
             {
                 Console.WriteLine($"Error: {e.Reason}");
             })
             .SetStatisticsHandler((_, json) =>
             {
                 Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息監聽中..");
             })
             .SetPartitionsAssignedHandler((c, partitions) =>
             {
                 string partitionsStr = string.Join(", ", partitions);
                 Console.WriteLine($" - 分配的 kafka 分區: {partitionsStr}");
             })
             .SetPartitionsRevokedHandler((c, partitions) =>
             {
                 string partitionsStr = string.Join(", ", partitions);
                 Console.WriteLine($" - 回收了 kafka 的分區: {partitionsStr}");
             })
             .Build();
            consumer.Subscribe(topics);
            try
            {
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken);
                        Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");
                        if (consumeResult.IsPartitionEOF)
                        {
                            Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已經到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                            continue;
                        }
                        TMessage messageResult = null;
                        try
                        {
                            messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);
                        }
                        catch (Exception ex)
                        {
                            var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失敗,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";
                            Console.WriteLine(errorMessage);
                            messageResult = null;
                        }
                        if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/)
                        {
                            messageFunc(messageResult);
                            try
                            {
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException e)
                            {
                                Console.WriteLine(e.Message);
                            }
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Consume error: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("Closing consumer.");
                consumer.Close();
            }
            await Task.CompletedTask;
        }
    }

//生產者

string[] configList = { "xxx.xx.xx.xx:9092", "sun" };

//調用接口定義的方法
KafkaService kafkaService = new KafkaService();
while (true)
{
    var data = Console.ReadLine();
    await kafkaService.PublishAsync<string>(configList.First(), configList.Last(), data);
}

//消費者

KafkaService kafkaService = new KafkaService();
await kafkaService.SubscribeAsync<string>(new string[] { "sun" }, like, new CancellationToken());

static void like(string like)
{
    Console.WriteLine($"這個是接受到的結果:{like}");
}

代碼實例(發布/訂閱模式) 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM