Kafka安裝配置


  Kafka是由Apache軟件基金會開發的一個高吞吐量的分布式發布訂閱消息系統,由Scala和Java編寫。官網地址:http://kafka.apache.org

0.基本概念

  Broker:Kafka集群包含一個或多個服務器,這種服務器被稱為broker。
  Topic:每條發布到Kafka集群的消息都有一個主題名稱,這個主題名稱就被稱為Topic。
  Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition。
  Producer:消息生產者,負責發布消息到Kafka broker。
  Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
  Consumer Group:每個Consumer屬於一個特定的Consumer Group,group name可單獨設定,若不設定則屬於默認的group。

1.安裝步驟

  系統:Centos7.2  

  JDK:   java1.8

  另外需要安裝zookeeper,可以使用已有的zookeeper服務,新版的kafka已內置了一個zookeeper環境,也可以直接使用。

  (1)下載

  kafka安裝包下載地址:http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

  /bin 可執行腳本目錄

  /config 配置文件目錄

tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0

  (2)啟動zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

  (3)配置kafka相關屬性

broker.id=0   //唯一標識
listeners=PLAINTEXT://:9092  指定服務的端口
advertised.listeners=PLAINTEXT://192.168.31.222:9092  如果要提供外網訪問 必須配置此項
log.dirs=/tmp/kafka-logs-0  //日志目錄

  (4)啟動服務

bin/kafka-server-start.sh config/server.properties &

  (5)檢查服務

[root@localhost config]# netstat -tunlp|egrep "(2181|9092)"
tcp6       0      0 :::9092       :::*       LISTEN      14610/java          
tcp6       0      0 :::2181       :::*       LISTEN      20494/java 

2.集群配置

  所謂的kafka集群就是指多個broker組成的集群,通過zookeeper來進行管理。以下在本機部署三個broker組成的集群。

  首先將config/server.properties的復制三份,分別命名為server-0.properties、server-1.properties、server-2.properties,分別配置如下:

  server-0.properties:

broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.31.222:9092
log.dirs=/tmp/kafka-logs-0

  server-1.properties:

broker.id=1
listeners=PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://192.168.31.222:9093
log.dirs=/tmp/kafka-logs-1

  server-2.properties:

broker.id=2
listeners=PLAINTEXT://:9094
advertised.listeners=PLAINTEXT://192.168.31.222:9094
log.dirs=/tmp/kafka-logs-2

   然后分別都啟動就可以了:

bin/kafka-server-start.sh config/server-0.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

  查看是否啟動了:

[root@localhost ~]# netstat -tunlp|egrep "(2181|9092|9093|9094)"
tcp6       0      0 :::9092        :::*      LISTEN      14610/java          
tcp6       0      0 :::2181        :::*      LISTEN      20494/java          
tcp6       0      0 :::9093        :::*      LISTEN      15353/java          
tcp6       0      0 :::9094        :::*      LISTEN      14974/java

3.客戶端調用

  客戶端使用.net core測試,使用官網推薦的confluent-kafka-dotnet類庫

  生產者代碼:通過192.168.31.222:9092發布消息

public static void Main(string[] args)
    {
        var conf = new ProducerConfig { BootstrapServers = "192.168.31.222:9092" };
        Action<DeliveryReportResult<Null, string>> handler = r => 
            Console.WriteLine(!r.Error.IsError
                ? $"Delivered message to {r.TopicPartitionOffset}"
                : $"Delivery Error: {r.Error.Reason}");

        using (var p = new Producer<Null, string>(conf))
        {
            for (int i=0; i<100; ++i){
                p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
            }
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }

  消費者代碼:通過192.168.31.222:9093訂閱接收消息

public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        { 
            GroupId = "test-consumer-group",
            BootstrapServers = "192.168.31.222:9093",
            AutoOffsetReset = AutoOffsetResetType.Earliest
        };
        using (var c = new Consumer<Ignore, string>(conf))
        {
            c.Subscribe("my-topic");
            bool consuming = true;
            c.OnError += (_, e) => consuming = !e.IsFatal;
            while (consuming)
            {
                try{
                    var cr = c.Consume();
                    Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                }
                catch (ConsumeException e){
                    Console.WriteLine($"Error occured: {e.Error.Reason}");
                }
            }
            c.Close();
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM