Kafka集群部署
部署Kafka之前必须先部署好zookeeper
1. zookeeper分布式安装部署
1.1 集群规划
在master、slave01和slave02三个节点上部署Zookeeper。
1.2 解压安装
(1)进入存放zookeeper安装包目录,解压Zookeeper安装包到/opt/module/目录下
tar -zxvf zookeeper-3.4.5.tar.gz -C /opt/module/
(2)解压后文件名修改为zookeeper
mv zookeeper-3.4.5 zookeeper
1.3 设置zookeeper环境变量
命令:
vi /root/.bash_profile
加入下面内容:
export ZOOKEEPER_HOME=/opt/module/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
使设置立即生效:
source /root/.bash_profile
复制环境变量到slave01、slave01节点
slave01节点:
scp -r /root/.bash_profile root@slave01:/root/.bash_profile
slave02节点:
scp -r /root/.bash_profile root@slave02:/root/.bash_profile
1.4 配置服务器编号
(1)在/opt/module/zookeeper/这个目录下创建zkData
mkdir zkData
(2)在/opt/module/zookeeper/zkData目录下创建一个myid的文件
touch myid
(3)编辑myid文件
vi myid
在文件中添加与server对应的编号:
2
1.5 配置zoo.cfg文件
(1)重命名/opt/module/zookeeper/conf这个目录下的zoo_sample.cfg为zoo.cfg
mv zoo_sample.cfg zoo.cfg
(2)打开zoo.cfg文件
vi zoo.cfg
修改数据存储路径配置
dataDir=/opt/module/zookeeper/zkData
增加如下配置
server.2=master:2888:3888
server.3=slave01:2888:3888
server.4=slave02:2888:3888
(3)复制配置好的zookeeper到其他节点上
slave01节点:
scp -r /opt/module/zookeeper/ root@slave01:/opt/module/zookeeper/
slave02节点:
scp -r /opt/module/zookeeper/ root@slave02:/opt/module/zookeeper/
并分别在slave01、slave02上修改myid文件中内容为3、4
1.6 集群操作
(1)分别启动三个节点的Zookeeper
master节点:
[root@master zookeeper]# bin/zkServer.sh start
slave01节点:
[root@slave01 zookeeper]# bin/zkServer.sh start
slave02节点:
[root@slave02 zookeeper]# bin/zkServer.sh start
(2)查看进程是否启动
jps
(3)查看三个节点的状态
master节点 :
[root@master zookeeper]# bin/zkServer.sh status
slave01节点:
[root@slave01 zookeeper]# bin/zkServer.sh status
slave02节点:
[root@slave02 zookeeper]# bin/zkServer.sh status
(4)启动客户端:
[root@master zookeeper]# bin/zkCli.sh
(5)退出客户端:
[zk: localhost:2181(CONNECTED) 0] quit
(6)停止Zookeeper
[root@master zookeeper]# bin/zkServer.sh stop
2. Kafka组件的安装与部署
2.1 Kafka下载地址
http://kafka.apache.org/downloads.html
这里的Kafka用的版本是:kafka_2.11-0.11.0.2.tgz
2.2 集群规划
在master、slave01和slave02三个节点上部署Kafka。
2.3 Kafka集群部署
(1)进入存放Kafka安装包目录,解压安装包到指定路径
tar -zxvf kafka_2.11-0.11.0.2.tgz -C /opt/module/
修改解压后的文件名称
mv kafka_2.11-0.11.0.2 kafka
(2)配置环境变量
命令:
vi /root/.bash_profile
加入下面内容:
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
使设置立即生效:
source /root/.bash_profile
复制环境变量到slave01、slave01节点
slave01节点:
scp -r /root/.bash_profile root@slave01:/root/.bash_profile
slave02节点:
scp -r /root/.bash_profile root@slave02:/root/.bash_profile
(2)在/opt/module/kafka目录下创建logs文件夹
mkdir logs
(3)修改配置文件
进入/opt/module/kafka/config/目录,修改server.properties文件
[root@master config]# vi server.properties
输入以下内容(里面等号前面的内容已存在,只需修改后面的内容即可):
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=master:2181,master:2181,master:2181
(5)复制配置好的Kafka到其他节点上
slave01节点:
scp -r /opt/module/kafka/ root@slave01:/opt/module/kafka/
slave02节点:
scp -r /opt/module/kafka/ root@slave02:/opt/module/kafka/
(6)分别在slave01和slave02上修改配置文件/opt/module/kafka/config/server.properties中的broker.id
slave01:broker.id=1
slave02:broker.id=2
注:broker.id不得重复
(7)启动集群
依次在master、slave01、slave02节点上启动kafka
[root@master kafka]# bin/kafka-server-start.sh -daemon config/server.properties
(8)关闭集群命令
[root@master kafka]# bin/kafka-server-stop.sh
2.4 启动后,查看zookeeper集群链接情况
(1)进入zookeeper目录下
启动zookeeper客户端:
[root@master zookeeper]# bin/zkCli.sh
查看
[zk: localhost:2181(CONNECTED) 0] ls /
查看连接情况
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
如图所示
2.5 Kafka命令行操作
(1)查看当前服务器中的所有topic
[root@master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --list
(2)任意一台机器创建topic
[root@master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --create --replication-factor 1 --partitions 1 --topic test
## 说明
master:2181:这是zookeeper服务器名+端口号
test:是topic的名字
如图所示

(3)使用任意一台Kafka服务器做生产者(这里使用的是master节点)
[root@master kafka]# bin/kafka-console-producer.sh --broker-list 192.168.1.110:9092 --topic test
(4)使用其他两台Kafka消费
slave01节点:
[root@slave01 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.111:9092 --topic test --from-beginning
slave02节点:
[root@slave02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.112:9092 --topic test --from-beginning
3. 删除kafka的topic
登录zookeeper客户端
bin/zkCli.sh
找到topic所在的目录
ls /brokers/topics
找到要删除的topic执行如下命令
rmr /brokers/topics/topic名称