Kafka集群部署


Kafka集群部署

部署Kafka之前必须先部署好zookeeper

1. zookeeper分布式安装部署

1.1 集群规划

在master、slave01和slave02三个节点上部署Zookeeper。

1.2 解压安装

(1)进入存放zookeeper安装包目录,解压Zookeeper安装包到/opt/module/目录下

tar -zxvf zookeeper-3.4.5.tar.gz -C /opt/module/

(2)解压后文件名修改为zookeeper

mv zookeeper-3.4.5 zookeeper

1.3 设置zookeeper环境变量

命令:

vi /root/.bash_profile

加入下面内容:

export ZOOKEEPER_HOME=/opt/module/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

使设置立即生效:

source /root/.bash_profile

复制环境变量到slave01、slave01节点

slave01节点:
scp -r /root/.bash_profile root@slave01:/root/.bash_profile 
slave02节点:
scp -r /root/.bash_profile root@slave02:/root/.bash_profile 

1.4 配置服务器编号

(1)在/opt/module/zookeeper/这个目录下创建zkData

mkdir zkData

(2)在/opt/module/zookeeper/zkData目录下创建一个myid的文件

touch myid

(3)编辑myid文件

vi myid

在文件中添加与server对应的编号:

2

1.5 配置zoo.cfg文件

(1)重命名/opt/module/zookeeper/conf这个目录下的zoo_sample.cfg为zoo.cfg

mv zoo_sample.cfg zoo.cfg

(2)打开zoo.cfg文件

vi zoo.cfg

修改数据存储路径配置

dataDir=/opt/module/zookeeper/zkData

增加如下配置

server.2=master:2888:3888
server.3=slave01:2888:3888
server.4=slave02:2888:3888

(3)复制配置好的zookeeper到其他节点上

slave01节点:
scp -r /opt/module/zookeeper/ root@slave01:/opt/module/zookeeper/
slave02节点:
scp -r /opt/module/zookeeper/ root@slave02:/opt/module/zookeeper/

并分别在slave01、slave02上修改myid文件中内容为3、4

1.6 集群操作

(1)分别启动三个节点的Zookeeper

master节点:
[root@master zookeeper]# bin/zkServer.sh start
slave01节点:
[root@slave01 zookeeper]# bin/zkServer.sh start
slave02节点:
[root@slave02 zookeeper]# bin/zkServer.sh start

(2)查看进程是否启动

jps

(3)查看三个节点的状态

master节点 :
[root@master zookeeper]# bin/zkServer.sh status
slave01节点:
[root@slave01 zookeeper]# bin/zkServer.sh status
slave02节点:
[root@slave02 zookeeper]# bin/zkServer.sh status

(4)启动客户端:

[root@master zookeeper]# bin/zkCli.sh

(5)退出客户端:

[zk: localhost:2181(CONNECTED) 0] quit

(6)停止Zookeeper

[root@master zookeeper]# bin/zkServer.sh stop

2. Kafka组件的安装与部署

2.1 Kafka下载地址

http://kafka.apache.org/downloads.html

这里的Kafka用的版本是:kafka_2.11-0.11.0.2.tgz

2.2 集群规划

在master、slave01和slave02三个节点上部署Kafka。

2.3 Kafka集群部署

(1)进入存放Kafka安装包目录,解压安装包到指定路径

tar -zxvf kafka_2.11-0.11.0.2.tgz -C /opt/module/

修改解压后的文件名称

mv kafka_2.11-0.11.0.2 kafka

(2)配置环境变量

命令:

vi /root/.bash_profile

加入下面内容:

export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

使设置立即生效:

source /root/.bash_profile

复制环境变量到slave01、slave01节点

slave01节点:
scp -r /root/.bash_profile root@slave01:/root/.bash_profile 
slave02节点:
scp -r /root/.bash_profile root@slave02:/root/.bash_profile 

(2)在/opt/module/kafka目录下创建logs文件夹

mkdir logs

(3)修改配置文件

进入/opt/module/kafka/config/目录,修改server.properties文件

[root@master config]# vi server.properties

输入以下内容(里面等号前面的内容已存在,只需修改后面的内容即可):

#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径	
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=master:2181,master:2181,master:2181

(5)复制配置好的Kafka到其他节点上

slave01节点:
scp -r /opt/module/kafka/ root@slave01:/opt/module/kafka/
slave02节点:
scp -r /opt/module/kafka/ root@slave02:/opt/module/kafka/

(6)分别在slave01和slave02上修改配置文件/opt/module/kafka/config/server.properties中的broker.id

slave01:broker.id=1

slave02:broker.id=2

注:broker.id不得重复

(7)启动集群

依次在master、slave01、slave02节点上启动kafka

[root@master kafka]# bin/kafka-server-start.sh -daemon config/server.properties

(8)关闭集群命令

[root@master kafka]# bin/kafka-server-stop.sh

2.4 启动后,查看zookeeper集群链接情况

(1)进入zookeeper目录下

启动zookeeper客户端:

[root@master zookeeper]# bin/zkCli.sh

查看

[zk: localhost:2181(CONNECTED) 0] ls /

查看连接情况

[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids

如图所示

image-20210512122656781

2.5 Kafka命令行操作

(1)查看当前服务器中的所有topic

[root@master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --list

(2)任意一台机器创建topic

[root@master kafka]# bin/kafka-topics.sh --zookeeper master:2181 --create --replication-factor 1 --partitions 1 --topic test
## 说明
master:2181:这是zookeeper服务器名+端口号
test:是topic的名字

如图所示

image-20210512234431543

(3)使用任意一台Kafka服务器做生产者(这里使用的是master节点)

[root@master kafka]# bin/kafka-console-producer.sh --broker-list 192.168.1.110:9092 --topic test

(4)使用其他两台Kafka消费

slave01节点:
[root@slave01 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.111:9092 --topic test --from-beginning

slave02节点:
[root@slave02 kafka]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.112:9092 --topic test --from-beginning

3. 删除kafka的topic

登录zookeeper客户端

bin/zkCli.sh

找到topic所在的目录

ls /brokers/topics

找到要删除的topic执行如下命令

rmr /brokers/topics/topic名称


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM