Kafka Installation and Configuration
We use five machines to build the Kafka cluster:
1. cluster-1-namenode-1-001 172.16.0.147
2. cluster-1-datanode-1-001 172.16.0.144
3. cluster-1-datanode-1-003 172.16.0.145
4. cluster-1-datanode-1-002 172.16.0.146
5. cluster-1-datanode-1-004 172.16.0.148
Since ZooKeeper is already installed, we will not use the ZooKeeper instance bundled with Kafka.
First, prepare the Kafka installation files on the namenode by running:
cd /opt
wget http://www-eu.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
tar -xvzf kafka_2.11-0.10.0.0.tgz
After extracting, edit the configuration file /opt/kafka_2.11-0.10.0.0/config/server.properties. The original settings are:
broker.id=0
zookeeper.connect=localhost:2181
Change them to:
broker.id=0
listeners=PLAINTEXT://cluster-1-namenode-1-001:9092
advertised.listeners=PLAINTEXT://cluster-1-namenode-1-001:9092
log.dirs=/opt/kafka_2.11-0.10.0.0/logs/kafka-logs
zookeeper.connect=cluster-1-namenode-1-001:2181,cluster-1-datanode-1-001:2181,cluster-1-datanode-1-003:2181,cluster-1-datanode-1-002:2181,cluster-1-datanode-1-004:2181
Note that by default Kafka stores its metadata under ZooKeeper's root path (/), so Kafka-related znodes end up scattered directly under the root. If other applications share the same ZooKeeper ensemble, browsing its data becomes confusing, so it is strongly recommended to specify a chroot path directly in the zookeeper.connect setting.
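For example, appending a chroot path to the end of the connection string (here /kafka, an arbitrary name chosen for illustration) keeps all Kafka znodes under that subtree:

```properties
zookeeper.connect=cluster-1-namenode-1-001:2181,cluster-1-datanode-1-001:2181,cluster-1-datanode-1-003:2181,cluster-1-datanode-1-002:2181,cluster-1-datanode-1-004:2181/kafka
```

If a chroot is used, the same path must be appended to the --zookeeper argument of every Kafka command-line tool, otherwise the tools will look for the metadata under the root path.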
Since the kafka-logs directory does not exist, create it and grant permissions:
cd /opt/kafka_2.11-0.10.0.0/logs
mkdir kafka-logs
chmod -R 777 kafka-logs
Then sync the configured installation to the datanode nodes:
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-001:/opt/
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-003:/opt/
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-002:/opt/
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-004:/opt/
and edit /opt/kafka_2.11-0.10.0.0/config/server.properties on each node:
# On cluster-1-datanode-1-001
broker.id=1
listeners=PLAINTEXT://cluster-1-datanode-1-001:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-001:9092
# On cluster-1-datanode-1-002
broker.id=2
listeners=PLAINTEXT://cluster-1-datanode-1-002:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-002:9092
# On cluster-1-datanode-1-003
broker.id=3
listeners=PLAINTEXT://cluster-1-datanode-1-003:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-003:9092
# On cluster-1-datanode-1-004
broker.id=4
listeners=PLAINTEXT://cluster-1-datanode-1-004:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-004:9092
Because every broker id must be unique across the entire cluster, this value must be adjusted on each node.
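The per-node edits above can be scripted. A minimal sketch (the helper function below is hypothetical, not part of the original guide) that patches broker.id and the listener hostnames in a server.properties file:

```shell
# Hypothetical helper: patch broker.id, listeners, and advertised.listeners
# in a server.properties file. Assumes the three lines already exist.
patch_broker_config() {
  file=$1; id=$2; host=$3
  sed -i \
    -e "s|^broker.id=.*|broker.id=${id}|" \
    -e "s|^listeners=.*|listeners=PLAINTEXT://${host}:9092|" \
    -e "s|^advertised.listeners=.*|advertised.listeners=PLAINTEXT://${host}:9092|" \
    "$file"
}

# Example: configure broker 3 on cluster-1-datanode-1-003
# patch_broker_config /opt/kafka_2.11-0.10.0.0/config/server.properties 3 cluster-1-datanode-1-003
```

Run the helper once per node (for example over ssh) instead of editing each file by hand; this avoids typos such as a stray space in the listener URL.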
Start Kafka in daemon mode so that the cluster comes up and the brokers keep running after the shell exits, then check the process status to confirm startup:
./kafka-server-start.sh -daemon /opt/kafka_2.11-0.10.0.0/config/server.properties
Start Kafka on each node in turn. (The -daemon flag already puts the process in the background, so a trailing & is not needed.)
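Since the same start command must run on all five nodes, a small loop can help. A sketch (host list taken from this cluster; the ssh invocations are only printed here, not executed, so you can review them before running):

```shell
# Print the daemon start command for every broker host.
# Run each printed line, or execute it directly via ssh on each node.
KAFKA_HOME=/opt/kafka_2.11-0.10.0.0
BROKER_HOSTS="cluster-1-namenode-1-001 cluster-1-datanode-1-001 \
cluster-1-datanode-1-002 cluster-1-datanode-1-003 cluster-1-datanode-1-004"
for host in $BROKER_HOSTS; do
  echo "ssh $host $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties"
done
```

After starting, `jps` on each node should show a Kafka process.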
Testing the Kafka cluster
- Create the mytest topic on the namenode (the replication factor cannot exceed the number of brokers; with 5 brokers we can set it to 5, one replica per broker)
[root@cluster-1-namenode-1-001 bin]# ./kafka-topics.sh --create --topic mytest --replication-factor 5 --partitions 2 --zookeeper cluster-1-namenode-1-001:2181
Created topic "mytest".
[root@cluster-1-namenode-1-001 bin]#
- View the newly created mytest topic on the namenode
[root@cluster-1-namenode-1-001 bin]# ./kafka-topics.sh --list --zookeeper cluster-1-namenode-1-001:2181
idoall
idoall_testTopic
my_test
mytest
test
[root@cluster-1-namenode-1-001 bin]#
- Send a message to Kafka from datanode1: "this is for test"
[root@cluster-1-datanode-1-001 bin]# ./kafka-console-producer.sh --broker-list cluster-1-namenode-1-001:9092 --sync --topic mytest
this is for test
- Start a console consumer on datanode2 to simulate a consumer; the message sent above appears
[root@cluster-1-datanode-1-002 bin]# ./kafka-console-consumer.sh --zookeeper cluster-1-namenode-1-001:2181 --topic mytest --from-beginning
this is for test
^CProcessed a total of 1 messages
[root@cluster-1-datanode-1-002 bin]#