一、elk架構已經完成情況情況下
1)一個腳本收集多個日志,if 判斷寫入es的索引

[root@k8s6 conf.d]# cat file.conf input { file{ path => ["/var/log/messages", "/var/log/secure"] type => "system-log" start_position => "beginning" } file{ path => ["/var/log/elasticsearch/myes.log"] type => "es-log" start_position => "beginning" } } filter{ } output{ if [type] == "system-log" { elasticsearch { hosts => ["192.168.10.22:9200"] index => "system-log-%{+YYYY.MM}" } } if [type] == "es-log" { elasticsearch { hosts => ["192.168.10.22:9200"] index => "es-log-%{+YYYY.MM}" } } }
[root@k8s6 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/file.conf 啟動去kibana查看
Java日志為多行一個事件,需要被處理
2)調試遇到 中括號就為一個事件

[root@k8s6 conf.d]# cat /etc/logstash/conf.d/codec.conf input { stdin { codec => multiline{ pattern => "^\[" negate => true what => "previous" } } } filter{ } output{ stdout { codec => rubydebug } }
啟動測試

[root@k8s6 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/codec.conf Settings: Default pipeline workers: 4 Pipeline main started [你好啊 gitd^H^Hgird hei, hai [ { "@timestamp" => "2019-03-15T11:52:51.012Z", "message" => "[你好啊\ngitd\b\bgird\nhei, hai", "@version" => "1", "tags" => [ [0] "multiline" ], "host" => "k8s6" }
3)修改file文件,記錄Java日志(es的日志)添加了codec參數

[root@k8s6 ~]# cat /etc/logstash/conf.d/file.conf input { file{ path => ["/var/log/messages", "/var/log/secure"] type => "system-log" start_position => "beginning" } file{ path => ["/var/log/elasticsearch/myes.log"] type => "es-log" start_position => "beginning" codec => multiline{ pattern => "^\[" negate => true what => "previous" } } } filter{ } output{ if [type] == "system-log" { elasticsearch { hosts => ["192.168.10.22:9200"] index => "system-log-%{+YYYY.MM}" } } if [type] == "es-log" { elasticsearch { hosts => ["192.168.10.22:9200"] index => "es-log-%{+YYYY.MM}" } } }
啟動之前,需要刪除之前啟動生成文件內容,由於在前台啟動的,默認在家目錄的隱藏文件 ls /root/ -a
正常在后台啟動下: ls /var/lib/logstash/ -a
並且還要去es的head插件刪除該節點
4)刪除2內容和節點后,再次啟動服務
[root@k8s6 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/file.conf
java的多行日志被分開
二、監控nginx日志
1)快速安裝nginx
尋找:https://mirrors.aliyun.com/epel/

yum install -y nginx wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm yum install wget -y wget https://mirrors.aliyun.com/epel/epel-release-latest-7.noarch.rpm rpm -ivh epel-release-latest-7.noarch.rpm yum instal nginx -y
2)修改配置文件,存為json數據,只修改了部分

http { log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; log_format access_log_json '{"user_ip":"$http_x_real_ip","lan_ip":"$remote_addr","log_time":"$time_iso8601","user_req":"$request","http_code":"$status","body_bytes_sent":"$body_bytes_sent","req_time":"$request_time","user_ua":"$http_user_agent"}'; access_log /var/log/nginx/access_log_json.log access_log_json;
3)屏幕輸出nginx日志

[root@node01 conf.d]# cat /etc/logstash/conf.d/nginx.conf input{ file { path => "/var/log/nginx/access_log_json.log" codec => "json" } } filter{ } output{ stdout { codec => rubydebug } }
啟動調試模式,測試

[root@node01 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/nginx.conf Settings: Default pipeline workers: 4 Pipeline main started { "user_ip" => "-", "lan_ip" => "10.8.0.18", "log_time" => "2019-03-16T00:57:19+08:00", "user_req" => "GET / HTTP/1.1", "http_code" => "304", "body_bytes_sent" => "0", "req_time" => "0.000", "user_ua" => "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "@version" => "1", "@timestamp" => "2019-03-15T16:57:20.127Z", "path" => "/var/log/nginx/access_log_json.log", "host" => "node01" }
4)存入es服務,並屏幕輸出調試。 寫入之前先在家目錄下刪除.sincedb 文件
[root@node01 ~]# /etc/init.d/logstash start

[root@node01 conf.d]# cat /etc/logstash/conf.d/nginx.conf input{ file { path => "/var/log/nginx/access_log_json.log" codec => "json" } } filter{ } output{ elasticsearch { hosts => ["192.168.10.22:9200"] index => "nginx-access-log-%{+YYYY.MM.dd}" } stdout { codec => rubydebug } }
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/nginx.conf
在kibana中配置時,注意時間格式
三、記錄系統日志,rsyslog服務的日志文件
1)由於rsyslog系統文件較少,即所有機器都存在一起
思路,n1機器 啟動logstash配置文件,監聽514端口
其他機器修改 /etc/rsyslog.conf 配置
最后一行改為 *.* @@192.168.10.23:514 該服務的日志就會寫入n1的logstash服務里
2)node1,的logstash配置文件如下,調試模式

[root@node01 conf.d]# cat syslog.conf input{ syslog { type => "system-syslog" port => 514 } } filter{ } output{ stdout { codec => rubydebug } }
[root@node01 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/syslog.conf 啟動服務
可以查看端口
[root@node01 ~]# netstat -lntup|grep 514 tcp6 0 0 :::514 :::* LISTEN 27619/java udp6 0 0 :::514 :::* 27619/java
其他機器修改配置文件
[root@k8s6 ~]# tail -2 /etc/rsyslog.conf *.* @@192.168.10.23:514 # ### end of the forwarding rule ###
[root@k8s6 ~]# systemctl restart rsyslog 重啟服務,日志將發送到192.168.10.23的機器中
3)使用 logger 命令也可產生系統日志
[root@k8s6 ~]# logger hellow world
監聽的日志
{ "message" => "hellow world\n", "@version" => "1", "@timestamp" => "2019-03-16T05:30:58.000Z", "type" => "system-syslog", "host" => "192.168.10.22", "priority" => 13, "timestamp" => "Mar 16 13:30:58", "logsource" => "k8s6", "program" => "root", "severity" => 5, "facility" => 1, "facility_label" => "user-level", "severity_label" => "Notice" }
3)存入es中的配置文件

[root@node01 conf.d]# cat /etc/logstash/conf.d/syslog.conf input{ syslog { type => "system-syslog" port => 514 } } filter{ } output{ elasticsearch { hosts => ["192.168.10.22:9200"] index => "system-syslog-%{+YYYY.MM}" } }
在kibana中查詢
四、監聽tcp的日志
1.1)node01調試模式下啟動服務

[root@node01 conf.d]# cat /etc/logstash/conf.d/tcp.conf input{ tcp { type => "tcp" port => "6666" mode => "server" } } filter{ } output{ stdout { codec => rubydebug } }
啟動監聽程序 :[root@node01 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/tcp.conf
查看端口
[root@node01 ~]# netstat -lntup|grep 6666 tcp6 0 0 :::6666 :::* LISTEN 27884/java
1.2)另一台機器模擬發送tcp日志
[root@k8s6 ~]# yum install nc -y [root@k8s6 ~]# echo "hello" |nc 192.168.10.23 6666
也可以發送文件 [root@k8s6 ~]# nc 192.168.10.23 6666 < /etc/resolv.conf
偽設備的發送方式 [root@k8s6 ~]# echo "hello world" > /dev/tcp/192.168.10.23/6666
1.3)查看監聽的程序輸出

[root@node01 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/tcp.conf Settings: Default pipeline workers: 4 Pipeline main started { "message" => "hello", "@version" => "1", "@timestamp" => "2019-03-16T08:00:49.445Z", "host" => "192.168.10.22", "port" => 47168, "type" => "tcp" }
五、收集Apache日志
1.1)查看logstash的預定義插件

[root@node01 patterns]# pwd /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns [root@node01 patterns]# head grok-patterns USERNAME [a-zA-Z0-9._-]+ USER %{USERNAME} EMAILLOCALPART [a-zA-Z][a-zA-Z0-9_.+-=:]+ EMAILADDRESS %{EMAILLOCALPART}@%{HOSTNAME} HTTPDUSER %{EMAILADDRESS}|%{USER} INT (?:[+-]?(?:[0-9]+)) BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+))) NUMBER (?:%{BASE10NUM}) BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+)) BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
1.2)filter中,grok插件的使用
grok查看官網
https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html
1.3)根據官網調試日志

[root@k8s6 conf.d]# cat /etc/logstash/conf.d/grok.conf input{ stdin {} } filter{ grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" } } } output{ stdout { codec => rubydebug } }
1.4)啟動查看日志輸出

[root@k8s6 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/grok.conf Settings: Default pipeline workers: 4 Pipeline main started 55.3.244.1 GET /index.html 15824 0.043 { "message" => "55.3.244.1 GET /index.html 15824 0.043", "@version" => "1", "@timestamp" => "2019-03-16T09:25:51.670Z", "host" => "k8s6", "client" => "55.3.244.1", "method" => "GET", "request" => "/index.html", "bytes" => "15824", "duration" => "0.043" }
2.1)調試apache日志
尋找apache的日志插件

[root@node01 patterns]# vim /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-2.0.5/patterns/grok-patterns COMBINEDAPACHELOG 找到這個
2.2)編輯apache的日志bug模式

[root@k8s6 conf.d]# cat apache-grok.conf input{ file { path => "/var/log/httpd/access_log" start_position => "beginning" } } filter{ grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } output{ stdout { codec => rubydebug } }
啟動測試

[root@k8s6 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/apache-grok.conf Settings: Default pipeline workers: 4 Pipeline main started { "message" => "::1 - - [16/Mar/2019:16:41:48 +0800] \"OPTIONS * HTTP/1.0\" 200 - \"-\" \"Apache/2.4.6 (CentOS) (internal dummy connection)\"", "@version" => "1", "@timestamp" => "2019-03-16T10:26:41.882Z", "path" => "/var/log/httpd/access_log", "host" => "k8s6", "clientip" => "::1", "ident" => "-", "auth" => "-", "timestamp" => "16/Mar/2019:16:41:48 +0800", "verb" => "OPTIONS", "request" => "*", "httpversion" => "1.0", "response" => "200", "referrer" => "\"-\"", "agent" => "\"Apache/2.4.6 (CentOS) (internal dummy connection)\"" }
3.1)寫入es服務

[root@k8s6 ~]# cat /etc/logstash/conf.d/apache-grok.conf input{ file { path => "/var/log/httpd/access_log" start_position => "beginning" } } filter{ grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } output{ elasticsearch { hosts => ["192.168.10.22:9200"] index => "apache-accesslog-%{+YYYY.MM.dd}" } }
kibana進行日志匹配
六、elk標准架構圖,使用消息隊列形式
1)elk架構修改
之前 數據 =》logstash =》es 修改 數據 =》logstash =》redis(消息隊列) =》logstash =》es
支持的output插件
https://www.elastic.co/guide/en/logstash/current/output-plugins.html
2)安裝啟動redis

測試redis yum install redis -y 修改redis配置文件 vim /etc/redis.conf daemonize no ==> daemonize yes 改為后台運行 bind 192.168.10.23 修改ip地址 啟動redis [root@node01 ~]# systemctl start redis [root@node01 ~]# netstat -lntup|grep redis tcp 0 0 192.168.10.23:6379 0.0.0.0:* LISTEN 28379/redis-server [root@node01 ~]# redis-cli -h 192.168.10.23 -p 6379 192.168.10.23:6379>
3.1)測試 logstash與redis測試

[root@node01 ~]# cat /etc/logstash/conf.d/redis.conf input{ stdin {} } output{ redis { host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "demo" } }
啟動logstash服務
[root@node01 ~]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/redis.conf Settings: Default pipeline workers: 4 Pipeline main started hello hello weodda agaeawegaw
添加了3個值
3.2)在redis中查看
[root@node01 ~]# redis-cli -h 192.168.10.23 -p 6379
192.168.10.23:6379> info
db6:keys=1,expires=0,avg_ttl=0
192.168.10.23:6379> select 6
OK
192.168.10.23:6379[6]> keys *
1) "demo"
192.168.10.23:6379[6]> type demo
list
192.168.10.23:6379[6]> llen demo
(integer) 3
192.168.10.23:6379[6]> lindex demo -1
"{\"message\":\"agaeawegaw\",\"@version\":\"1\",\"@timestamp\":\"2019-03-16T15:20:02.261Z\",\"host\":\"node01\"}"
4)將apache日志寫入redis

[root@k8s6 conf.d]# cat /etc/logstash/conf.d/apache-grok-redis.conf input{ file { path => "/var/log/httpd/access_log" start_position => "beginning" } } filter{ grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } output{ redis { host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "apache-accesslog" } }
過濾操作,可在后面讀取redis時實現
啟動服務
[root@k8s6 conf.d]# /opt/logstash/bin/logstash -f /etc/logstash/conf.d/apache-grok-redis.conf
可進入redis查看
5)從另一台機器啟動logstash讀取redis寫入es服務
192.168.10.22機器使用 logstash 將日志存入redis 192.168.10.23機器使用logstash讀取redis內寫入的日志
查看inpu的插件
https://www.elastic.co/guide/en/logstash/current/input-plugins.html
5.1)屏幕調試輸出

[root@node01 conf.d]# cat /etc/logstash/conf.d/indexer.conf input{ redis { host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "apache-accesslog" } } output{ stdout { codec => rubydebug } }
/opt/logstash/bin/logstash -f /etc/logstash/conf.d/indexer.conf
5.2)整個過程梳理(最終版)

標准存入redis [root@k8s6 conf.d]# cat /etc/logstash/conf.d/apache-grok-redis.conf input{ file { path => "/var/log/httpd/access_log" start_position => "beginning" } } output{ redis { host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "apache-accesslog" } } 標准讀取redis,過濾之后再存入es服務 [root@node01 conf.d]# cat /etc/logstash/conf.d/indexer.conf input{ redis { host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "apache-accesslog" } } filter{ grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } output{ elasticsearch { hosts => ["192.168.10.22:9200"] index => "apache-accesslog-%{+YYYY.MM.dd}" } }
消息隊列使用kafka
六、綜合測試
1.1)logstash存入redis服務

[root@k8s6 conf.d]# cat /etc/logstash/conf.d/sumlog_put.conf input{ file { path => "/var/log/httpd/access_log" start_position => "beginning" type => "apache-accesslog" } file { path => ["/var/log/elasticsearch/myes.log"] type => "es-log" start_position => "beginning" codec => multiline{ pattern => "^\[" negate => true what => "previous" } } } output{ if [type] == "apache-accesslog" { redis { host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "apache-accesslog" } } if [type] == "es-log" { redis { host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "es-log" } } }
啟動服務
[root@k8s6 ~]# /etc/init.d/logstash start
1.2)處理問題,發現redis沒有存入日志

遇到的坑。權限問題 chown www:www /var/log/httpd/access_log # yum安裝的apache可能存在權限問題 修改logstash的啟動權限 vim /etc/init.d/logstash LS_USER=root LS_GROUP=root
警告:如果啟動端口,請誤使用root用戶
1.3)讀取redis,存入es

標准讀取redis,過濾之后再存入es服務 [root@node01 conf.d]# cat /etc/logstash/conf.d/sumlog_get.conf input{ syslog { type => "system-syslog" port => 514 } redis { type => "apache-accesslog" host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "apache-accesslog" } redis { type => "es-log" host => "192.168.10.23" port => "6379" db => "6" data_type => "list" key => "es-log" } } filter { if [type] == "apache-accesslog" { grok { match => { "message" => "%{COMBINEDAPACHELOG}" } } } } output{ if [type] == "apache-accesslog" { elasticsearch { hosts => ["192.168.10.22:9200"] index => "apache-accesslog-%{+YYYY.MM.dd}" } } if [type] == "es-log" { elasticsearch { hosts => ["192.168.10.22:9200"] index => "es-log-%{+YYYY.MM}" } } if [type] == "system-syslog" { elasticsearch { hosts => ["192.168.10.22:9200"] index => "system-syslog-%{+YYYY.MM}" } } }
也需要改為root用戶,普通用戶無法啟動低端口
2)強調,防止程序掛了
如果使用redis list 作為消息隊列,需要對key進行監控
llen key_name,如超過1萬就報警。
三、kibana的圖形化()
1)markdown定義文本,如緊急聯系人