大數據 -- zookeeper和kafka集群環境搭建


一 運行環境

從阿里雲申請三台雲服務器,這里我使用了兩個不同的阿里雲賬號去申請雲服務器。我們配置三台主機名分別為zy1,zy2,zy3。

我們通過阿里雲可以獲取主機的公網ip地址,如下:

 通過secureRCT連接主機106.15.74.155,運行ifconfig,可以查看其內網ip地址:

1、賬號1申請了兩台雲服務器:

主機zy1的公網ip為:106.15.74.155,內網ip為172.19.182.67。

主機zy2的公網ip為:47.103.134.70,內網ip為172.19.14.178。

2、賬號2申請了一台雲服務器:

主機zy3的公網ip為:47.97.10.51,內網ip為172.16.229.255。

3、阿里雲入規則配置

由於主機位於不同的局域網下,因此需要進行一個公網端口到內網端口的映射。在搭建zookeeper和kafka需要使用到2181,2888 ,3888,9092端口。需要在阿里雲中配置入規則,具體可以參考阿里雲官方收藏:同一個地域、不同賬號下的實例實現內網互通 

注意:如果47.103.134.70配置一個入端口3888,那么對該47.103.134.70:3888的訪問會實際映射到172.19.14.178:3888下。如果是同一局域網下的兩個主機,是不需要配置這個的,可以直接互通。

如果想了解更多,可以參考以下博客:

通過SSH訪問阿里雲服務器的原理可以參考-用SSH訪問內網主機的方法

雲服務器主機內網ip和外網ip的區別

一台阿里雲2台騰訊雲服務器搭建Hadoop集群

4、配置/etc/hosts

以主機zy1為例:配置如下:

注意zy1對應的ip需要配置為內網ip,也就是本機ip:172.19.182.67。而zy2、zy3配置的都是公網ip。

二 JDK安裝

在每個主機下執行以下操作:

1、安裝之前先查看一下有無系統自帶jdk

rpm -qa |grep java

rpm -qa |grep jdk

rpm -qa |grep gcj

如果有就使用批量卸載命令

rpm -qa | grep java | xargs rpm -e --nodeps 

2、直接yum安裝1.8.0版本openjdk

進入 Oracle 官方網站 下載合適的 JDK 版本,准備安裝:

/usr目錄下創建java目錄:

mkdir /usr/java
cd /usr/java

解壓 JDK:

tar -zxvf jdk-8u231-linux-x64.tar.gz  -C /usr/java

3、配置環境變量

JAVA_HOME=/usr/java/jdk1.8.0_231        
JRE_HOME=/usr/java/jdk1.8.0_231/jre     
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH

使得配置生效

. /etc/profile

4、查看版本

echo $JAVA_HOME  
echo $CLASSPATH
java -version

三 安裝zookeeper

在主機zy1下面執行以下操作:

1、下載並解壓

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

創建目錄/opt/bigdata:

mkdir /opt/bigdata

解壓文件到/opt/bigdata:

tar -zxvf  zookeeper-3.4.13.tar.gz  -C /opt/bigdata

跳轉目錄:

cd /opt/bigdata/zookeeper-3.4.13/

2、復制配置文件

cp conf/zoo_sample.cfg  conf/zoo.cfg

修改配置文件如下:

vim conf/zoo.cfg

其中部分參數意義如下:

  • server.1=zy1:2888:3888:server.1 這個1是服務器的標識也可以是其他的數字, 表示這個是第幾號服務器,用來標識服務器,這個標識要寫到快照目錄下面myid文件里。第一個端口是master和slave之間的通信端口,默認是2888,第二個端口是leader選舉的端口,集群剛啟動的時候選舉或者leader掛掉之后進行新的選舉的端口默認是3888
  • dataDir:快照日志的存儲路徑。
  • dataLogDir:事物日志的存儲路徑,如果不配置這個那么事物日志會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日志、快照日志太多。
  • clientPort:這個端口就是客戶端連接 zookeeper 服務器的端口,zookeeper 會監聽這個端口,接受客戶端的訪問請求。

3、創建myid文件

創建/opt/bigdata/data/zookeeper/zkdata:

mkdir -vp /opt/bigdata/data/zookeeper/zkdata

創建myid文件:

echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid

4、拷貝zookeeper到主機zy2、zy3

scp -r /opt/bigdata/zookeeper-3.4.13/ zy2:/opt/bigdata/
scp -r /opt/bigdata/zookeeper-3.4.13/ zy3:/opt/bigdata/

5、創建主機zy2、zy3的myid文件

zyx主機:

創建/opt/bigdata/data/zookeeper/zkdata:

mkdir -vp /opt/bigdata/data/zookeeper/zkdata

創建myid文件:

echo x > /opt/bigdata/data/zookeeper/zkdata/myid

注意:x表示主機的編號。

6、配置環境變量(每個主機都需要配置)

vim /etc/profile
#set java environment   , append

export ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.13

export PATH=$ZOOKEEPER_HOME/bin:$PATH

使得配置生效

. /etc/profile

7、啟動服務並查看

進入到zookeeper目錄下,在每個主機下分別執行

cd /opt/bigdata/zookeeper-3.4.13
bin/zkServer.sh start

檢查服務狀態

bin/zkServer.sh status

可以用“jps”查看zk的進程,這個是zk的整個工程的main

jps

注意:zk集群一般只有一個leader,多個follower,主一般是響應客戶端的讀寫請求,而從同步數據,當主掛掉之后就會從follower里投票選舉一個leader出來。

8、客戶端連接

zookeeper服務開啟后,進入客戶端的命令:

zkCli.sh

更多常用命令參考博客:Kafka在zookeeper中存儲結構和查看方式

9、出現錯誤常用排錯手段

1、防火牆

防火牆沒有關閉問題。解決方式參考:https://blog.csdn.net/weiyongle1996/article/details/73733228

2、端口沒有開啟

如果/etc/hosts全部配置為公網:在zy1運行zkServer.sh start,查看端口開啟狀態:

netstat -an | grep 3888

則會發現無法開啟公網3888端口,我們應該打開的是內網機器對應的端口。
如果端口已經開啟,可以通過telnet ip port判斷該端口是否可以從外部訪問。

四 安裝kafka

在主機zy1下執行:

1、下載並解壓

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz
tar -zxvf kafka_2.12-2.1.1.tgz  -C /opt/bigdata

重命名:

cd /opt/bigdata/
mv kafka_2.12-2.1.1 kafka

2、修改kafka配置文件

 在/opt/bigdata/kafka下:

vim config/server.properties

各個參數意義:

  • broker.id=1  #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣;
  • listeners=PLAINTEXT://主機:9092  #當前kafka對外提供服務的主機:端口(默認是9092);
  • num.network.threads=3  #這個是borker進行網絡處理的線程數;
  • num.io.threads=8 #這個是borker進行I/O處理的線程數;
  • log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個;
  • socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小后在發送,能提高性能;
  • socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小后在序列化到磁盤;
  • socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小;
  • num.partitions=1 #默認的分區數,一個topic默認1個分區數;
  • log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天;
  • message.max.byte=5242880  #消息保存的最大值5M;
  • default.replication.factor=2  #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務;
  • replica.fetch.max.bytes=5242880  #取消息的最大直接數;
  • log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件;
  • log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除;
  • log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能;
  • zookeeper.connect=xx:12181,xx:12181,xx:12181#設置zookeeper的連接端口;

 注意,這里如果希望在java中創建topic也是多個備份,需要添加一下屬性

#default replication factors for automatically created topics,默認值1;

default.replication.factor=3

#When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.

#min.insync.replicas and acks allow you to enforce greater durability guarantees,默認值1;

min.insync.replicas=3

上面是參數的解釋,實際的修改項為:

broker.id=1

listeners=PLAINTEXT://zy1:9092           #內網地址

advertised.listeners=PLAINTEXT://106.15.74.155:9092   #公網地址(不然遠程客戶端無法訪問)

log.dirs=/opt/bigdata/kafka/kafka-logs

#此外,可以在log.retention.hours=168 下面新增下面三項:

message.max.byte=5242880

default.replication.factor=2

replica.fetch.max.bytes=5242880

#設置zookeeper的連接端口

zookeeper.connect=zy1:2181,zy2:2181,zy3:2181 

如果我們需要刪除topic,還需要配置一下內容:

delete.topic.enable=true

具體參考博客:kafka安裝及刪除TopicKafka0.8.2.1刪除topic邏輯

3、復制kafka到zy2、zy3

scp -r /opt/bigdata/kafka zy2:/opt/bigdata/
scp -r /opt/bigdata/kafka zy3:/opt/bigdata/

4、修改zy2、zy3的配置文件server.properties

拷貝文件過去的其他兩個節點需要更改broker.id和listeners,以zy2為例:

5、啟動kafka

我們可以根據Kafka內帶的zk集群來啟動,但是建議使用獨立的zk集群:

zkServer.sh start
在/opt/bigdata/kafka下 ,三個節點分別執行如下命令,啟動kafka集群:
bin/kafka-server-start.sh config/server.properties &

運行命令后服務確實后台啟動了,但日志會打印在控制台,而且關掉命令行窗口,服務就會隨之停止,這個讓我挺困惑的。后來,參考了其他的啟動腳本,通過測試和調試最終找到了完全滿足要求的命令。

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

其中1>/dev/null 2>&1 是將命令產生的輸入和錯誤都輸入到空設備,也就是不輸出的意思。/dev/null代表空設備。

注意:如果內存不足:打開kafka安裝位置,在bin目錄下找到kafka-server-start.sh文件,將export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改為export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"。

6、驗證

思路:以下給出幾條kafka指令。創建一個topic,一個節點作為生產者,兩個節點作為消費者分別看看能否接收數據,進行驗證:

創建及查看topic:

cd /opt/big/data/kafka
bin/kafka-topics.sh --list --zookeeper  zy1:2181
bin/kafka-topics.sh --create --zookeeper  zy1:2181 --replication-factor 3 --partitions 3  --topic zy-test

開啟生產者:

bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test

開啟消費者:

bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning 

 

節點zy1產生消息,如果消息沒有清理,在節點zy2、zy3都可以接收到消息。

注意:更多命令的參數可以通過類似如下命令 bin/kafka-console-consumer.sh 查看。

7、更多kafka命令

以下是kafka常用命令行總結:  

查看topic的詳細信息  

 bin/kafka-topics.sh -zookeeper zy1:2181 --describe --topic zy-test

可以看到topic包含3個復本,每個副本又分為三個partition。以zy-test:partition0為例,其leader保存在broker.id=1的主機上,副本保存在2、3節點上。其消息保存在配置參數log.dirs所指定的路徑下:

為topic增加副本  

bin/kafka-reassign-partitions.sh --zookeeper zy1:2181 --reassignment-json-file  json/partitions-to-move.json -execute  

創建topic 

bin/kafka-topics.sh --create --zookeeper  zy1:2181 --replication-factor 3 --partitions 3  --topic zy-test

為topic增加partition  

bin/kafka-topics.sh –-zookeeper zy1:2181 –-alter –-partitions 3 –-topic  zy-test

kafka生產者客戶端命令  

bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test

kafka消費者客戶端命令  

bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning 

kafka服務啟動  

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

刪除topic  

 bin/kafka-topics.sh --zookeeper zy1:2181 --delete --topic  zy-test

8、關閉kafka

bin/kafka-server-stop.sh

五 consumer offsets

由於Zookeeper並不適合大批量的頻繁寫入操作,新版Kafka已推薦將consumer的位移信息保存在kafka內部的topic中,即__consumer_offsets topic,並且默認提供了kafka_consumer_groups.sh腳本供用戶查看consumer信息。

1、獲取消息在topic中的記錄信息

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list zy1:9092 --topic zy-test

輸出結果每個字段分別表示topic、partition、untilOffset(當前partition的最大偏移);

上面的輸出結果表明kafka隊列總有產生過4條消息(這並不代表kafka隊列現在一定有4條消息,因為kafka有兩種策略可以刪除舊數據:基於時間、基於大小)。

由於我使用kafka-console-producer.sh生成了四條消息:zy、19941108、 liuyan、1。因此kafka消息隊列中存在4條消息。

2、創建一個console consumer group

bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --consumer.config config/consumer.properties --topic zy-test  --from-beginning

 

再次建立消費者:

會發現獲取不到數據。這是因為我們指定了消費組,第一次消費時從offset為0開始消費,把4條消息全部讀出,此時offset移動到最后,當再次使用同一消費組讀取數據,則會從上次的offset開始獲取數據。

而使用:

bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --topic zy-test --from-beginning 

每次都會獲取四條數據,這是因為每次都會創建一個新的消費者,這些消費者會被隨機分配到一個不同的組,因此每次都是從offset為0開始消費。

參數解釋:

3、 獲取該consumer group的group id

bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --list  

可以看到有三個消費組,前兩個消費者沒有指定消費組,隨機產生一個console-consumer-***的group.ig。

第三個是我們剛剛在config/consumer.properties 中指定的消費組。

4、查看消費者組的offset

 bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --describe --group test-consumer-group

如果此時再使用生產者客戶端生成兩條消息:

再次查看消費組test-consumer-group的消費情況:

5、KAFKA API指定位移消費

由於我們還沒有介紹KAFKA的API,這塊內容就不先介紹,具體參考博客:Kafka消費者 之 指定位移消費

六 各個端口作用

  • 2888:zookeeper集群三台主機心跳端口;
  • 3888:zookeeper集群三台主機選取leader端口,防止其中一個宕機了;
  • 2181:zookeeper服務器監聽端口,等待消費者連接,消費者可以從中獲取topic分區以及消費offset等信息(高版本消費者offset已經保存在kafka內部的topic中了);
  • 9092:kafka服務器綁定端口,等待生產者和消費者連接;

因此上面介紹的kafka命令,與topic相關的使用--zookeeper zy1:2181,與生產者、消費者相關的使用 --bootstrap-server zy1:9092。

參考博客:

[1]kafka和zookeeper集群搭建詳細步驟

[2]yum安裝jdk環境變量配置

[3]使用命令讀取kafka的內部topic:__consumer_offsets

[4]kafka學習筆記:知識點整理 - cyfonly - 博客園(推薦)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM