Logstash過濾分析日志數據/kibanaGUI調試(四)


【Logstash】

[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gz

[root@localhost ~]# tar zxvf logstash-6.3.2.tar.gz -C /usr/local/

[root@localhost ~]# mv /usr/local/logstash-6.3.2/ /usr/local/logstash

[root@localhost ~]# mkdir /usr/local/logstash/config/etc/

[root@localhost ~]# vim /usr/local/logstash/config/etc/test01.conf 

測試:小試牛刀,我們先將kafka隊列消息輸出到終端,主要為了測試filebeat生產web日志。推送到kafka隊列,logstash端能否消費,很明顯,下圖所示,消費正常!

input {
kafka {
bootstrap_servers =>
"192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092"
topics => ["webapache"]
codec => "json"
}
}
output {
stdout {
codec => rubydebug
}
}

input接收源變成kafka,通過bootstrap_server和topics兩個選項指定了接收源kafka集群以及主題屬性;因為我們的logstash從kafka獲取的的日志數據格式為json,所以需要 在inout字段中加上codec=>"json"解析

輸出端是stdout,表示標准輸出(從終端輸出),這里的codex是個插件,表示格式,放在stout中表示輸出的格式,rubydebug是專門測試的格式,一般在用來在終端輸出json格式信息

[root@localhost ~]#/usr/local/logstash/bin/logstash -f config/etc/test01.conf

[root@localhost etc]# vim /usr/local/logstash/config/etc/httpd_log.conf      #我們配置將kafka消息存入到ES集群中,

input {
kafka {
bootstrap_servers =>
"192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092"
topics => ["webapache"]
codec => "json"
}
}
output {
elasticsearch {
hosts => ["192.168.37.134:9200","192.168.37.135:9200","192.168.37.136:9200"]
index => "webapachelog-%{+YYYY-MM-dd}"
}
}

root@localhost etc]#  nohup /usr/local/logstash/bin/logstash -f httpd_log.conf &

詳解:上述可以看到,

input接收源變成kafka,通過bootstrap_server和topics兩個選項指定了接收源kafka集群以及主題屬性;因為我們的logstash從kafka獲取的的日志數據格式為json,所以需要 在inout字段中加上codec=>"json"解析

在看輸出端,output輸出類型是elasticseach,通過hosts指定Elasticsearch集群ip,將kafka消息隊列的數據存入elasticsearch中,最后通過index定義索引名稱;

 [Kibana]

[root@localhost ~]# tar zxvf kibana-6.3.2-linux-x86_64.tar.gz -C /usr/local/

[root@localhost ~]# mv /usr/local/kibana-6.3.2-linux-x86_64/ /usr/local/kibana
[root@localhost ~]# cd /usr/local/kibana/config/

[root@localhost config]# egrep -v "#|^$" kibana.yml 

server.port: 5601       #kibana默認端口
server.host: "192.168.37.136"     #kibana綁定的IP地址,寫本地IP即可
elasticsearch.url: "http://192.168.37.134:9200"  #kibana訪問Elasticsearch地址,可以任意ES集群節點IP,推薦寫Client node角色IP
 kibana.index: ".kibana"    #存儲kibana數據信息的索引

 

這里只需要填寫“webapachelog-*”即可,會自動檢測ES索引對應的文件並抓取映射,前提是filebeat檢測的日志消息經過kafka隊列消息存入 ES中;否則會失敗

選擇“@timestamp”進行時間排序

 

 

 Kibana界面展示:

#Discover:主要用來進行日志檢索,查詢數據

#Visualize:數據可視化,我們可以創建各種唯獨的可視化圖表,例如:面積圖,折線圖,餅圖 等

#Dashboard:儀盤表功能,儀盤表就是可視化圖表的組合,通過將各種可視化圖表組合到一起,整體上了解數據和日志的各種狀態

#Timeblion:時間畫像,可以創建時間序列可視化圖形

#Dev Tools:調試工具控制台,簡單的理解就是kibana提供與ES交互的平台

#Management:管理界面,可以在此創建索引模式,調整kibana設置等操作

 

總結:整個流程呢,我簡單的說一下(大白話說明)

1.首先是由filebeat檢索監控日志,將監控的日志路徑推送到kafka消息隊列中生成topics消息,這里filebeat相當於生產者

2.kafka消息隊列將日志消息存儲起來,這里結合zookeeper集群,並允許logstash拉取

3.Logstash充當消費者,向kafka集群拉取日志數據,並轉發存入Elasticsearch;

4.Kibana為webUI展示頁面,與Elasticsearch進行交互,將日志數據可視化的展現出來

根據上圖架構,從整體分為兩個部分:即日志收集,日志檢索;細分如下描述

1.首先是生產日志,filebeat收集業務服務器上的日志數據,我們只需要將其安裝在需要收集日志服務器上即可,隨后將日志數據實時推送到Kafka集群中,kafka集群會對日志數據進行緩沖和存儲,簡稱緩存,哈哈,這里的filebeat相當於kafka集群中的producer(生產者)

2.filebeat推送到Kafka集群之后,logstash會主動去kafka集群中拉取數據,因為這樣consumer消費者可以自主的控制消費速率和方式,從而減少消費過程中出錯的幾率;其實logstash拉取數據就是分為input,filter和output,中間接受不規則的數據,過濾,分析並轉換成格式化數據最后輸出,

3.Logstash將格式化的數據中轉到Elasticsearch中進行存儲索引,所有的數據都會存儲到Elasticsearch集群中

4.Kibana將elasticsearch中的數據在web GUI界面進行可視化展示


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM