ELK之七------Zookeeper和Kafka安裝與配置


一、zookeeper介紹:

ZooKeeper是一個分布式且開源的分布式應用程序協調服務。

zookeeper集群特性:整個集群種只要有超過集群數量一半的zookeeper工作只正常的,那么整個集群對外就是可用的,假如有2台服務器做了一個zookeeper集群,只要有任何一台故障或宕機,那么這個zookeeper集群就不可用了,因為剩下的一台沒有超過集群一半的數量,但是假如有三台zookeeper組成一個集群,那么損壞一台就還剩兩台,大於3台的一半,所以損壞一台還是可以正常運行的,但是再損壞一台就只剩一台集群就不可用了。那么要是4台組成一個zookeeper集群,損壞一台集群肯定是正常的,那么損壞兩台就還剩兩台,那么2台不大於集群數量的一半,所以3台的zookeeper集群和4台的zookeeper集群損壞兩台的結果都是集群不可用,一次類推5台和6台以及7台和8台都是同理,所以這也就是為什么集群一般都是奇數的原因。

1、安裝JDK環境

1、將下載的JDK包解壓(有幾個集群主機,就配置幾個)

[root@web1 src]# tar xvf jdk-8u212-linux-x64.tar.gz 
[root@web1 src]# ln -s /usr/local/src/jdk1.8.0_212/ /usr/local/jdk
[root@web1 src]# ln -s /usr/local/jdk/bin/java /usr/bin

2、配置JDK環境

[root@web1 src]# vim /etc/profile.d/jdk.sh
 export HISTTIMEFORMAT="%F %T `whoami`"
 export export LANG="en_US.utf-8"
 export JAVA_HOME=/usr/local/jdk
 export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
 export PATH=$PATH:$JAVA_HOME/bin
[root@web1 src]# . /etc/profile.d/jdk.sh

2、zookeeper安裝

zookeeper 下載地址:http://zookeeper.apache.org/releases.html

不要下載最新和太舊的版本,盡量使用較新的版本。

1、將下載下來的zookeeper包進行解壓,並創建軟鏈接。(有幾個集群主機,配置幾個)

[root@web1 src]# tar xvf zookeeper-3.4.14.tar.gz 
[root@web1 src]# ln -s /usr/local/src/zookeeper-3.4.14 /usr/local/zookeeper

2、修改zookeeper配置文件,實現zookeeper集群功能

[root@web1 conf]# pwd
/usr/local/src/zookeeper-3.4.14/conf
[root@web1 conf]# cp zoo_sample.cfg zoo.cfg  # 復制zookeeper配置文件
[root@web1 conf]# mkdir /usr/local/zookeeper/data # 有幾個zookeeper集群就創建一個目錄,存放zookeeper數據的位置

[root@web1 conf]# vim zoo.cfg   # 修改zookeeper配置文件
dataDir=/usr/local/zookeeper/data  # 自定義zookeeper保存數據目錄
clientPort=2181  # 客戶端端口號
maxClientCnxns=4096  #客戶端最大連接數
autopurge.snapRetainCount=512 # 設置zookeeper保存多少次的客戶端數據
autopurge.purgeInterval=1   # 設置zookeeper間隔多少小時清理一次保存的客戶端數據
server.1=192.168.7.104:2888:3888 # 如果有三個集群,就寫三個對應的IP地址。
server.2=192.168.7.105:2888:3888

3、對每個zookeeper集群設置一個myid,必須與上面的zoo.cfg配置文件下面的server.xxx的ID對應,有幾個集群就需要設置幾個ID。

[root@web1 conf]# echo 1 > /usr/local/zookeeper/data/myid
[root@tomcat-web2]# echo 2 > /usr/local/zookeeper/data/myid

4、啟動zookeeper服務

[root@web1 ~]# /usr/local/zookeeper/bin/zkServer.sh start  #啟動zookeeper服務器
[root@web1 ~]# /usr/local/zookeeper/bin/zkServer.sh status  # 查看啟動的狀態,如果不是以下的狀態,說明集群失敗
ZooKeeper JMX enabled by default 
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower  # 從服務器

[root@tomcat-web2 bin]# /usr/local/zookeeper/bin/zkServer.sh start
[root@tomcat-web2 bin]# /usr/local/zookeeper/bin/zkServer.sh status #查看zookeeper啟動狀態
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader  # 主服務器

5、將zookeeper設置為開機啟動

[root@tomcat-web2 src]# vim /etc/rc.d/rc.local
/usr/local/zookeeper/bin/zkServer.sh start  # 設置為開機啟動

[root@tomcat-web2 src]# chmod +x /etc/rc.d/rc.local  #加上執行權限

二、kafka簡介:

Kafka 被稱為下一代分布式消息系統,是非營利性組織ASF(Apache Software Foundation,簡稱為ASF)基金會中的一個開源項目,比如HTTP Server、Hadoop、ActiveMQ、Tomcat等開源軟件都屬於Apache基金會的開源軟件,類似的消息系統還有RbbitMQ、ActiveMQ、ZeroMQ,最主要的優勢是其具備分布式功能、並且結合zookeeper可以實現動態擴容。

kafka要想正常運行,必須配置zookeeper,否則無論是kafka集群還是客戶端的生存者和消費者都無法正常的工作的;所以需要配置啟動zookeeper服務。

官方文檔:http://www.infoq.com/cn/articles/apache-kafka

kafka術語解釋

Broker

Kafka集群包含一個或多個服務器,這種服務器被稱為broker

Topic

每條發布到Kafka集群的消息都有一個類別,這個類別被稱為topic。(物理上不同topic的消息分開存儲,邏輯上一個topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的topic即可生產或消費數據而不必關心數據存於何處)

Partition

parition是物理上的概念,每個topic包含一個或多個partition,創建topic時可指定parition數量。每個partition對應於一個文件夾,該文件夾下存儲該partition的數據和索引文件

Producer

負責發布消息到Kafka broker

Consumer

消費消息。每個consumer屬於一個特定的consuer group(可為每個consumer指定group name,若不指定group name則屬於默認的group)。使用consumer high level API時,同一topic的一條消息只能被同一個consumer group內的一個consumer消費,但多個consumer group可同時消費這一消息。

 1、下載並安裝kafka

kafka下載地址:http://kafka.apache.org/downloads.html

1、將下載的kafka包進行解壓,並創建軟鏈接(有幾個集群主機就配置幾個)

[root@tomcat-web2 src]# tar xvf kafka_2.12-2.1.0.tgz   #解壓kafka包
[root@web1 src]# ln -s /usr/local/src/kafka_2.12-2.1.0 /usr/local/kafka  # 設置軟鏈接

2、修改kafka配置文件(有幾個集群主機,就配置幾個)

[root@tomcat-web2 kafka_2.12-2.1.0]# pwd
/usr/local/src/kafka_2.12-2.1.0
[root@tomcat-web2 kafka_2.12-2.1.0]# vim config/server.properties  # 修改kafka配置文件
broker.id=1  # 與zookeeper的ID一致,有幾個kafka主機,就在每個kafka寫入對應的zookeeper主機ID
listeners=PLAINTEXT://192.168.7.104:9092   # 寫成本地的服務器地址,針對不同的集群,在不同的kafka主機寫對應的IP地址
log.dirs=/usr/local/kafka/kafka-logs # 日志存放目錄
log.retention.hours=24  # 保留日志的時間,以小時為單位
zookeeper.connect=192.168.7.104:2181,192.168.7.105:2181 # 設置與zookeeper主機的連接地址,有幾個集群,就配置幾個IP地址。

3、啟動kafka服務,以守護進程的方式啟動

# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

查看此時kafka啟動狀態,第一台kafka已經啟動

 查看第二台kafka主機狀態,也已經啟動

4、將kafka設置為開機啟動(經測試暫時未能成功,但是可以在開機啟動時候使用kafka的service命令將kafka啟動)

(1)配置kafka的環境變量

[root@web1 init.d]# vim /etc/profile.d/kafka.sh 
export KAFKA_HOME="/usr/local/kafka"
export PATH="${KAFKA_HOME}/bin:$PATH"

[root@web1 init.d]# .  /etc/profile.d/kafka.sh 

 (2)在/etc/init.d/目錄下編輯一個啟動腳,並加上執行權限chmod +x kafka

#!/bin/sh
#
# chkconfig: 345 99 01
# description: Kafka
#
# File : Kafka
#
# Description: Starts and stops the Kafka server
#

source /etc/rc.d/init.d/functions

KAFKA_HOME=/usr/local/kafka  # 修改kafka的存放路徑
KAFKA_USER=root  # 使用root啟動
export LOG_DIR=/usr/local/kafka/kafka-logs  # 定義kafka的log存放位置,與前面的kafka配置文件有關

[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka

# See how we were called.
case "$1" in

  start)
    echo -n "Starting Kafka:"
    /sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2>  $LOG_DIR/server.err &"
    echo " done."
    exit 0
    ;;

  stop)
    echo -n "Stopping Kafka: "
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill"
    echo " done."
    exit 0
   ;;
  hardstop)
    echo -n "Stopping (hard) Kafka: "
    /sbin/runuser -s /bin/sh $KAFKA_USER  -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
    echo " done."
    exit 0
    ;;

  status)
    c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
    if [ "$c_pid" = "" ] ; then
      echo "Stopped"
      exit 3
    else
      echo "Running $c_pid"
      exit 0
    fi
    ;;

  restart)
    stop
    start
    ;;

  *)
    echo "Usage: kafka {start|stop|hardstop|status|restart}"
    exit 1
    ;;

esac

重新加載配置文件

# systemctl daemon-reload   
# service kafka start # 啟動kafka

(3)將kafka添加到服務上,然后設置為開機啟動

# chkconfig --add kafka   # 添加到服務上
# chkconfig  kafka on    #設置為開機啟動

2、基於logstash測試kafka和zookeeper  

1、在logstash服務器的/etc/logstash/conf.d目錄下創建一個測試kafka的標准輸出、標准輸入文件

input {
    stdin {}
}
output {
    kafka {
        topic_id => "hello"
        bootstrap_servers => "192.168.7.104:9092"  # IP地址為kafka的地址以及監聽的端口號,測試其他集群的kafka主機,需要修改IP地址即可。
        batch_size => 5
    }
    stdout {
        codec => rubydebug
    }
}

2、開始測是kafka的日志文件是否可以傳到logstash服務器上

[root@logstash conf.d]# logstash -f log-to-kafka.conf  # 測試配置文件
{
          "host" => "logstash",
       "message" => "hello world", # 輸入的hello world已經可以在logstash顯示
      "@version" => "1",
    "@timestamp" => 2020-03-15T03:30:03.426Z
}
nihao
{
          "host" => "logstash",
       "message" => "nihao",  # 輸入的nihao也已經可以在logstash服務上顯示
      "@version" => "1",
    "@timestamp" => 2020-03-15T03:30:12.118Z
}

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM