kafka詳解(02) - kafka_2.11-2.4.1安裝部署


kafka詳解(02) - kafka_2.11-2.4.1安裝部署

環境准備

下載安裝包

官網下載地址:https://kafka.apache.org/downloads.html

2.4.1版本下載地址:https://archive.apache.org/dist/kafka/2.4.1/kafka_2.11-2.4.1.tgz

Kafka安裝包版本使用scala_2.11的版本,即kafka_2.11-kafka版本號.tgz的安裝包,經測試scala_2.12的版本在增加topic時非常的慢,具體原因未定位, scala_2.13版本的未測試過

集群規划

hadoop102

hadoop103

hadoop104

zk

zk

zk

kafka

kafka

kafka

Zookeeper環境

Kafka集群需要依賴zookeeper環境,zookeeper安裝請參考文檔《Zookeeper詳解(02) - zookeeper安裝部署-單機模式-集群模式》

集群部署

上傳解壓

將安裝包kafka_2.11-2.4.1.tgz上傳到服務器的/opt/software並解壓到指定目錄

[hadoop@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/

修改解壓后的文件夾名稱

[hadoop@hadoop102 software]$ cd /opt/module/

[hadoop@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka

在/opt/module/kafka目錄下創建logs文件夾

[hadoop@hadoop102 module]$ cd kafka/

[hadoop@hadoop102 kafka]$ mkdir logs

修改配置文件

[hadoop@hadoop102 kafka]$ cd config/

[hadoop@hadoop102 config]$ vi server.properties

修改以下內容:

#broker的全局唯一編號,不能重復

broker.id=0

#刪除topic功能使能,當前版本此配置默認為true,已從配置文件移除

delete.topic.enable=true

#處理網絡請求的線程數量

num.network.threads=3

#用來處理磁盤IO的線程數量

num.io.threads=8

#發送套接字的緩沖區大小

socket.send.buffer.bytes=102400

#接收套接字的緩沖區大小

socket.receive.buffer.bytes=102400

#請求套接字的緩沖區大小

socket.request.max.bytes=104857600

#kafka運行日志存放的路徑

log.dirs=/opt/module/kafka/logs

#topic在當前broker上的分區個數

num.partitions=1

#用來恢復和清理data下數據的線程數量

num.recovery.threads.per.data.dir=1

#segment文件保留的最長時間,超時將被刪除

log.retention.hours=168

#配置連接Zookeeper集群地址,zookeeper的host必須要用主機名或hosts中配置的映射,不能直接使用ip地址,直接使用ip會出現無法連接的現象

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

分發安裝包

[hadoop@hadoop102 module]$ scp -r kafka/ hadoop103:/opt/module/

[hadoop@hadoop102 module]$ scp -r kafka/ hadoop104:/opt/module/

配置環境變量(可選)

sudo vim /etc/profile

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

立即生效

source /etc/profile

修改hadoop103和hadoop104上的broker.id

分別在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2

集群中每個節點的broker.id不得重復

集群啟停

先啟動Zookeeper集群

[hadoop@hadoop102 module]$ myzookeeper.sh start

依次在hadoop102、hadoop103、hadoop104節點上啟動kafka

[hadoop@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[hadoop@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

[hadoop@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

查看進程

[hadoop@hadoop102 kafka]$ jpsall

=============== hadoop102 ===============

1237 QuorumPeerMain

1610 Kafka

=============== hadoop103 ===============

1236 QuorumPeerMain

1612 Kafka

=============== hadoop104 ===============

1603 Kafka

1236 QuorumPeerMain

停止kafka集群

[hadoop@hadoop102 kafka]$ bin/kafka-server-stop.sh stop

[hadoop@hadoop103 kafka]$ bin/kafka-server-stop.sh stop

[hadoop@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

kafka群起腳本

[hadoop@hadoop102 kafka]$ vi /home/hadoop/bin/mykafka.sh

添加如下內容

#!/bin/bash

if [ $# -lt 1 ]

then

echo "Input Args Error....."

exit

fi

for i in hadoop102 hadoop103 hadoop104

do

case $1 in

start)

echo "==================START $i KAFKA==================="

ssh $i /opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

;;

stop)

echo "==================STOP $i KAFKA==================="

ssh $i /opt/module/kafka/bin/kafka-server-stop.sh stop

;;

*)

echo "Input Args Error....."

exit

;;

esac

done

添加可執行權限

[hadoop@hadoop102 kafka]$ chmod +x /home/hadoop/bin/mykafka.sh

使用腳本啟動kafka集群

[hadoop@hadoop102 kafka]$ mykafka.sh start

使用腳本停止kafka集群

[hadoop@hadoop102 kafka]$ mykafka.sh stop

Kafka基礎命令行操作

1)創建topic

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic test

選項說明:

--topic 定義topic名

--replication-factor 定義副本數

--partitions 定義分區數

 

2)查看當前服務器中的所有topic

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

3)向topic中發送消息

[hadoop@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic test

>hello kafka

>hello hadoop

4)從指定topic消費消息

從此刻最新發送到消息開始消費

[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test

加--from-beginning:會把主題中現有的所有的數據都讀取出來

[hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic test

5)查看某個Topic的詳情

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe –-topic test

6)修改分區數

bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter –-topic test --partitions 6

7)刪除topic

[hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic test


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM