在.NET中使用Apache Kafka(一)


​曾經在你的應用程序中使用過異步處理嗎?在處理不需要立即執行的任務時,異步代碼似乎是不可避免的。Apache Kafka是最常用和最健壯的開源事件流平台之一。許多公司和開發者利用它的強大功能來創建高性能的異步操作,用於微服務的數據集成,以及用於應用程序健康指標的監控工具。這篇文章解釋了在.NET應用程序中使用Kafka的細節,還展示了如何在Windows操作系統上安裝及使用。

它是如何工作的

當今世界,數據正在以指數形式增長。為了容納不斷增長的數據,Kafka這樣的工具應運而生,提供了健壯而令人印象深刻的架構。

但是Kafka是如何在幕后工作的呢?

Kafka在生產者和消費者之間交換信息。生產者和消費者是這一線性過程的兩個主要角色。

Kafka也可以在一個或多個服務器的集群中工作。這些服務器被稱為Kafka代理,通過代理你可以受益於多種特性,例如數據復制、容錯和高可用。

這些代理由另一個叫做Zookeeper的工具管理。總之,它是一種旨在保持分布式系統中同步和組織配置數據的服務。

Kafka Topics

Kafka只是一個代理,所有的行為都發生在這。生產者向世界發送消息,而消費者讀取特定的數據塊。如何區分數據的一個特定部分與其他部分?消費者如何知道要使用哪些數據?要理解這一點,你需要一個新的內容:topic。

Kafka topics是傳遞消息的載體。由生產者產生的Kafka記錄被組織並存儲到topic中。

假設你正在處理一個用於記載植物目錄的API項目。你要確保公司中的每個人都能夠訪問每一個新注冊的植物。所以你選了Kafka。

在系統中注冊的每一個新植物都將通過Kafka進行廣播。topic的名稱是tree_catalog。

在這種情況下,topic像堆棧一樣工作。它將信息保存在到達時的相同位置,並保證數據不會丟失。

到達的每個數據記錄被存儲在一個slot中,並用一個稱為offset的唯一位置號注冊。

例如,當一個消費者消費了存儲在offset是0的消息時,它提交消息,聲明一切正常,然后移動到下一個offset,依此類推。這個過程通常是線性的。然而,由於許多消費者可以同時將記錄“插入”到同一個topic中,所以確定哪些數據位置已經被占用的責任留給了消費者。這意味着消費者可以決定使用消息的順序,甚至決定是否從頭開始重新開始處理(offset為0)。

分區

分布式系統的一個關鍵特性是數據復制。它允許一個更安全的體系結構,數據可以被復制到其他地方,以防不好的事情發生。Kafka通過分區處理復制。Kafka topics被配置為分散在幾個分區(可配置的)。每個分區通過唯一的offset保存數據記錄。

為了實現冗余,Kafka在分區(一個或多個)創建副本,並在集群中傳播數據。

這個過程遵循leader-follower模型,其中一個leader副本總是處理給定分區的請求,而follower復制該分區。每次制作人將消息推送到某個主題時,它都會直接傳遞給該主題的領導者。

消費組

在Kafka中,消費來自topic的消息最合適的方式是通過消費組。

顧名思義,這些組由一個或多個消費者組成,目的是獲取來自特定主題的所有消息。

為此,組必須始終具有唯一的id(由屬性group.id設置)。無論何時消費者想要加入那個組,它都將通過組id來完成。

每次你添加或刪除一個組的消費者,Kafka會重新平衡它們之間的負載,這樣就不會過載。

設置

現在,你已經了解了Kafka的通用工作原理,是時候開始環境設置了。為了簡化,這個例子將使用Docker來保存Kafka和Zookeeper映像,而不是將它們安裝到你的機器上。這樣可以節省一些空間和復雜性。

對於Windows用戶,Docker提供了一種安裝和管理Docker容器的簡單方式:Docker桌面。進入它的下載頁面並下載安裝程序。運行它,並在不更改默認設置選項的情況下繼續到最后。

確保在此過程完成后重新啟動計算機。重啟后,Docker可能會要求你安裝其他依賴項,所以請確保接受每一個依賴項。在Docker上安裝一個有效的Kafka本地環境最快的路徑之一是通過Docker Compose。通過這種方式,可以通過一個YAML文件建立應用程序服務,並快速地讓它們運行。

創建一個名為docker-compose的新文件,並將以下的內容保存到其中:

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "simpletalk_topic:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

注意,代碼從Docker Hub的wurstmeister帳戶中導入了兩個服務鏡像(kafka和zookeeper)。這是在Docker上使用Kafka最穩定的鏡像之一。端口也使用它們的推薦值進行設置,因此請注意不要更改它們。

其中最重要的設置之一屬於KAFKA_CREATE_TOPICS。在這里,你必須定義要創建的topic名稱。還有其他方法可以創建主題,以后你將看到。

通過命令行導航到docker-compose.yml所在的文件夾。然后執行如下命令啟動鏡像:

docker-compose up

這段代碼將加載所有依賴項並啟動鏡像。在此過程中,可能會看到大量的日志。

如果沒有錯誤日志顯示,說明啟動成功。

為了檢查Docker鏡像是否啟動,在另一個cmd窗口中運行以下命令:

docker ps

顯示如下:

親自動手

你的Kafka環境已經可以使用了。下一步是在Visual Studio中進行項目創建。進入項目創建窗口。搜索ASP.NET Core Web Application模板,單擊Next。

解決方案新建一個名稱消費者項目和生產者項目將在同一個解決方案中共存。

下一個窗口選擇API模板。取消勾選“配置為HTTPS”選項。

創建項目后,右鍵單擊解決方案,選擇添加新項目,然后,選擇ASP.NET Core Web Application項目類型。

繼續並像前面一樣選擇API模板。

現在,在ST-Kafka-NET解決方案中有兩個項目。

NuGet包

為了讓C#代碼理解如何產生和消費消息,你需要一個Kafka的客戶端。現在最常用的客戶端是Confluent’s Kafka .NET Client。

選擇並單擊Install。或者,你可以通過命令行添加它們:

PM> Install-Package Confluent.Kafka

設置消費者

現在來實現消費者項目。雖然它是一個類似rest的應用程序,但消費者不是必需的。任何類型的.net項目都可以監聽topic消息。

該項目已經包含一個Controllers文件夾。你需要創建一個名為Handlers的新類,並向其添加一個KafkaConsumerHandler.cs的文件。內容如下:

using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ST_KafkaConsumer.Handlers
{
    public class KafkaConsumerHandler : IHostedService
    {
        private readonly string topic = "simpletalk_topic";
        public Task StartAsync(CancellationToken cancellationToken)
        {
            var conf = new ConsumerConfig
            {
                GroupId = "st_consumer_group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            using (var builder = new ConsumerBuilder<Ignore, 
                string>(conf).Build())
            {
                builder.Subscribe(topic);
                var cancelToken = new CancellationTokenSource();
                try
                {
                    while (true)
                    {
                        var consumer = builder.Consume(cancelToken.Token);
                        Console.WriteLine($"Message: {consumer.Message.Value} received from {consumer.TopicPartitionOffset}");
                    }
                }
                catch (Exception)
                {
                    builder.Close();
                }
            }
            return Task.CompletedTask;
        }
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }
}

這個處理程序必須在一個單獨的線程中運行,因為它將永遠在while循環中監視傳入消息。因此,需要在這個類中使用異步任務。

請注意topic名稱和消費者配置。它們與docker-compose.yml中的設置完全匹配。一定要反復檢查你的輸入,否則可能會導致一些莫名其妙的錯誤。

消費者組id可以是任何你想要的。通常,它們都有直觀的名稱,以幫助進行維護和故障排除。

每當新消息被發布到simpletalk_topic時,該消費者將使用它並將其記錄到控制台。當然,在現實應用程序中,你會更好地利用這些數據。

你還需要將這個托管服務類添加到Startup中,因此,打開它,並在ConfigureServices方法中添加以下代碼行:

services.AddSingleton<IHostedService, KafkaConsumerHandler>();

並確保引入了以下命名空間:

using ST_KafkaConsumer.Handlers;

設置生產者

至於生產者,這里的處理方式會有所不同。由於不需要無限循環來監聽到達的消息,生產者可以簡單地從任何地方發布消息,甚至是從控制器。在實際的應用程序中,最好將這類代碼與MVC層分開,但本例堅持使用控制器,以保持簡單。

在Controllers文件夾中創建一個名為KafkaProducerController.cs的文件,並向其添加一下內容:

using System;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
namespace Kafka.Producer.API.Controllers
{
    [Route("api/kafka")]
    [ApiController]
    public class KafkaProducerController : ControllerBase
    {
        private readonly ProducerConfig config = new ProducerConfig 
                             { BootstrapServers = "localhost:9092" };
        private readonly string topic = "simpletalk_topic";
        [HttpPost]
        public IActionResult Post([FromQuery] string message)
        {
            return Created(string.Empty, SendToKafka(topic, message));
        }
        private Object SendToKafka(string topic, string message)
        {
            using (var producer = 
                 new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    return producer.ProduceAsync(topic, new Message<Null, string> { Value = message })
                        .GetAwaiter()
                        .GetResult();
                }
                catch (Exception e)
                {
                    Console.WriteLine($"Oops, something went wrong: {e}");
                }
            }
            return null;
        }
    }
}

生產者代碼比消費者代碼簡單得多。ProducerBuilder類負責根據提供的配置選項、Kafka服務器和topic名稱創建一個功能齊全的Kafka生產者。

重要的是要記住整個過程是異步的。但是,你可以使用Confluent的API來檢索awaiter對象,然后從API方法返回結果。

測試

要測試這個示例,你需要分別運行生產者和使用者應用程序。在工具欄中,找到Startup Projects組合框並選擇ST-KafkaConsumer選項:

點擊按鈕IIS Express來運行消費者應用程序。這將啟動一個新的瀏覽器窗口,我們將忽略並最小化它,因為消費者API不是重點。

打開一個新的cmd窗口,跳轉到producer文件夾。運行命令dotnet run來啟動它。

請注意它所運行的URL和端口。

現在是時候通過producer API發送一些消息了。為此,你可以使用任何API測試工具,例如Postman。

為了讓下面的命令正常工作,必須確保Docker鏡像正常工作。因此,請確保再次執行docker ps來檢查。有時,重新啟動計算機會停止這些進程。

如果命令沒有任何日志信息,那么再運行一次docker-compose。

要測試發布-訂閱消息,打開另一個cmd窗口並發出以下命令:

curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!"

這個請求發送到生產者API並向Kafka發布一個新消息。

要檢查消費者是否收到了它,你可以找到輸出窗口並選擇ST-KafkaConsumer – ASP.NET Core Web Server,如圖所示:

cmd窗口也可以顯示JSON結果。但是,它沒有格式化。要解決這個問題,如果你安裝了Python,你可以運行以下命令:

curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!" | python -m json.tool

輸出如下:

這是目前可以獲得的關於topic message對象的所有信息。第二個測試將顯示當消費者項目關閉並發布消息時發生了什么。

停止Visual Studio中的consumer項目,但這一次有一個不同的消息:

curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Is%20anybody%20there?" | python -m json.tool

接着啟動消費者項目,觀察日志記錄,內容如下:

總結

Kafka是一個靈活和健壯的工具,它允許在許多類型的項目中進行強大的實現,這是它被廣泛采用的第一個原因。

這篇文章只是對它的世界的一個簡要介紹,但是還有更多的東西可以看到。在下一篇文章中,我將探討Kafka的功能。

原文鏈接:https://www.red-gate.com/simple-talk/dotnet/net-development/using-apache-kafka-with-net/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM