1、安裝zookeeper
因為zookeeper 與kafka 存在對應的版本,選擇不當,將無法使用,所以兩者都使用最新版本
下載地址:https://zookeeper.apache.org/releases.html
目錄
下載並解壓ZooKeeper軟件壓縮包后,可以看到zk包含以下的文件和目錄:
圖1:ZooKeeper軟件的文件和目錄
- bin目錄
zk的可執行腳本目錄,包括zk服務進程,zk客戶端,等腳本。其中,.sh是Linux環境下的腳本,.cmd是Windows環境下的腳本。 - conf目錄
配置文件目錄。zoo_sample.cfg為樣例配置文件,需要修改為自己的名稱,一般為zoo.cfg。log4j.properties為日志配置文件。 - lib
zk依賴的包。 - contrib目錄
一些用於操作zk的工具包。 - recipes目錄
zk某些用法的代碼示例
運行配置
上面提到,conf目錄下提供了配置的樣例zoo_sample.cfg,要將zk運行起來,需要將其名稱修改為zoo.cfg。
打開zoo.cfg,可以看到默認的一些配置。
- tickTime
時長單位為毫秒,為zk使用的基本時間度量單位。例如,1 * tickTime是客戶端與zk服務端的心跳時間,2 * tickTime是客戶端會話的超時時間。
tickTime的默認值為2000毫秒,更低的tickTime值可以更快地發現超時問題,但也會導致更高的網絡流量(心跳消息)和更高的CPU使用率(會話的跟蹤處理)。 - clientPort
zk服務進程監聽的TCP端口,默認情況下,服務端會監聽2181端口。 - dataDir
無默認配置,必須配置,用於配置存儲快照文件的目錄。如果沒有配置dataLogDir,那么事務日志也會存儲在此目錄。
配置環境變量 vim ~/.bash_profile
export ZK_HOME=/Users/yyj/big_data/zookeeper export KAFKA_HOME=/Users/yyj/big_data/kafka_2.12-2.5.0 export PATH=$PATH:$ANDROID_HOME/platform-tools:${KAFKA_HOME}/bin:${ZK_HOME}/bin
啟動 進入bin目錄,執行命令
zkServer.sh start
安裝Kafka
下載 http://kafka.apache.org/downloads
解壓
tar -xzf kafka_2.12-2.5.0.tgz
注意,kafka_2.12-2.5.0.tgz版本是已經編譯好的版本,解壓就能使用。
配置server.properties
默認配置 advertised.listeners=PLAINTEXT://:your.host.name:9092
修改為 advertised.listeners=PLAINTEXT://:ip:9092
ip為服務器ip。
hostname和端口是用來建議給生產者和消費者使用的,如果沒有設置,將會使用listeners的配置,如果listeners也沒有配置,將使用java.net.InetAddress.getCanonicalHostName()來獲取這個hostname和port,對於ipv4,基本就是localhost了。
"PLAINTEXT"表示協議,可選的值有PLAINTEXT和SSL,hostname可以指定IP地址,也可以用"0.0.0.0"表示對所有的網絡接口有效,如果hostname為空表示只對默認的網絡接口有效。也就是說如果你沒有配置advertised.listeners,就使用listeners的配置通告給消息的生產者和消費者,這個過程是在生產者和消費者獲取源數據(metadata)。
配置環境變量
export ZK_HOME=/Users/yyj/big_data/zookeeper export KAFKA_HOME=/Users/yyj/big_data/kafka_2.12-2.5.0 export PATH=$PATH:$ANDROID_HOME/platform-tools:${KAFKA_HOME}/bin:${ZK_HOME}/bin
啟動Kafka
啟動ZooKeeper
zkServer.sh start
注意,需要先啟動ZooKeeper再啟動kafka,不然會報錯。如下圖:
啟動kafka
kafka-server-start.sh /Users/yyj/big_data/kafka_2.12-2.5.0/config/server.properties
啟動Kafka Broker后,在ZooKeeper終端上鍵入命令 jps,效果如下:
創建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo
其中demo為創建的topic名稱。
如上圖,創建了一個名為 demo 的主題,其中包含一個分區和一個副本因子。 創建成功之后會輸出: Created topic "demo".
如上圖,創建主題后,系統會在config / server.properties文件中的"/ tmp / kafka-logs /"中指定的創建主題的日志。
查詢topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
查看topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo
刪除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic demo
啟動生產者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic demo
從上面的語法,生產者命令行客戶端需要兩個主要參數 -
代理列表 - 我們要發送郵件的代理列表。 在這種情況下,我們只有一個代理。 Config / server.properties文件包含代理端口ID,因為我們知道我們的代理正在偵聽端口9092,因此您可以直接指定它。主題名稱:demo。
啟動消費者
為了方便測試,另啟一個sheel窗口 這樣效果更明顯。需要注意的是舊版本和新版本的命令是不一樣的
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic demo --from-beginning
報錯提示: zookeeper is not a recognized option
發現在啟動的時候說使用 --zookeeper是一個過時的方法,最新的版本中命令如下:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning
可以開啟兩個終端,一個發送消息,一個接受消息。效果如下:
配置啟動 關閉shell 腳本
start_all.sh
#!/bin/bash
# 啟動zk
zkServer.sh start
# 啟動kafka
nohup kafka-server-start.sh /Users/yyj/big_data/kafka_2.12-2.5.0/config/server.properties >> /Users/yyj/big_data/kafka.log &
stop_all.sh
#!/bin/bash # 關閉kafka kafka-server-stop.sh /Users/yyj/big_data/kafka_2.12-2.5.0/config/server.properties # 關閉zk zkServer.sh stop