【Logstash】
[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gz
[root@localhost ~]# tar zxvf logstash-6.3.2.tar.gz -C /usr/local/
[root@localhost ~]# mv /usr/local/logstash-6.3.2/ /usr/local/logstash
[root@localhost ~]# mkdir /usr/local/logstash/config/etc/
[root@localhost ~]# vim /usr/local/logstash/config/etc/test01.conf
測試:小試牛刀,我們先將kafka隊列消息輸出到終端,主要為了測試filebeat生產web日志。推送到kafka隊列,logstash端能否消費,很明顯,下圖所示,消費正常!
input { kafka { bootstrap_servers => "192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092" topics => ["webapache"] codec => "json" } } output { stdout { codec => rubydebug } }
input接收源變成kafka,通過bootstrap_server和topics兩個選項指定了接收源kafka集群以及主題屬性;因為我們的logstash從kafka獲取的的日志數據格式為json,所以需要 在inout字段中加上codec=>"json"解析
輸出端是stdout,表示標准輸出(從終端輸出),這里的codex是個插件,表示格式,放在stout中表示輸出的格式,rubydebug是專門測試的格式,一般在用來在終端輸出json格式信息
[root@localhost ~]#/usr/local/logstash/bin/logstash -f config/etc/test01.conf
[root@localhost etc]# vim /usr/local/logstash/config/etc/httpd_log.conf #我們配置將kafka消息存入到ES集群中,
input { kafka { bootstrap_servers => "192.168.37.134:9092,192.168.37.135:9092,192.168.37.136:9092" topics => ["webapache"] codec => "json" } } output { elasticsearch { hosts => ["192.168.37.134:9200","192.168.37.135:9200","192.168.37.136:9200"] index => "webapachelog-%{+YYYY-MM-dd}" } }
root@localhost etc]# nohup /usr/local/logstash/bin/logstash -f httpd_log.conf &
詳解:上述可以看到,
input接收源變成kafka,通過bootstrap_server和topics兩個選項指定了接收源kafka集群以及主題屬性;因為我們的logstash從kafka獲取的的日志數據格式為json,所以需要 在inout字段中加上codec=>"json"解析
在看輸出端,output輸出類型是elasticseach,通過hosts指定Elasticsearch集群ip,將kafka消息隊列的數據存入elasticsearch中,最后通過index定義索引名稱;
[Kibana]
[root@localhost ~]# tar zxvf kibana-6.3.2-linux-x86_64.tar.gz -C /usr/local/
[root@localhost ~]# mv /usr/local/kibana-6.3.2-linux-x86_64/ /usr/local/kibana
[root@localhost ~]# cd /usr/local/kibana/config/
[root@localhost config]# egrep -v "#|^$" kibana.yml
server.port: 5601 #kibana默認端口 server.host: "192.168.37.136" #kibana綁定的IP地址,寫本地IP即可 elasticsearch.url: "http://192.168.37.134:9200" #kibana訪問Elasticsearch地址,可以任意ES集群節點IP,推薦寫Client node角色IP
kibana.index: ".kibana" #存儲kibana數據信息的索引
這里只需要填寫“webapachelog-*”即可,會自動檢測ES索引對應的文件並抓取映射,前提是filebeat檢測的日志消息經過kafka隊列消息存入 ES中;否則會失敗
選擇“@timestamp”進行時間排序
Kibana界面展示:
#Discover:主要用來進行日志檢索,查詢數據
#Visualize:數據可視化,我們可以創建各種唯獨的可視化圖表,例如:面積圖,折線圖,餅圖 等
#Dashboard:儀盤表功能,儀盤表就是可視化圖表的組合,通過將各種可視化圖表組合到一起,整體上了解數據和日志的各種狀態
#Timeblion:時間畫像,可以創建時間序列可視化圖形
#Dev Tools:調試工具控制台,簡單的理解就是kibana提供與ES交互的平台
#Management:管理界面,可以在此創建索引模式,調整kibana設置等操作
總結:整個流程呢,我簡單的說一下(大白話說明)
1.首先是由filebeat檢索監控日志,將監控的日志路徑推送到kafka消息隊列中生成topics消息,這里filebeat相當於生產者
2.kafka消息隊列將日志消息存儲起來,這里結合zookeeper集群,並允許logstash拉取
3.Logstash充當消費者,向kafka集群拉取日志數據,並轉發存入Elasticsearch;
4.Kibana為webUI展示頁面,與Elasticsearch進行交互,將日志數據可視化的展現出來
根據上圖架構,從整體分為兩個部分:即日志收集,日志檢索;細分如下描述
1.首先是生產日志,filebeat收集業務服務器上的日志數據,我們只需要將其安裝在需要收集日志服務器上即可,隨后將日志數據實時推送到Kafka集群中,kafka集群會對日志數據進行緩沖和存儲,簡稱緩存,哈哈,這里的filebeat相當於kafka集群中的producer(生產者)
2.filebeat推送到Kafka集群之后,logstash會主動去kafka集群中拉取數據,因為這樣consumer消費者可以自主的控制消費速率和方式,從而減少消費過程中出錯的幾率;其實logstash拉取數據就是分為input,filter和output,中間接受不規則的數據,過濾,分析並轉換成格式化數據最后輸出,
3.Logstash將格式化的數據中轉到Elasticsearch中進行存儲索引,所有的數據都會存儲到Elasticsearch集群中
4.Kibana將elasticsearch中的數據在web GUI界面進行可視化展示