一、zookeeper介紹:
ZooKeeper是一個分布式且開源的分布式應用程序協調服務。
zookeeper集群特性:整個集群種只要有超過集群數量一半的zookeeper工作只正常的,那么整個集群對外就是可用的,假如有2台服務器做了一個zookeeper集群,只要有任何一台故障或宕機,那么這個zookeeper集群就不可用了,因為剩下的一台沒有超過集群一半的數量,但是假如有三台zookeeper組成一個集群,那么損壞一台就還剩兩台,大於3台的一半,所以損壞一台還是可以正常運行的,但是再損壞一台就只剩一台集群就不可用了。那么要是4台組成一個zookeeper集群,損壞一台集群肯定是正常的,那么損壞兩台就還剩兩台,那么2台不大於集群數量的一半,所以3台的zookeeper集群和4台的zookeeper集群損壞兩台的結果都是集群不可用,一次類推5台和6台以及7台和8台都是同理,所以這也就是為什么集群一般都是奇數的原因。
1、安裝JDK環境
1、將下載的JDK包解壓(有幾個集群主機,就配置幾個)
[root@web1 src]# tar xvf jdk-8u212-linux-x64.tar.gz [root@web1 src]# ln -s /usr/local/src/jdk1.8.0_212/ /usr/local/jdk [root@web1 src]# ln -s /usr/local/jdk/bin/java /usr/bin
2、配置JDK環境
[root@web1 src]# vim /etc/profile.d/jdk.sh export HISTTIMEFORMAT="%F %T `whoami`" export export LANG="en_US.utf-8" export JAVA_HOME=/usr/local/jdk export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar export PATH=$PATH:$JAVA_HOME/bin [root@web1 src]# . /etc/profile.d/jdk.sh
2、zookeeper安裝
zookeeper 下載地址:http://zookeeper.apache.org/releases.html
不要下載最新和太舊的版本,盡量使用較新的版本。
1、將下載下來的zookeeper包進行解壓,並創建軟鏈接。(有幾個集群主機,配置幾個)
[root@web1 src]# tar xvf zookeeper-3.4.14.tar.gz [root@web1 src]# ln -s /usr/local/src/zookeeper-3.4.14 /usr/local/zookeeper
2、修改zookeeper配置文件,實現zookeeper集群功能
[root@web1 conf]# pwd /usr/local/src/zookeeper-3.4.14/conf [root@web1 conf]# cp zoo_sample.cfg zoo.cfg # 復制zookeeper配置文件 [root@web1 conf]# mkdir /usr/local/zookeeper/data # 有幾個zookeeper集群就創建一個目錄,存放zookeeper數據的位置 [root@web1 conf]# vim zoo.cfg # 修改zookeeper配置文件 dataDir=/usr/local/zookeeper/data # 自定義zookeeper保存數據目錄 clientPort=2181 # 客戶端端口號 maxClientCnxns=4096 #客戶端最大連接數 autopurge.snapRetainCount=512 # 設置zookeeper保存多少次的客戶端數據 autopurge.purgeInterval=1 # 設置zookeeper間隔多少小時清理一次保存的客戶端數據 server.1=192.168.7.104:2888:3888 # 如果有三個集群,就寫三個對應的IP地址。 server.2=192.168.7.105:2888:3888
3、對每個zookeeper集群設置一個myid,必須與上面的zoo.cfg配置文件下面的server.xxx的ID對應,有幾個集群就需要設置幾個ID。
[root@web1 conf]# echo 1 > /usr/local/zookeeper/data/myid [root@tomcat-web2]# echo 2 > /usr/local/zookeeper/data/myid
4、啟動zookeeper服務
[root@web1 ~]# /usr/local/zookeeper/bin/zkServer.sh start #啟動zookeeper服務器 [root@web1 ~]# /usr/local/zookeeper/bin/zkServer.sh status # 查看啟動的狀態,如果不是以下的狀態,說明集群失敗 ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: follower # 從服務器 [root@tomcat-web2 bin]# /usr/local/zookeeper/bin/zkServer.sh start [root@tomcat-web2 bin]# /usr/local/zookeeper/bin/zkServer.sh status #查看zookeeper啟動狀態 ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Mode: leader # 主服務器
5、將zookeeper設置為開機啟動
[root@tomcat-web2 src]# vim /etc/rc.d/rc.local /usr/local/zookeeper/bin/zkServer.sh start # 設置為開機啟動 [root@tomcat-web2 src]# chmod +x /etc/rc.d/rc.local #加上執行權限
二、kafka簡介:
Kafka 被稱為下一代分布式消息系統,是非營利性組織ASF(Apache Software Foundation,簡稱為ASF)基金會中的一個開源項目,比如HTTP Server、Hadoop、ActiveMQ、Tomcat等開源軟件都屬於Apache基金會的開源軟件,類似的消息系統還有RbbitMQ、ActiveMQ、ZeroMQ,最主要的優勢是其具備分布式功能、並且結合zookeeper可以實現動態擴容。
kafka要想正常運行,必須配置zookeeper,否則無論是kafka集群還是客戶端的生存者和消費者都無法正常的工作的;所以需要配置啟動zookeeper服務。
官方文檔:http://www.infoq.com/cn/articles/apache-kafka

kafka術語解釋
Broker
Kafka集群包含一個或多個服務器,這種服務器被稱為broker
Topic
每條發布到Kafka集群的消息都有一個類別,這個類別被稱為topic。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存於何處)
Partition
parition是物理上的概念,每個topic包含一個或多個partition,創建topic時可指定parition數量。每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件
Producer
負責發布消息到Kafka broker
Consumer
消費消息。每個consumer屬於一個特定的consuer group(可為每個consumer指定group name,若不指定group name則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。
1、下載並安裝kafka
kafka下載地址:http://kafka.apache.org/downloads.html
1、將下載的kafka包進行解壓,並創建軟鏈接(有幾個集群主機就配置幾個)
[root@tomcat-web2 src]# tar xvf kafka_2.12-2.1.0.tgz #解壓kafka包 [root@web1 src]# ln -s /usr/local/src/kafka_2.12-2.1.0 /usr/local/kafka # 設置軟鏈接
2、修改kafka配置文件(有幾個集群主機,就配置幾個)
[root@tomcat-web2 kafka_2.12-2.1.0]# pwd /usr/local/src/kafka_2.12-2.1.0 [root@tomcat-web2 kafka_2.12-2.1.0]# vim config/server.properties # 修改kafka配置文件 broker.id=1 # 與zookeeper的ID一致,有幾個kafka主機,就在每個kafka寫入對應的zookeeper主機ID listeners=PLAINTEXT://192.168.7.104:9092 # 寫成本地的服務器地址,針對不同的集群,在不同的kafka主機寫對應的IP地址 log.dirs=/usr/local/kafka/kafka-logs # 日志存放目錄 log.retention.hours=24 # 保留日志的時間,以小時為單位 zookeeper.connect=192.168.7.104:2181,192.168.7.105:2181 # 設置與zookeeper主機的連接地址,有幾個集群,就配置幾個IP地址。
3、啟動kafka服務,以守護進程的方式啟動
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
查看此時kafka啟動狀態,第一台kafka已經啟動

查看第二台kafka主機狀態,也已經啟動

4、將kafka設置為開機啟動(經測試暫時未能成功,但是可以在開機啟動時候使用kafka的service命令將kafka啟動)
(1)配置kafka的環境變量
[root@web1 init.d]# vim /etc/profile.d/kafka.sh
export KAFKA_HOME="/usr/local/kafka"
export PATH="${KAFKA_HOME}/bin:$PATH"
[root@web1 init.d]# . /etc/profile.d/kafka.sh
(2)在/etc/init.d/目錄下編輯一個啟動腳,並加上執行權限chmod +x kafka
#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#
source /etc/rc.d/init.d/functions
KAFKA_HOME=/usr/local/kafka # 修改kafka的存放路徑
KAFKA_USER=root # 使用root啟動
export LOG_DIR=/usr/local/kafka/kafka-logs # 定義kafka的log存放位置,與前面的kafka配置文件有關
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
# See how we were called.
case "$1" in
start)
echo -n "Starting Kafka:"
/sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
echo " done."
exit 0
;;
stop)
echo -n "Stopping Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill"
echo " done."
exit 0
;;
hardstop)
echo -n "Stopping (hard) Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
echo " done."
exit 0
;;
status)
c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
if [ "$c_pid" = "" ] ; then
echo "Stopped"
exit 3
else
echo "Running $c_pid"
exit 0
fi
;;
restart)
stop
start
;;
*)
echo "Usage: kafka {start|stop|hardstop|status|restart}"
exit 1
;;
esac
重新加載配置文件
# systemctl daemon-reload # service kafka start # 啟動kafka
(3)將kafka添加到服務上,然后設置為開機啟動
# chkconfig --add kafka # 添加到服務上 # chkconfig kafka on #設置為開機啟動
2、基於logstash測試kafka和zookeeper
1、在logstash服務器的/etc/logstash/conf.d目錄下創建一個測試kafka的標准輸出、標准輸入文件
input {
stdin {}
}
output {
kafka {
topic_id => "hello"
bootstrap_servers => "192.168.7.104:9092" # IP地址為kafka的地址以及監聽的端口號,測試其他集群的kafka主機,需要修改IP地址即可。
batch_size => 5
}
stdout {
codec => rubydebug
}
}
2、開始測是kafka的日志文件是否可以傳到logstash服務器上
[root@logstash conf.d]# logstash -f log-to-kafka.conf # 測試配置文件
{
"host" => "logstash",
"message" => "hello world", # 輸入的hello world已經可以在logstash顯示
"@version" => "1",
"@timestamp" => 2020-03-15T03:30:03.426Z
}
nihao
{
"host" => "logstash",
"message" => "nihao", # 輸入的nihao也已經可以在logstash服務上顯示
"@version" => "1",
"@timestamp" => 2020-03-15T03:30:12.118Z
}
