一.環境准備
當前環境:centos7.3一台
軟件版本:kafka_2.12
部署目錄:/usr/local/kafka
啟動端口:9092
配置文件:/usr/local/kafka/config/server.properties
yum依賴yum install java-1.8.0-openjdk
需要部署zookeeper單點
二.安裝
1.下載kafka包wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.12-2.1.0.tgz
2.解壓並移動,然后創建日志目錄tar zxvf kafka_2.12-0.10.2.1.tgzmv kafka_2.12-0.10.2.1 /usr/local/kafkamkdir /var/log/kafka
3.修改配置文件,將最后面的zookeeper地址修改vim /usr/local/kafka/config/server.properties
#連接zookeeper地址端口zookeeper.connect=127.0.0.1:2181
三.使用驗證
啟動/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
檢查netstat -unltp | grep 9092
一.環境准備
當前環境:centos7.3三台
軟件版本:kafka_2.12
部署目錄:/usr/local/kafka
啟動端口:9092
配置文件:/usr/local/kafka/config/server.properties
yum依賴(3台同時操作)yum install java-1.8.0-openjdk
需要部署zookeeper集群
二.安裝
1.下載kafka包(3台節點都執行)wget http://mirror.bit.edu.cn/apache/kafka/0.10.2.1/kafka_2.12-0.10.2.1.tgz
2.解壓並移動,然后創建日志目錄(3台節點都執行)tar zxvf kafka_2.12-0.10.2.1.tgzmv kafka_2.12-0.10.2.1 /usr/local/kafkamkdir /var/log/kafka
3.修改配置文件(3台同時操作,需要更改的地方不一樣)vim /usr/local/kafka/config/server.properties
#此為第一台,第二台為2 第三台為3broker.id=1# Switch to enable topic deletion or not, default value is falsedelete.topic.enable=true#本機開啟端口和監聽端口advertised.host.name=192.168.1.189# The number of threads handling network requestsnum.network.threads=3# The number of threads doing disk I/Onum.io.threads=8# The send buffer (SO_SNDBUF) used by the socket serversocket.send.buffer.bytes=102400# The receive buffer (SO_RCVBUF) used by the socket serversocket.receive.buffer.bytes=102400# The maximum size of a request that the socket server will accept (protection against OOM)socket.request.max.bytes=104857600#日志目錄log.dirs=/var/log/kafka#開啟10個分區num.partitions=10#kafka保存消息的副本數default.replication.factor=3# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.# This value is recommended to be increased for installations with data dirs located in RAID array.num.recovery.threads.per.data.dir=1#持久化時間log.retention.hours=48# The maximum size of a log segment file. When this size is reached a new log segment will be created.log.segment.bytes=1073741824# to the retention policieslog.retention.check.interval.ms=300000#連接zookeeper地址端口zookeeper.connect=192.168.1.189:2181,192.168.1.190:2181,192.168.1.191:2181# Timeout in ms for connecting to zookeeperzookeeper.connection.timeout.ms=6000
三.使用驗證
啟動(3台都需要啟動)/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
檢查netstat -unltp | grep 9092
一.基本操作
1.啟動kafkabin/kafka-server-start.sh -daemon config/server.properties
2.關閉kafkabin/kafka-server-stop.sh
3.查看kafka topic是否支持集群,沒反應就是正確bin/kafka-topics.sh --describe --zookeeper 1.1.1.1:2181,1.1.1.2:2181,1.1.1.3:2181 --topic MyTopic
4.查看當前zookerrper下的kafka集群所有的topicbin/kafka-topics.sh --list --zookeeper 1.1.1.1:2181
5.詳細查看topicbin/kafka-topics.sh --describe --zookeeper 1.1.1.1:2181 --topic topic名字
6.創建一個topic,副本備份數1個,分區數1個bin/kafka-topics.sh --create --zookeeper 1.1.1.1:2181 --replication-factor 1 --partitions 1 --topic topic名字
7.刪除一個topicbin/kafka-topics.sh --zookeeper 1.1.1.1:2181 --delete --topic topic名字
8.改變集群模式為主主,切換主從到主主模式,解決主從模式下,從從選舉時間問題bin/kafka-preferred-replica-election.sh --zookeeper 1.1.1.1:2181
二.生產消費測試
以 sync 模式啟動一個producer,info.test是topic名bin/kafka-console-producer.sh --broker-list 1.1.1.1:9092,1.1.1.2:9093,1.1.1.3:9094 --sync --topic info.test
然后,輸入以下內容:Hello, world!
啟動一個 consumer,在另一個終端運行:bin/kafka-console-consumer.sh --zookeeper 1.1.1.1:2181 --topic info.test --from-beginning
觀察輸出,你會看到下面內容:
Hello, world!
