ELK+Kafka學習筆記之搭建ELK+Kafka日志收集系統集群


 

0x00 概述

關於如何搭建ELK部分,請參考這篇文章https://www.cnblogs.com/JetpropelledSnake/p/9893566.html。

該篇用戶為非root,使用用戶為“elk”。

基於以前ELK架構的基礎,結合Kafka隊列,實現了ELK+Kafka集群,整體架構如下:

# 1. 兩台es組成es集群;( 以下對elasticsearch簡稱es )

# 2. 中間三台服務器就是我的kafka(zookeeper)集群啦; 上面寫的 消費者/生產者 這是kafka(zookeeper)中的概念;

# 3. 最后面的就是一大堆的生產服務器啦,上面使用的是logstash,當然除了logstash也可以使用其他的工具來收集你的應用程序的日志,例如:Flume,Scribe,Rsyslog,Scripts……
# 雙ES備份集群: CentOS1+CentOS2
# Kafka+LogStash集群:CentOS3+CentOS4+CentOS5
# ES-Header+Kibina+LogStash/Beats采集:CentOS6+CentOS7
# 注意!!由於測試環境限制,此篇博客中CentOS6+CentOS7的內容放到CentOS1+CentOS2中的

使用軟件:

# Elasticsearch 6.2.4

# Kibana 6.2.4

# Logstash 6.2.4

# elasticsearch-head(從github直接clone)

# filebeat 6.2.4 (本篇文章使用Logstash做搜集用,可以用filebeat替換Logstash)

# phantomjs-2.1.1-linux-x84_64 (配合npm指令使用)

# node-v4.9.1-linux-x64 (記住文件夾路徑,需要增加到bash_profile中)

# kafka_2.11-1.0.0 (該版本已經包含ZooKeeper服務)

部署步驟:

# ES集群安裝配置;

# Logstash客戶端配置(直接寫入數據到ES集群,寫入系統messages日志);

# Kafka(zookeeper)集群配置;(Logstash寫入數據到Kafka消息系統);

 

0x01 ES集群安裝配置

ES安裝請參考這里https://www.cnblogs.com/JetpropelledSnake/p/9893566.html;

1. 下面修改ES的配置文件:

# 確保下列參數被正確設置:

cluster.name: logger   # ES集群的名字

node.name: node-1

path.data: /usr/local/app/elasticsearch-6.2.4/data

path.logs: /usr/local/app/elasticsearch-6.2.4/log

bootstrap.memory_lock: false   # 對於非專用ES,建議設置為false,默認為true
bootstrap.system_call_filter: false

network.host: 0.0.0.0  # 支持遠程訪問

http.port: 9200 # restful api訪問接口

http.cors.enabled: true      #允許ES head跨域訪問
http.cors.allow-origin: "*"   #允許ES head跨域訪問

2. 獲取es服務管理腳本

[ root @ elk ~ ] # git clone https://github.com/elastic/elasticsearch-servicewrapper.git

[ root @ elk ~ ] # mv elasticsearch-servicewrapper/service /usr/local/elasticsearch/bin/

[ root @ elk ~ ] # /usr/local/elasticsearch/bin/service/elasticsearch install

Detected RHEL or Fedora :

Installing the Elasticsearch daemon . .

[ root @ elk ~ ] #

#這時就會在/etc/init.d/目錄下安裝上es的管理腳本啦

#修改其配置:

[ root @ elk ~ ] #

set . default . ES_HOME = /usr/local/app/elasticsearch-6.2.4    #安裝路徑

set . default . ES_HEAP_SIZE = 1024                    #jvm內存大小,根據實際環境調整即可

3. 啟動es ,並檢查其服務是否正常

[ root @ elk ~ ] # netstat -nlpt | grep -E "9200|"9300

tcp          0        0 0.0.0.0 : 9200                  0.0.0.0 : *                    LISTEN        1684/ java          

tcp          0        0 0.0.0.0 : 9300                  0.0.0.0 : *                    LISTEN        1684/ java

 訪問http://192.168.26.135:9200/ 如果出現以下提示信息說明安裝配置完成

 

4. es1節點已經設置好,我們直接把目錄復制到es2

[ root @ elk ] # scp -r elasticsearch-6.2.4  192.168.26.136:/usr/local/app

[ root @ elk ] # elasticsearch/bin/service/elasticsearch install

#es2只需要修改node.name(此處使用的是node-2和node-1)即可,其他都與es1相同配置

5. 安裝es的管理插件elasticsearch-head

es-head安裝,請參考這里,https://www.cnblogs.com/JetpropelledSnake/p/9893566.html。

安裝好之后,訪問方式為:http://192.168.26.135:9100/

到此,ES集群的部署完成。

 

0x02  Logstash客戶端安裝配置

LogStash安裝請參考這里,https://www.cnblogs.com/JetpropelledSnake/p/9893566.html;

# Logstach需要搭配指定的配置文件啟動,創建一個logstash配置文件,比如logstash-es.conf,啟動LogStash時候使用;根據不同的配置文件,LogStash會做不同工作。

1. 通過stdin標准實時輸入的方式向Logstash向es集群寫數據(測試,暫未通過Kafka隊列傳遞)

    1.1 使用如下命令創建LogStash啟動配置文件

# cd /usr/local/app/logstash-6.2.4/config
# touch logstash_for_stdin.conf

# [elk@localhost config]$ vim logstash_for_stdin.conf 

     1.2 檢查配置文件是否有語法錯(如下 -t 為測試命令)

# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_stdin.conf -t

    1.3 啟動 Logstash,開始寫入操作

# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_stdin.conf

以上是輸入數據,從ES-Header(192.168.26.135:9200)插件就可以看到,數據已經分散的寫入到兩個集群內

ES Server:192.168.26.136

 ES Server:192.168.26.135

2. 通過采集制定文件的輸入的方式向Logstash向es集群寫數據(測試,暫未通過Kafka隊列傳遞)

    2.1首先創建一個用於采集系統日志的LogStash啟動的conf文件,使用如下命令

# cd /usr/local/app/logstash-6.2.4/config
# touch logstash_for_system_messages.conf

    2.2 編輯logstash_for_system_messages.conf文件

# [elk@localhost config]$ vim logstash_for_system_messages.conf 
input {
   file{
        path => "/var/log/messages"    #這是日志文件的絕對路徑
        start_position=>"beginning"    #這個表示從 messages 的第一行讀取,即文件開始處
   }
}
output {     #輸出到 es
    elasticsearch {    #這里將按照這個索引格式來創建索引
        hosts=>["192.168.26.135:9200","192.168.26.136:9200"]
        index=>"system-messages-%{+YYYY-MM}"
    }
}

    2.3 測試ogstash_for_system_messages.conf文件

# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_system_messages.conf -t

    2.4 啟動LogStash,開始采集系統日志

# 注意,系統日志需要權限,命令行前面使用 sudo
#
[elk@localhost logstash-6.2.4]$ sudo ./bin/logstash -f ./config/logstash_for_system_messages.conf

 ES Server: 192.168.26.136

 ES Server: 192.168.26.135

系統日志我們已經成功的收集,並且已經寫入到es集群中.

上面的演示是logstash直接將日志寫入到es集群中的,這種場合我覺得如果量不是很大的話直接像上面已將將輸出output定義到es集群即可.

如果量大的話需要加上消息隊列Kafka來緩解es集群的壓力,下面就在三台server上面安裝kafka集群。

 

0x03 Kafka集群安裝配置1

 本篇博客使用的Kafka版本為

# kafka_2.11-1.0.0 (該版本已經包含ZooKeeper服務)

 1. 從官網獲取裝包,https://kafka.apache.org/downloads

2. 配置Zookeeper集群,修改配置文件

# [elk@localhost config]$ vim zookeeper.properties 

修改內容如下:

dataDir=/usr/local/app/kafka_2.11-1.0.0/zookeeperDir
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=1024
tickTime=2000
initLimit=20
syncLimit=10
server.2=192.168.26.137:2888:3888
server.3=192.168.26.138:2888:3888
server.4=192.168.26.139:2888:3888
# 說明:

tickTime : 這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。

2888 端口:表示的是這個服務器與集群中的 Leader 服務器交換信息的端口;

3888 端口:表示的是萬一集群中的 Leader 服務器掛了,需要一個端口來重新進行選舉,選出一個新的 Leader ,而這個端口就是用來執行選舉時服務器相互通信的端口。

3. 創建Zookeeper所需的目錄

進入Kafka目錄內

創建目錄zookeeperDir

# [elk@localhost kafka_2.11-1.0.0]$ mkdir zookeeperDir
# [elk@localhost kafka_2.11-1.0.0]$ cd zookeeperDir

在zookeeperDir目錄下創建myid文件,里面的內容為數字,用於標識主機,如果這個文件沒有的話,zookeeper是沒法啟動的

# [elk@localhost zookeeperDir]$ touch myid

命名myid

# [elk@localhost zookeeperDir]$ echo 2 > myid

以上就是zookeeper集群的配置,下面等我配置好kafka之后直接復制到其他兩個節點即可

 4. Kafka配置

 

修改內容如下

broker . id = 2           # 唯一,填數字,本文中分別為 2 / 3 / 4

prot = 9092      # 這個 broker 監聽的端口

host . name = 192.168.2.22    # 唯一,填服務器 IP

log . dir = / data / kafka - logs    #  該目錄可以不用提前創建,在啟動時自己會創建

zookeeper . connect = 192.168.2.22 : 2181 , 192.168.2.23 : 2181 , 192.168.2.24 :2181  # 這個就是 zookeeper 的 ip 及端口

num . partitions = 16          # 需要配置較大 分片影響讀寫速度

log . dirs = / data / kafka - logs # 數據目錄也要單獨配置磁盤較大的地方

log . retention . hours = 168    # 時間按需求保留過期時間 避免磁盤滿

5. 將kafka(zookeeper)的程序目錄全部拷貝至其他兩個節點

# [elk@localhost app]$ scp kafka_2.11-1.0.0 192.168.26.138:/usr/local/app/
# [elk@localhost app]$ scp kafka_2.11-1.0.0 192.168.26.139:/usr/local/app/

6. 修改兩個節點的配置,注意這里除了以下兩點不同外,都是相同的配置

# zookeeper 的配置

mkdir zookeeperDir

echo "x" > zookeeperDir/myid    # “x”這里是3或者4,前面已經使用過2了

# kafka 的配置

broker.id = x   # “x”這里是3或者4,前面已經使用過2了

host.name = 192.168.26.137    # ip改為26.138或者26.139

7. 修改完畢配置之后我們就可以啟動了,這里先要啟動zookeeper集群,才能啟動kafka

 我們按照順序來,kafka1 –> kafka2 –>kafka3

# [elk@localhost kafka_2.11-1.0.0]$ ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &    #zookeeper啟動命令
# [elk@localhost kafka_2.11-1.0.0]$ ./bin/zookeeper-server-stop.sh    #zookeeper停止的命令

注意,如果zookeeper有問題 nohup的日志文件會非常大,把磁盤占滿,這個zookeeper服務可以通過自己些服務腳本來管理服務的啟動與關閉。

后面兩台執行相同操作,在啟動過程當中會出現以下報錯信息

[2018-12-03 15:06:02,089] WARN Cannot open channel to 4 at election address /192.168.26.139:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:562)
    at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:614)
    at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:843)
    at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:913)

由於zookeeper集群在啟動的時候,每個結點都試圖去連接集群中的其它結點,先啟動的肯定連不上后面還沒啟動的,所以上面日志前面部分的異常是可以忽略的。通過后面部分可以看到,集群在選出一個Leader后,最后穩定了。

其他節點也可能會出現類似的情況,屬於正常。

8. Zookeeper服務檢查

 在Kafka三台Server上執行如下命令

# netstat -nlpt | grep -E "2181|2888|3888"

顯示如下端口和信息,說明Zookeeper運行正常

[elk@localhost ~]$ netstat -nlpt | grep -E "2181|2888|3888"
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2181                 :::*                    LISTEN      5465/java           
tcp6       0      0 192.168.26.139:3888     :::*                    LISTEN      5465/java 

可以看出,如果哪台是Leader,那么它就擁有2888這個端口

[elk@localhost ~]$ netstat -nlpt | grep -E "2181|2888|3888"
(Not all processes could be identified, non-owned process info
 will not be shown, you would have to be root to see it all.)
tcp6       0      0 :::2181                 :::*                    LISTEN      5434/java           
tcp6       0      0 192.168.26.138:2888     :::*                    LISTEN      5434/java           
tcp6       0      0 192.168.26.138:3888     :::*                    LISTEN      5434/java 

9. 啟動Kafka服務

這時候zookeeper集群已經啟動起來了,下面啟動kafka,也是依次按照順序啟動,kafka1 –> kafka2 –>kafka3

# [elk@localhost kafka_2.11-1.0.0]$ pwd
# /usr/local/app/kafka_2.11-1.0.0
# [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-server-start.sh ./config/server.properties &    #啟動Kafka服務
# [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-server-stop.sh    # 停止Kafka服務

10. 此時三台上面的zookeeper及kafka都已經啟動完畢,下面來測試一下

    10.1 建立一個主題(在Kafka Server 192.168.26.137)

# 注意:factor大小不能超過broker的個數
# ./bin/kafka-topics.sh --create --zookeeper 192.168.26.137:2181 --replication-factor 3 --partitions 1 --topic test1test2test

    10.2 查看已經創建的topics(Kafka Server :192.168.26.137)

# ./bin/kafka-topics.sh --list --zookeeper 192.168.26.137:2181

    10.3 查看test1test2test這個主題的詳情(Kafka Server :192.168.26.137)

#主題名稱:test1test2test

#Partition:只有一個,從0開始

#leader :id為4的broker

#Replicas 副本存在於broker id為2,3,4的上面

#Isr:活躍狀態的broker

    10.4 發送消息,這里使用的是生產者角色(Kafka Server :192.168.26.137)

 

# ./bin/kafka-console-producer.sh --broker-list 192.168.26.137:9092 --topic test1test2test

    10.5 接收消息,這里使用的是消費者角色(更換到另外一台KafkaServer上運行,Kafka Server :192.168.26.138)

     消息已經顯示出來了。

 

# ./bin/kafka-console-consumer.sh --zookeeper  192.168.26.138:2181 --topic test1test2test --from-beginning

以上是Kafka生產者和消費者的測試,基於Kafka的Zookeeper集群就成功了。

11. 下面我們將ES Server:192.168.26.135上面的logstash的輸出改到kafka上面,將數據寫入到kafka中

    11.1 創建LogStash結合Kafka使用的.conf文件,注意文件目錄

    11.2  編輯輸入到Kafka的.conf文件

# vim logstash_for_kafka.conf
input {     #這里的輸入還是定義的是從日志文件輸入
    file {
        type = > "system-message"
        path = > "/var/log/messages"
        start_position = > "beginning"
    }
}
output {    #stdout { codec => rubydebug }   #這是標准輸出到終端,可以用於調試看有沒有輸出,注意輸出的方向可以有多個
    kafka {    #輸出到kafka
        bootstrap_servers = > 
        "192.168.26.137:9092,192.168.26.138:9092,192.168.26.139:9092"   #他們就是生產者
        topic_id = > "system-secure"    #這個將作為主題的名稱,將會自動創建
        compression_type = > "snappy"    #壓縮類型
     }
}

    11.3 測試輸入到Kafka的配置文件

 

# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_for_kafka.conf -t

     11.4 使用該配置文件啟動LogStash

 注意采集系統日志需要權限,命令行前面必須加sudo

# [elk@localhost logstash-6.2.4]$ sudo ./bin/logstash -f ./config/logstash_for_kafka.conf

    11.5 驗證數據是否寫入到kafka,這里我們檢查是否生成了一個叫system-secure的主題

 

# [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --list --zookeeper 192.168.26.137:2181

     11.6 查看system-secure詳情

# [elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.26.137:2181 --topic system-secure

    11.7 對於logstash輸出的我們也可以提前先定義主題,然后啟動logstash 直接往定義好的主題寫數據就行啦,命令如下:

#[elk@localhost kafka_2.11-1.0.0]$ ./bin/kafka-topics.sh --create --zookeeper 192.168.26.137:2181 --replication-factor 3 --partitions 3 --topic TOPIC_NAME

好了,我們將logstash收集到的數據寫入到了kafka中了,在實驗過程中我使用while腳本測試了如果不斷的往kafka寫數據的同時停掉兩個節點,數據寫入沒有任何問題。

 

0x04 Kafka集群安裝配置2

那如何將數據從kafka中讀取然后給ES集群呢?

那下面我們在kafka集群上安裝Logstash,可以通過上面的方法,先配置一台的logstash,然后通過scp命令通過網絡拷貝,安裝路徑是“/usr/local/app/logstash-6.2.4”;

三台上面的logstash的配置如下,作用是將kafka集群的數據讀取然后轉交給es集群,這里為了測試我讓他新建一個索引文件,注意這里的輸入日志是secure,主題名稱是“system-secure”;

 1. 如上所說,在三台Kafka集群的Server上分別安裝LogStash,並在如下目錄新建logstash_kafka2ES.conf

 2. 配置logstash_kafka2ES.conf

# [elk@localhost config]$ vim logstash_kafka2ES.conf 

3. 在三台Kafka Server上按順序分別啟動LogStash,啟動命令三台是通用的

# [elk@localhost logstash-6.2.4]$ ./bin/logstash -f ./config/logstash_kafka2ES.conf

4. 測試通過Kafka隊列傳遞消息到ES集群

在ES Server:192.168.26.135上寫入測試內容,利用secure這個文件來測試

 

可以看到消息已經分散的寫入ES及集群中

ES Server 192.168.26.135

ES Server 192.168.26.136

 

0x05 在Kibana中展示

ELK中的Kibana中的使用和部署,請看這篇文章 https://www.cnblogs.com/JetpropelledSnake/p/9893566.html

1. 根據ES集群中存入的數據,在Kibana中建立Index

2. 建立好index后,就可以在Discover這里查詢介入的數據了

3. ES集群采用的查詢語言是DSL語言,該語言采用json的格式組織查詢語句。

4. Kibana可以多樣化展示ES集群中介入的數據,github上有很多定制好的格式,針對常見生產環境可以直接配置,十分方便。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM