kafka的安装
第一步:安装zookeeper,kafka依赖zookeeper,所以需要先安装zookeepe
第二步:下载安装kafka
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz
cd kafka_2.12-2.8.1
第三步:修改配置
修改配置文件 config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9092
#kafka的消息存储文件
log.dir=/kafka/kafka-logs
#kafka连接zookeeper的地址
zookeeper.connect=192.168.65.60:2181
第四步:启动服务
启动脚本语法:kafka-server-start.sh [-daemon] server.properties
可以看到,server.properties的配置路径是一个强制的参数,-daemon表示以后台进程运行,否则ssh客户端退出后,就会停止服务。
(注意,在启动kafka时会使用linux主机名关联的ip地址,所以需要把主机名和linux的ip映射配置到本地host里,用vim /etc/hosts)
查看主机名称
hostname 查看主机名
hostname -i:查看本机对应的IP
修改主机名称:vim /etc/hostname
# 启动kafka,运行日志在logs目录的server.log文件里
bin/kafka-server-start.sh -daemon config/server.properties #后台启动,不会打印日志到控制台
或者用
bin/kafka-server-start.sh config/server.properties &
# 我们进入zookeeper目录通过zookeeper客户端查看下zookeeper的目录树
bin/zkCli.sh
ls / #查看zk的根目录kafka相关节点
ls /brokers/ids #查看kafka节点
# 停止kafka
bin/kafka-server-stop.sh
第五步 命令测试消息
1、创建主题
bin/kafka-topics.sh --create --zookeeper 192.168.31.239:2181 --replication-factor 1 --partitions 1 --topic test
2、列出所有主题
bin/kafka-topics.sh --list --zookeeper 192.168.31.239:2181
3、发送消息
bin/kafka-console-producer.sh --broker-list 192.168.31.243:9092 --topic test
4、消费消息,默认是消费最新的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.243:9092 --topic test
5、创建多个分区主题
bin/kafka-topics.sh --create --zookeeper 192.168.31.239:2181 --replication-factor 1 --partitions 2 --topic test1
扩容分区
bin/kafka-topics.sh -alter --partitions 3 --zookeeper 192.168.31.239:2181 --topic test1
6、查看topic情况
bin/kafka-topics.sh --describe --zookeeper 192.168.31.239:2181 --topic test1
7、消费者的消费偏移量是消费者自己维护的,查看主题的消费偏移量
bin/kafka-consumer-groups.sh --bootstrap-server 192.168.31.243:9092 --describe --group test
kafka集群的搭建:
一、集群服务器
zookeeper 1台:192.168.31.239
kafka 2台: 192.168.31.243 192.168.31.244
1、修改kafka配置文件
在192.168.31.243服务器上面
vim config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.31.243:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.31.239:2181
在192.168.31.244服务器上面
vim config/server.properties
#broker.id属性在kafka集群中必须要是唯一
broker.id=1
#kafka部署的机器ip和提供服务的端口号
listeners=PLAINTEXT://192.168.31.244:9093
log.dir=/usr/local/data/kafka-logs-1
#kafka连接zookeeper的地址,要把多个kafka实例组成集群,对应连接的zookeeper必须相同
zookeeper.connect=192.168.31.239:2181
2、分别启动两台kafka
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server.properties
3、测试
创建一个新的topic,副本数设置为3,分区数设置为2
bin/kafka-topics.sh --create --zookeeper 192.168.31.239:2181 --replication-factor 2 --partitions 2 --topic my-topic
查看topic信息
bin/kafka-topics.sh --describe --zookeeper 192.168.31.239:2181 --topic my-topic
向my-topic主题发送消息
bin/kafka-console-producer.sh --broker-list 192.168.31.243:9092,192.168.31.244:9092 --topic my-topic
消费消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.31.243:9092,192.168.31.244:9092 --from-beginning --topic my-topic