集群搭建:
[root@localhost kafka_2.11-0.10.2.1]# cat config/server.properties | grep -v ^$ | grep -v ^#
broker.id=0
listeners=PLAINTEXT://node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
listeners=PLAINTEXT://node1:9092, 配置物理機器的hostname, 一定要是hostname, 每個節點單獨配置自己的
腳本啟動不管用的, 把環境變量配置在 ~/.bashrc下, 因為ssh分為登陸和非登陸, 讀取配置文件的順序不同
也可以使用如下配置, 公司一個大牛做的
broker.id=2 delete.topic.enable=true port=9092 advertised.host.name=10.183.222.194 num.network.threads=6 num.io.threads=6 message.max.bytes=10485760 log.index.size.max.bytes=104857600 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 replica.fetch.max.bytes=104857600 log.dirs=/data/hadoop/data1/kafka/log,/data/hadoop/data2/kafka/log,/data/hadoop/data3/kafka/log,/data/hadoop/data4/kafka/log,/data/hadoop/data5/kafka/log,/data/hadoop/data6/kafka/log num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=48 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 replica.socket.receive.buffer.bytes=1048576 num.replica.fetchers=6 replica.lag.max.messages=100000 zookeeper.connect=10.183.222.192:2181,10.183.222.193:2181,10.183.222.194:2181/rdp_test_kafka zookeeper.connection.timeout.ms=15000 auto.create.topics.enable=true auto.leader.rebalance.enable=true compression.type=gzip
配置zookeeper的路徑, 以便在同一個zookeeper下進行區分, 方便管理
如果配置 了路徑, 那么創建topic之類的 時候, 需要指定路徑
/opt/install/kafka_2.13-2.4.1/bin/kafka-topics.sh --zookeeper 10.144.91.9:2181,10.144.91.10:2181,10.144.91.11:2181/cdn_kafka --create --topic test1 --partitions 3 --replication-factor 2
啟動腳本:
#!/bin/bash brokers="node1 node2 node3" kafka_home="/usr/local/kafka_2.11-0.10.2.1" for i in $brokers do echo "Starting kafka on ${i} ... " ssh ${i} "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" if [[ $? -ne 0 ]]; then echo "Start kafka on ${i} is OK !" fi done echo kafka kafka are started !
停止腳本:
#!/bin/bash brokers="node1 node2 node3" kafka_home="/usr/local/kafka_2.11-0.10.2.1" for i in $brokers do echo "Stopping kafka on ${i} ..." ssh ${i} "source /etc/profile;bash ${kafka_home}/bin/kafka-server-stop.sh" if [[ $? -ne 0 ]]; then echo "Stopping ${kafka_home} on ${i} is down" fi done echo all kafka are stopped ! exit 0
腳本啟動不管用的, 把環境變量配置在 ~/.bashrc下, 因為ssh分為登陸和非登陸, 讀取配置文件的順序不同
整合啟動腳本
#!/bin/bash #set -x brokers="node1 node2 node3 node4" kafka_home="/usr/local/kafka_2.11-0.10.2.1" start() { for i in $brokers do echo "Starting kafka on ${i} ... " ssh root@$i "source /etc/profile; nohup sh ${kafka_home}/bin/kafka-server-start.sh ${kafka_home}/config/server.properties > /dev/null 2>&1 &" if [[ $? -ne 0 ]]; then echo "Start kafka on ${i} is OK !" fi done echo kafka kafka are started ! } stop() { for i in $brokers do echo "Stopping kafka on ${i} ..." ssh root@$i "source /etc/profile;bash ${kafka_home}/bin/kafka-server-stop.sh" if [[ $? -ne 0 ]]; then echo "Stopping ${kafka_home} on ${i} is down" fi done echo all kafka are stopped ! } case "$1" in start) start ;; stop) stop ;; *) echo "Usage: start|stop" ;; esac