kafka快速開始教程


此教程假設你剛剛開始沒有任何 Kafka 或 ZooKeeper 數據。Kafka的控制台腳本在類Unix和Windows平台不同,Windows平台使用bin\windows\\代替bin/,腳本的擴展名改為.bat

第一步:下載代碼

下載0.10.1.0發行版並解壓。

> tar -xzf kafka_2.11-0.10.1.0.tgz
> cd kafka_2.11-0.10.1.0

第二步:啟動服務

Kafka使用Zookeeper,所以如果你沒有的話需要首先啟動Zookeeper服務。你可以使用kafka自帶的腳本啟動一個簡單的單一節點Zookeeper實例。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

現在啟動Kafka服務:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

第三步:創建一個主題

讓我們來創建一個名為test的topic,只使用單個分區和一個復本。

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

我們現在可以運行list topic命令看到我們的主題。

> bin/kafka-topics.sh --list --zookeeper localhost:2181
test

另外,當沒有主題存在的時候,你也可以通過配置代理自動創建主題而不是手動創建。

第四步:發送消息

Kafka有自帶的命令行客戶端會從文件或者標准輸入接受數據當作消息發送到Kafka集群。默認情況下,每行作為一個獨立的消息發送。

運行生產者控制台並且打幾行消息到控制台發送到服務器。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

第5步:啟動一個消費者

Kafka還有個消費者控制台,會把消息輸出到標准輸出。

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

如果你上面的命令是在不同的終端運行,那么你可以在生產者終端輸入消息然后在消費者終端看到。

所有的命令行工具都有一些額外的參數:如果沒有使用參數運行命令,將會顯示它們的詳細用法。

第六步:設置多個代理集群

目前為止,我們已經在單個代理上運行了,但這不好玩。對於Kafka,單個代理只是大小為1的集群。所以沒什么改變除了多啟動幾個代理實例。只是為了感受一下,我們把集群擴展到3個節點(仍然在我們的本地機器上)。

首先,我們為每個代理新建一個配置文件(在windows上使用copy命令):

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

現在編輯新文件設置一下屬性:

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

broker.id屬性是唯一的,在集群的每個節點永久不變。因為我們在單台機器上運行代理,必須重寫端口和日志目錄。

我們已經有了Zookeeper並運行了一個節點,所以只需要啟動下面的兩個新節點:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
現在創建一個含有三個副本的主題:
```sh
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

好,現在我們有了一個集群但是怎么知道哪個代理正在做什么?使用describe topice命令查看:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

這是輸出解釋。第一行給出了各個分區的概況,額外的每行都給出了一個分區的信息。由於我們只有一個主題的分區,所以只有一行。

  • leader是負責當前分區的所有讀寫請求。每個節點都將領導一個隨機選擇的分區。
  • replicas 是節點列表,復制分區日志,不管他們是不是leader或者即使它們還活着。
  • isrin-sync的集合。這是replicas列表當前還活着的子集。

注意在我們的示例中節點1是唯一的主題分區領導者。

我們運行同樣的命令查看我們已經創建的原始主題在哪里:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

所以沒什么驚奇的,原始的主題沒有副本在節點0上,當我們創建它時唯一存在的節點服務器。

讓我們發布一些消息到新的主題:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現在消費這些消息:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

現在測試容錯性。節點1是領導者,我們kill它。

> ps aux | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...
> kill -9 7564

在Windows上使用

> wmic process get processid,caption,commandline | find "java.exe" | find "server-1.properties"
java.exe    java  -Xmx1G -Xms1G -server -XX:+UseG1GC ... build\libs\kafka_2.10-0.10.1.0.jar"  kafka.Kafka config\server-1.properties    644
> taskkill /pid 644 /f

領導關系已經改為了從節點中的一個,節點1也不再in-sync復本集中:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0

但是我們依然可以消費消息即使之前接受的領導者已經掛掉:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

第七部:使用Kafka Connect導入導出數據

在控制台輸入輸出數據是很方便,但是你可能使用來自其他數據源的數據或者把Kafka的數據導出到其他的系統中。對於很多系統,你可以直接使用Kafka Connect導入導出數據而不需要手寫自定義的集成代碼。

Kafka Connect是Kafka自帶的導入導出工具。它是運行連接器的可擴展工具,實現了集成外部系統的自定義邏輯。在快速教程里,我們會看到如何使用Kafka Connect的簡單連接器從文件導入數據到Kafka主題,再從kafka主題導出數據到文件。
首先,我們先創建一些測試數據:

echo -e "foo\nbar" >test.txt

然后我們在Standalone模式啟動兩個連接器,Standalone模式表示他們運行在一個本地進程中。我們提供了三個配置文件作為參數,第一個配置Kafka Connect進程,包含通用的配置如Kafka 代理連接和數據序列話工具。剩余的文件每個都指定一個連接器。這些文件包含一個唯一連接器名,實例化的連接器類,和一些其他的連接器配置。

> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

這些示例配置文件,包含在Kafka中,使用默認的本地集群配置並創建了兩個連接器:第一個是源連接器從文件讀取數據行每行都生成消息發送到kafka主題,第二個是目標連接器從Kafka主題讀取消息生成行輸出到文件中。

在啟動的時候你會看到大量的日志信息,包含一些表示連接器初始化的。一旦Kafka Connect進程啟動,源連接器開始從test.txt讀取行並發送到connect-test主題,sink連接器開始從connect-test主題讀取消息把他們寫到test.sink.txt文件。我們可以檢查輸出文件看到數據已經通過管道傳遞完畢:

> cat test.sink.txt
foo
bar

注意數據存儲在Kafka的主題connect-test中,我們可以運行消費者控制台查看主題數據(或者消費者代碼處理它):

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

連接器持續生成數據,所以我們可以給測試文件添加數據,看它通過管道。

> echo "Another line" >> test.txt

你應該看到這行出現在消費者控制台和目標文件中。

第八步:用Kafka Streams處理數據

Kafka Streams 是一個客戶端庫,為了實時流計算和分析Kafka集群中的存儲數據。此快速教程示例將會描述如何運行一個用這個庫編寫的流程序。這是WordCountDemo 的主要示例代碼(改成Java8 lambda表達式為了容易閱讀)。

KTable wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Ensure the words are available as record keys for the next aggregate operation.
    .map((key, value) -> new KeyValue<>(value, value))

    // Count the occurrences of each word (record key) and store the results into a table named "Counts".
    .countByKey("Counts")

它實現了WordCount算法,計算輸入文本單詞直方圖。然而,不同你之前可能見過的WordCount例子(數據是有限),這個Demo程序有些不同,因為它設計為操作無限且沒有邊界的流數據。與有界變量類似,它是一個有狀態的算法跟蹤和修改單詞的總數。然而,由於它必須假設潛在的輸入數據無限多,當處理更多數據時它會定期輸出它的當前狀態和結果,因為他不知道什么時候所有輸入數據會處理完成。

我們將給Kafka主題添加一些數據,隨后會被Kafka流程序處理。

> echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > file-input.txt

在Windows上:

> echo all streams lead to kafka> file-input.txt
> echo hello kafka streams>> file-input.txt
> echo|set /p=join kafka summit>> file-input.txt

下一步,我們使用生產者控制台發送一些輸入數據給主題streams-file-input(在實踐中,流數據會在系統啟動時持續不斷的流向Kafka系統)。

> bin/kafka-topics.sh --create \
            --zookeeper localhost:2181 \
            --replication-factor 1 \
            --partitions 1 \
            --topic streams-file-input
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-file-input < file-input.txt

現在可以運行WordCount示例程序處理輸入數據:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

不會有任何標准輸出,日志項作為結果持續寫到另一個名為streams-wordcount-output Kafka主題中。示例會運行幾秒鍾后自動停止而不像以便的流處理程序。

我們現在通過讀取它的輸出主題檢查WordCount 示例輸出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
            --topic streams-wordcount-output \
            --from-beginning \
            --formatter kafka.tools.DefaultMessageFormatter \
            --property print.key=true \
            --property print.value=true \
            --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
            --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

以下輸出數據將會打印在控制台上:

all 1
lead 1
to 1
hello 1
streams 2
join 1
kafka 3
summit 1

第一列是Kafka消息鍵,第二列是消息的值,都是java.lang.String格式。注意輸出實際上是一個持續更新的流,每個數據記錄(例如上面的每行)是一個單詞的更新總數,又或者是如kafka這樣的鍵。對一個鍵有多條記錄,每個后面的記錄都會更新前面的。

現在你可以寫更多的消息發送到streams-file-input 主題中,並在streams-wordcount-output主題中觀察添加的消息,查看更新的單詞總數


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM