轉載
原文地址:https://www.jianshu.com/p/c74e0ec577b0
kafka依賴zookeeper,zookeeper安裝參見:https://www.cnblogs.com/liyuanhong/articles/12501001.html
jdk安裝
- 查看centos自帶的openjdk安裝包:
rpm -qa | grep openjdk
java-1.7.0-openjdk-headless-1.7.0.111-2.6.7.8.el7.x86_64
java-1.7.0-openjdk-1.7.0.111-2.6.7.8.el7.x86_64
-
卸載centos7的openjdk:
yum -y remove java-1.7.0-openjdk-headless-1.7.0.111-2.6.7.8.el7.x86_64 -
下載jdk1.8
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html -
解壓jdk安裝包
tar -xvf jdk-8u201-linux-x64.tar.gz vi /etc/profile
JAVA_HOME=/home/wusong1/software/jdk1.8.0_201
JRE_HOME=/home/wusong1/software/jdk1.8.0_201/jre
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export JAVA_HOME JRE_HOME PATH CLASSPATH
-
是profile文件生效
sourece /etc/profile -
測試Java是否安裝成功
java -version
配置免密登錄
-
進入到.ssh文件夾中,如果沒有就自己創建該文件夾,生成公鑰私鑰
ssh-keygen -t rsa 對於提示 一路回車 -
將生成的公鑰 id_rsa.pub 追加到 authorized_keys文件中,
cat id_rsa.pub >> authorized_keys
這里需要注意修改authorized_keys文件的權限:
chmod 600 authorized_keys -
配置完成后本地測試一下,如果沒有提示輸入密碼則配置成功
ssh localhost -
對於分布式環境,只需要把對應的公鑰放到目標機器的authorized_keys里面即可免密登錄
我們要注意,.ssh目錄的權限為700,其下文件authorized_keys和私鑰的權限為600。否則會因為權限問題導致無法免密碼登錄。我們可以看到登陸后會有known_hosts文件生成。
Zookeeper安裝
-
下載zookeeper: https://mirrors.cnnic.cn/apache/zookeeper/
-
解壓: tar xvf zookeeper-3.4.13.tar.gz
-
修改配置文件:
-
dataDir=/opt/data/zookeeper/data
存儲快照文件的目錄,默認情況下, 事務日志也會存儲在該目
dataDir錄上。由於事務日志 的寫性能直接影響 ZooKeeper 性能,因此 建議同時配置參數 dataLogDir
dataLogDir=/opt/data/zookeeper/logs -
集群配直。首先在 3 台機器的/etc/hosts 文件中加入 3 台機器 的 IP 與機器域名映射, 域名自定義, 這里分別命名為 server-I、 server-2、 server-3, 3 台機器 IP與機器域名映射關 系如下:
10.211.55.4 server-1
10.211.55.5 server-2
10.211.55.6 server-3- 進入其中一台機器的Zookeeper安裝路徑conf,添加
server.1=server-1:2888:3888
server.2=server-2:2888:3888
server.3=server-3:2888:3888端口號2888表示該服務器與集群中leader交換信息的端口,默認為2888, 3888表示選舉時服務器相互通信的端口。
-
接着在${dataDir}路徑下創建一個myid文件,myid存放的值就是服務器的編號,即對應上面的1、2、3。ZooKeeper在啟動時會讀取 myid文件 中的值與 zoo.cfg文件中的配置信息進行比較, 以確定是哪台服務器。
-
將配置好的zoo.cfg拷貝到其他兩台機器,並分別創建對應的myid,為了操作方便,我們可以將Zookeeper相關環境變量添加到/etc/profile文件中,如:
export ZOOKEEPER_HOME=/home/wusong/software/zookeeper-3.4.13
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$ZOOKEEPER_HOME/bin -
驗證。 zkServer.sh start
zkServer.sh status
輸出下面類似結果表示安裝成功``` ZooKeeper JMX enabled by default Using config: /home/wusong/software/zookeeper-3.4.13/bin/../conf/zoo.cfg
-
Mode: follower
```
注意: centos7默認使用firewall作為防火牆,並默認開啟, 在啟動zk時需要關閉防護牆,不然無法通信,查看防火牆狀態: firewall-cmd --state 停止防火牆: systemctl stop firewalld.service 禁止防火牆開機啟動:systemctl disable firewalld.service
kafka安裝
-
下載 http://kafka.apache.org/downloads
當前 Kafka 最新版本為 kafka_2.12-2.1.0.tgz,其中 2.12 代表 Scala 版本, 2.1.0 表示 Kafka 的版本 -
解壓安裝 tar xvf kafka_2.12-2.1.0.tgz, 為了操作方便,這里我們對kafka的環境變量進行設置, 在/etc/profile 文件中加入kafka的安裝路徑,
export KAFKA_HOME=/home/wusong/software/kafka_2.12-2.1.0 PATH=\$PATH:\$JAVA_HOME/bin:\$JRE_HOME/bin:\$ZOOKEEPER_HOME/bin:\$KAFKA_HOME/bin
- 修改配置 。修改$KAFKA_HOME/config 目錄下的 server.prope叫es文件,為了便於后 續集群環境搭建的配置, 需要保證同一個集群下 broker.id要唯一,因此這里手動配置 broker.id, 直接保持與ZooKeeper的myid值一致, 同時配置日志存儲路徑。server.properties修改的配置 如下 :
broker.id=l #指定代理的 id
log.dirs=/opt/data/kafka- logs #指定 Log 存儲路徑
zookeeper . connect=server-1:2181 , server - 2:2181 , server-3:2181
在三台機器上分別修改配置文件server.properties, 並修改對應的broker.id.
-
啟動。
kafka-server-start.sh -daemon ../config/server.properties
執行 jps命令查看 Java進程,此時進程信息至少包括以下幾項:
15976 Jps
14999 QuorumPeerMain
15906 Kafka -
通過 ZooKeeper 客戶端登錄 ZooKeeper 查看目錄結構,執行以下命令:
zkCli.sh -server server 1:2181 #登錄 ZooKeeper
ls / #查看 ZooKeeper 目錄結構
ls /brokers/ ids 輸出 [1, 2, 3]由/brokers/ids 節點存儲的元數據可知, 3台機器的 Kafka 均已正常己啟動。至此, Kafka 分布式環境搭建過程介紹完畢。
kafka實戰
- 創建一個擁有3個副本的topic:
kafka-topics.sh --create --zookeeper 10.211.55.6:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
kafka-topics.sh --list --zookeeper 10.211.55.5:2181
my-replicated-topic
現在我們搭建了一個集群,怎么知道每個節點的信息呢?運行“"describe topics”命令就可以了:
kafka-topics.sh --describe --zookeeper server-1:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
第一行是對所有分區的一個描述,然后每個分區都會對應一行,因為我們只有一個分區所以下面就只加了一行。
leader:負責處理消息的讀和寫,leader是從所有節點中隨機選擇的.
replicas:列出了所有的副本節點,不管節點是否在服務中.
isr:是正在服務中的節點.
在我們的例子中,節點2是作為leader運行。
修改topic過期時間
默認是7天 修改為3天
kafka-topics.sh --zookeeper server-1:2181 -topic xxxx --alter --config retention.ms=259200000
-
往topic發送消息:
kafka-console-producer.sh --broker-list server-1:9092 --topic my-replicated-topic -
消費這些消息:
kafka-console-consumer.sh --bootstrap-server server-1:9092 --from-beginning --topic my-replicated-topic
- 測試一下容錯能力:
broker2作為leader運行,現在kill掉:
jps
22839 QuorumPeerMain
22334 Jps
1999 Kafka
[wusong@centos-linux-2 config]$ kill -9 1999
kafka-topics.sh --describe --zookeeper server-1:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3,1
現在leader為3,
雖然最初負責續寫消息的leader down掉了,但之前的消息還是可以消費的:
kafka-console-consumer.sh --bootstrap-server server-1:9092 --from-beginning --topic my-replicated-topic
hello
world
nihao
yes
hello