Kafka是由Apache軟件基金會開發的一個高吞吐量的分布式發布訂閱消息系統,由Scala和Java編寫。官網地址:http://kafka.apache.org
0.基本概念
Broker:Kafka集群包含一個或多個服務器,這種服務器被稱為broker。
Topic:每條發布到Kafka集群的消息都有一個主題名稱,這個主題名稱就被稱為Topic。
Partition:Partition是物理上的概念,每個Topic包含一個或多個Partition。
Producer:消息生產者,負責發布消息到Kafka broker。
Consumer:消息消費者,向Kafka broker讀取消息的客戶端。
Consumer Group:每個Consumer屬於一個特定的Consumer Group,group name可單獨設定,若不設定則屬於默認的group。
1.安裝步驟
系統:Centos7.2
JDK: java1.8
另外需要安裝zookeeper,可以使用已有的zookeeper服務,新版的kafka已內置了一個zookeeper環境,也可以直接使用。
(1)下載
kafka安裝包下載地址:http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
/bin 可執行腳本目錄
/config 配置文件目錄
tar -xzf kafka_2.11-2.1.0.tgz cd kafka_2.11-2.1.0
(2)啟動zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
(3)配置kafka相關屬性
broker.id=0 //唯一標識 listeners=PLAINTEXT://:9092 指定服務的端口 advertised.listeners=PLAINTEXT://192.168.31.222:9092 如果要提供外網訪問 必須配置此項 log.dirs=/tmp/kafka-logs-0 //日志目錄
(4)啟動服務
bin/kafka-server-start.sh config/server.properties &
(5)檢查服務
[root@localhost config]# netstat -tunlp|egrep "(2181|9092)" tcp6 0 0 :::9092 :::* LISTEN 14610/java tcp6 0 0 :::2181 :::* LISTEN 20494/java
2.集群配置
所謂的kafka集群就是指多個broker組成的集群,通過zookeeper來進行管理。以下在本機部署三個broker組成的集群。
首先將config/server.properties的復制三份,分別命名為server-0.properties、server-1.properties、server-2.properties,分別配置如下:
server-0.properties:
broker.id=0 listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.31.222:9092
log.dirs=/tmp/kafka-logs-0
server-1.properties:
broker.id=1 listeners=PLAINTEXT://:9093 advertised.listeners=PLAINTEXT://192.168.31.222:9093 log.dirs=/tmp/kafka-logs-1
server-2.properties:
broker.id=2 listeners=PLAINTEXT://:9094 advertised.listeners=PLAINTEXT://192.168.31.222:9094 log.dirs=/tmp/kafka-logs-2
然后分別都啟動就可以了:
bin/kafka-server-start.sh config/server-0.properties & bin/kafka-server-start.sh config/server-1.properties & bin/kafka-server-start.sh config/server-2.properties &
查看是否啟動了:
[root@localhost ~]# netstat -tunlp|egrep "(2181|9092|9093|9094)" tcp6 0 0 :::9092 :::* LISTEN 14610/java tcp6 0 0 :::2181 :::* LISTEN 20494/java tcp6 0 0 :::9093 :::* LISTEN 15353/java tcp6 0 0 :::9094 :::* LISTEN 14974/java
3.客戶端調用
客戶端使用.net core測試,使用官網推薦的confluent-kafka-dotnet類庫
生產者代碼:通過192.168.31.222:9092發布消息
public static void Main(string[] args) { var conf = new ProducerConfig { BootstrapServers = "192.168.31.222:9092" }; Action<DeliveryReportResult<Null, string>> handler = r => Console.WriteLine(!r.Error.IsError ? $"Delivered message to {r.TopicPartitionOffset}" : $"Delivery Error: {r.Error.Reason}"); using (var p = new Producer<Null, string>(conf)) { for (int i=0; i<100; ++i){ p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler); } p.Flush(TimeSpan.FromSeconds(10)); } }
消費者代碼:通過192.168.31.222:9093訂閱接收消息
public static void Main(string[] args) { var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.31.222:9093", AutoOffsetReset = AutoOffsetResetType.Earliest }; using (var c = new Consumer<Ignore, string>(conf)) { c.Subscribe("my-topic"); bool consuming = true; c.OnError += (_, e) => consuming = !e.IsFatal; while (consuming) { try{ var cr = c.Consume(); Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'."); } catch (ConsumeException e){ Console.WriteLine($"Error occured: {e.Error.Reason}"); } } c.Close(); } }