Logstash for Log Collection: Must-Know Topics
1.Logstash Architecture Overview
Logstash's basic architecture is a pipeline with three stages (a minimal config skeleton follows the list):
●Input: data collection (common plugins: stdin, file, kafka, beats, http)
●Filter: data parsing/transformation (common plugins: grok, date, geoip, mutate, useragent)
●Output: data output (common plugins: stdout, file, elasticsearch)
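The three stages map directly onto the sections of a pipeline config file. A minimal sketch (stdin to stdout, empty filter) looks like this:
input {
stdin { } #stage 1: collect events
}
filter {
#stage 2: parse/transform events (empty here)
}
output {
stdout { codec => rubydebug } #stage 3: emit events
}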
2.Logstash Input Plugins
An input plugin specifies a data source. A single pipeline can contain multiple input plugins; we will focus on the following:
●stdin
●file
●beats
●kafka
- Hands-on 1: read data from standard input or a file, and print events to standard output:
#Install Logstash (requires a Java environment)
[root@logstash-node1 ~]# yum install java -y
[root@logstash-node1 ~]# rpm -ivh logstash-7.4.0.rpm
[root@logstash-node1 ~]# cd /etc/logstash/
[root@logstash-node1 logstash]# vim jvm.options
# Xmx represents the maximum size of total heap space
-Xms512m #adjust the heap size; in production this is usually set to about half of the machine's memory
-Xmx512m
#Test the environment
[root@logstash-node1 logstash]# cd /etc/logstash/conf.d/
[root@logstash-node1 conf.d]# vim input_file_output_console.conf
[root@logstash-node1 conf.d]# cat input_file_output_console.conf
input {
file {
path => "/var/log/oldxu.log"
type => "syslog"
exclude => "*.gz" #files not to watch, using glob match syntax
start_position => "beginning" #where to start reading a file seen for the first time: beginning or end
stat_interval => "3" #how often to check files for updates; default 1s
}
}
output {
stdout {
codec => rubydebug
}
}
[root@logstash-node1 conf.d]# /usr/share/logstash/bin/logstash -f input_file_output_console.conf
[root@logstash-node1 conf.d]# vim input_stdin_output_console.conf
[root@logstash-node1 conf.d]# cat input_stdin_output_console.conf
input {
stdin {
type => "stdin"
tags => "tags_stdin"
}
}
output {
stdout {
codec => "rubydebug"
}
}
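Running the stdin config and typing a line produces an event like the following (illustrative; host and timestamp depend on your environment):
[root@logstash-node1 conf.d]# /usr/share/logstash/bin/logstash -f input_stdin_output_console.conf
hello
{
"message" => "hello",
"@timestamp" => 2020-01-15T01:30:00.000Z,
"host" => "logstash-node1",
"type" => "stdin",
"tags" => [
[0] "tags_stdin"
],
"@version" => "1"
}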
[root@logstash-node1 ~]# echo "qwwe" >/var/log/oldxu.log
[root@logstash-node1 conf.d]# /usr/share/logstash/bin/logstash -f input_file_output_console.conf
......
{
"message" => "qwwe",
"path" => "/var/log/oldxu.log",
"@timestamp" => 2020-01-15T01:37:08.418Z,
"host" => "logstash-node1",
"type" => "syslog",
"@version" => "1"
}
3.Logstash Filter Plugins
As data travels from source to store, Logstash filter plugins parse each event, identify named fields to build structure, and transform them into a common format, making analysis easier and faster:
●Use grok to derive structure from unstructured data
●Use geoip to derive geographic coordinates from IP addresses
●Use useragent to derive the operating system and device type from requests
3.1 Grok Plugin
1.Why does grok exist?
#We want to parse unstructured data like the following into structured JSON:
120.27.74.166 - - [30/Dec/2019:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36"
#Doing this with a raw regular expression is extremely complex:
\[([^]]+)]\s\[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*\[([^]]+)]\s\[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*\[([^]]+)]\s\[(\w+)]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*
2.How does grok solve this? Grok is essentially a collection of named regular expressions, and it ships with many built-in patterns that can be used directly.
Grok pattern debugger/generator: http://grokdebug.herokuapp.com/
#For example, this grok pattern parses an Nginx access log:
%{IPORHOST:clientip} %{NGUSER:ident} %{NGUSER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) (?:"(?:%{URI:referrer}|-)"|%{QS:referrer}) %{QS:agent} %{QS:xforwardedfor} %{IPORHOST:host} %{BASE10NUM:request_duration}
3.Grok syntax diagram (see http://grokdebug.herokuapp.com/)
4.Grok example: use a grok pattern to format the Nginx log as JSON
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
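To experiment with grok quickly, the same filter can be wired between a stdin input and a rubydebug stdout, and log lines pasted interactively (a minimal test sketch):
input {
stdin { }
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
output {
stdout { codec => rubydebug }
}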
3.3 Date Plugin
The date plugin parses a date string into a date type, then replaces @timestamp or another specified field.
●match: array; specifies the date format(s) to match, so several formats can be listed
●target: string; the field to assign the parsed date to, default @timestamp
●timezone: string; specifies the time zone
1.Date example: parse the timestamp in an Nginx request
#Create the pipeline config input_http_output_console.conf
[root@logstash-node1 conf.d]# vim input_http_output_console.conf
input {
http {
port => 7474
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
geoip {
source => "clientip"
}
#30/Dec/2019:11:59:18 +0800
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
useragent {
source => "agent"
target => "agent"
}
}
output {
stdout {
codec => rubydebug
}
}
[root@logstash-node1 conf.d]# /usr/share/logstash/bin/logstash -f input_http_output_console.conf -r
Download an HTTP test client on your PC (Windows build here):
https://insomnia.rest/download/#windows
① Choose the build for your OS, install it, and run it.
② Create a new POST request and enter http://10.0.0.151:7474 as the URL.
③ Paste the test data below into the request body and click Send; an "ok" response means success. Then check the output on the server.
#Test data:
120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36"
66.249.73.135 - - [20/May/2015:21:05:11 +0000] "GET /blog/tags/xsendevent HTTP/1.1" 200 10049 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
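The same test can also be driven from the command line with curl instead of a GUI client (assuming the pipeline above is listening on 10.0.0.151:7474):
[root@logstash-node1 ~]# curl -XPOST -d '120.27.74.166 - - [30/Dec/2018:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36"' http://10.0.0.151:7474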
- If the parsed events appear in the server console, the setup works.
3.4 Mutate Plugin
The mutate plugin performs operations on fields such as type conversion, deletion, replacement, and updates:
●remove_field: delete a field
●split: split a string
●add_field: add a field
●convert: convert a field's type
●gsub: replace substrings in a string
●rename: rename a field
mutate is another essential Logstash plugin. It provides rich processing for basic data types, including renaming, deleting, replacing, and modifying fields in a log event. Below are several common mutate operations: type conversion (convert), regex-based replacement (gsub), splitting a string into an array on a delimiter (split), renaming fields (rename), and deleting fields (remove_field).
1.Use mutate to remove useless fields, e.g. headers, message, agent:
filter{
grok {
match => {
"message" => "%{IP:ip}"
}
remove_field => ["message"]
}
geoip {
source => "ip"
}
}
2.Split a string into an array on a delimiter
split divides the string in a field into an array using the specified separator.
filter{
mutate {
split => { "message" => "|" }
}
}
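For example, an event whose message is "1|buy|2020-01-15 10:00:00" becomes the array ["1", "buy", "2020-01-15 10:00:00"] after this filter, and its elements can then be referenced as %{[message][0]}, %{[message][1]}, and so on.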
3.Add fields with add_field.
add_field is mostly used after split, to turn the array elements produced by split into named fields. Note that remove_field is a separate option, not a key inside the add_field hash:
mutate {
add_field => {
"userID" => "%{[message][0]}"
}
remove_field => [ "message","headers","timestamp" ]
}
4.Type conversion with mutate's convert, which supports integer, float, string, and boolean. Note that within a single mutate block convert is applied before add_field, so newly added fields must be converted in a separate, later mutate block:
mutate {
add_field => {
"userID" => "%{[message][0]}"
"Action" => "%{[message][1]}"
"Date" => "%{[message][2]}"
}
remove_field => ["message","headers"]
}
mutate {
convert => {
"userID" => "integer"
"Action" => "string"
"Date" => "string"
}
}
4.Logstash Output Plugins
Output plugins ship Logstash events to a destination. Common plugins:
●stdout
●file
●elasticsearch
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
index => "app-%{+YYYY.MM.dd}" #index name
template_overwrite => true
}
}
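The file output listed above is not demonstrated in this section; a minimal sketch would look like the following (the path is hypothetical, and sprintf date references work in it just as in index names):
output {
file {
path => "/var/log/logstash/app-%{+YYYY.MM.dd}.log" #hypothetical destination path
codec => json_lines #one JSON document per line
}
}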
The full pipeline used for the demonstration above:
[root@logstash-node1 conf.d]# cat input_http_filter_grok_output_console.conf
input {
http {
port => 7474
}
}
filter {
# grok {
# match => { "message" => "%{COMBINEDAPACHELOG}" }
# }
#
# geoip {
# source => "clientip"
# }
#
#
# #30/Dec/2019:11:59:18 +0800
# date {
# match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
# target => "@timestamp"
# timezone => "Asia/Shanghai"
# }
#
# useragent {
# source => "agent"
# target => "agent"
# }
# mutate {
# remove_field => [ "message","headers","timestamp" ]
# }
mutate {
split => { "message" => "|" }
}
mutate {
add_field => {
"userID" => "%{[message][0]}"
"Action" => "%{[message][1]}"
"Date" => "%{[message][2]}"
}
remove_field => ["message","headers"]
}
mutate {
#convert gets its own block so that it sees the fields created by add_field above
convert => {
"userID" => "integer"
"Action" => "string"
"Date" => "string"
}
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
index => "app-%{+YYYY.MM.dd}" #index name
template_overwrite => true
}
}
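To test this pipeline, POST a pipe-delimited line to the http input (the values here are illustrative):
[root@logstash-node1 ~]# curl -XPOST -d '123|buy|2020-01-15 10:00:00' http://10.0.0.151:7474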
Log Collection Overview
●1.Convert plain Nginx logs to JSON
●2.Normalize the time format of the Nginx logs
●3.Resolve the source IPs in the Nginx logs to geographic regions
●4.Parse the user-agent field of the Nginx logs
●5.Convert the bytes field of the Nginx logs to an integer
●6.Remove useless fields: message, headers
#Log format
66.249.73.135 - - [20/May/2015:21:05:11 +0000] "GET /blog/tags/xsendevent HTTP/1.1" 200 10049 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
1.Generate the grok pattern for message in the grok debugger.
2.Configure filebeat:
#Prerequisite: filebeat is installed on web01
[root@web01 ~]# cd /etc/filebeat/
[root@web01 filebeat]# ls /var/log/nginx/access.log
[root@web01 filebeat]# vim filebeat.yml
[root@web01 filebeat]# cat filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
tags: ["nginx-access"]
- type: log
enabled: true
paths:
- /var/log/nginx/error.log
tags: ["nginx-error"]
output.logstash:
hosts: ["10.0.0.151:5044"]
#Write a sample entry to /var/log/nginx/access.log
[root@web01 filebeat]# cat /var/log/nginx/access.log
66.249.73.135 - - [20/May/2015:21:05:11 +0000] "GET /blog/tags/xsendevent HTTP/1.1" 200 10049 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 6_0 like Mac OS X) AppleWebKit/536.26 (KHTML, like Gecko) Version/6.0 Mobile/10A5376e Safari/8536.25 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
[root@web01 filebeat]# systemctl restart filebeat
Configure the Logstash pipeline:
[root@logstash-node1 conf.d]# vim input_filebeat_output_es.conf
[root@logstash-node1 conf.d]# cat input_filebeat_output_es.conf
input {
beats {
port => 5044
}
}
filter {
if "nginx-access" in [tags][0] {
grok {
match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:useragent}" }
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
geoip {
source => "clientip"
}
useragent {
source => "useragent"
target => "useragent"
}
mutate {
rename => [ "[host][name]", "hostname" ]
convert => [ "bytes", "integer" ]
remove_field => [ "message", "agent" , "input","ecs" ]
add_field => { "target_index" => "logstash-nginx-access-%{+YYYY.MM.dd}" }
}
} else if "nginx-error" in [tags][0] {
mutate {
add_field => { "target_index" => "logstash-nginx-error-%{+YYYY.MM.dd}" }
}
}
}
output {
elasticsearch {
hosts => ["10.0.0.161:9200","10.0.0.162:9200","10.0.0.163:9200"]
index => "%{[target_index]}"
}
}
[root@logstash-node1 conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/input_filebeat_output_es.conf -r
#In another window, check that the port is listening
[root@logstash-node1 conf.d]# netstat -lntp
tcp6 0 0 :::5044 :::* LISTEN 10500/java
#Generate some error-log entries ---> on web01
[root@web01 filebeat]# curl 10.0.0.7/sdasfdsafadsfsdaf
View and analyze the results in the browser.
1.MySQL Slow Log Collection: Introduction
1.What is the MySQL slow query log?
When a SQL statement takes longer than a configured threshold to execute, it is recorded in a designated log file; these records make up the slow query log.
2.Why collect the MySQL slow query log?
While a database is running, some SQL queries may execute slowly. How do we quickly locate and analyze which SQL statements need optimizing,
and which ones are hurting the business?
With centralized collection and analysis, each statement's execution time and exact text are visible at a glance.
3.How do we collect the MySQL slow query log?
1.Install MySQL
2.Enable slow query logging in MySQL
3.Use filebeat to collect the local slow log file
Environment: 10.0.0.7 (2G / 1G)
[root@web01 ~]# yum install mariadb mariadb-server -y
#Enable the slow query log, then restart mariadb
[root@db01 ~]# vim /etc/my.cnf
[mysqld]
...
slow_query_log=ON #enable the slow query log
slow_query_log_file=/var/log/mariadb/slow.log #where to write it
long_query_time=3 #log statements slower than 3 seconds
...
[root@db01 ~]# systemctl restart mariadb
[root@web01 ~]# ls /var/log/mariadb
mariadb.log slow.log
[root@web01 ~]# mysql -uroot -poldxu.com
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MariaDB connection id is 8
Server version: 5.5.64-MariaDB MariaDB Server
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
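Before simulating, you can confirm the slow-log settings from the same prompt (the exact output depends on your configuration):
MariaDB [(none)]> show variables like 'slow_query%';
MariaDB [(none)]> show variables like 'long_query_time';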
#Simulate a slow query
MariaDB [(none)]> select sleep(1) user,host from mysql.user;
+------+-----------+
| user | host |
+------+-----------+
| 0 | % |
| 0 | % |
| 0 | % |
| 0 | % |
| 0 | 127.0.0.1 |
| 0 | ::1 |
| 0 | localhost |
| 0 | localhost |
| 0 | localhost |
| 0 | web01 |
| 0 | web01 |
+------+-----------+
11 rows in set (11.48 sec)
Log format conversion
#Edit filebeat.yml
[root@web01 filebeat]# cat filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/mariadb/slow.log
exclude_lines: ['^\# Time'] #drop the "# Time" header lines
multiline.pattern: '^\# User' #a new slow-log entry starts at a "# User" line
multiline.negate: true #lines that do NOT match the pattern...
multiline.match: after #...are appended to the line that did match
multiline.max_lines: 10000 #safety cap on lines merged into one event
tags: ["mysql-slow"]
output.logstash:
hosts: ["10.0.0.151:5044"]
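For reference, a single MariaDB slow-log entry spans several lines, roughly like the sample below (values illustrative); this is why the multiline settings above merge everything from one "# User" line to the next into a single event:
# Time: 200115 22:10:00
# User@Host: root[root] @ localhost []
# Thread_id: 8  Schema:   QC_hit: No
# Query_time: 11.480049  Lock_time: 0.000170  Rows_sent: 11  Rows_examined: 11
SET timestamp=1579097400;
select sleep(1) user,host from mysql.user;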
Configure the Logstash pipeline:
[root@logstash-node1 conf.d]# cat input_filebeat_mysql_output_es.conf
input {
beats {
port => 5044
}
}
filter {
mutate {
gsub => ["message","\n"," "]
}
grok {
match => {
"message" => "(?m)^# User@Host: %{USER:User}\[%{USER-2:User}\] @ (?:(?<Clienthost>\S*) )?\[(?:%{IP:Client_IP})?\] # Thread_id: %{NUMBER:Thread_id:integer}\s+ Schema: (?:(?<DBname>\S*) )\s+QC_hit: (?:(?<QC_hit>\S*) )# Query_time: %{NUMBER:Query_Time}\s+ Lock_time: %{NUMBER:Lock_Time}\s+ Rows_sent: %{NUMBER:Rows_Sent:integer}\s+Rows_examined: %{NUMBER:Rows_Examined:integer} SET timestamp=%{NUMBER:timestamp}; \s*(?<Query>(?<Action>\w+)\s+.*)"
}
}
date {
match => ["timestamp","UNIX", "YYYY-MM-dd HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
remove_field => ["message","input","timestamp","agent","ecs","log"]
convert => ["Lock_Time","float"]
convert => ["Query_Time","float"]
add_field => { "target_index" => "logstash-mysql-slow-%{+YYYY.MM.dd}" }
}
}
output {
elasticsearch {
hosts => ["10.0.0.161:9200"]
index => "%{[target_index]}"
}
stdout {
codec => "rubydebug"
}
}
[root@logstash-node1 conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/input_filebeat_mysql_output_es.conf -r
#Generate fresh log entries; start filebeat
[root@web01 filebeat]# mysql -uroot -poldxu.com
Welcome to the MariaDB monitor. Commands end with ; or \g.
Your MariaDB connection id is 17
Server version: 5.5.64-MariaDB MariaDB Server
Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
MariaDB [(none)]> select sleep(1) user,host from mysql.user;
+------+-----------+
| user | host |
+------+-----------+
| 0 | % |
| 0 | % |
| 0 | % |
| 0 | % |
| 0 | 127.0.0.1 |
| 0 | ::1 |
| 0 | localhost |
| 0 | localhost |
| 0 | localhost |
| 0 | web01 |
| 0 | web01 |
+------+-----------+
11 rows in set (11.01 sec)
MariaDB [(none)]> Bye
[root@web01 filebeat]# systemctl restart filebeat
If the run succeeds, the parsed slow-query events appear in the server console.
Slow log inspection
Create the index and follow the steps in order to view the results.
Collecting app logs with Logstash
#Upload app-dashboard-1.0-SNAPSHOT.jar to web01 to simulate app logs
[root@web01 log]# java -jar app-dashboard-1.0-SNAPSHOT.jar &>/var/log/app.log
[root@web01 ~]# tail -f /var/log/app.log
[INFO] 2020-01-15 22:21:03 [cn.oldxu.dashboard.Main] - DAU|2635|領取優惠券|2020-01-15 18:09:02
[INFO] 2020-01-15 22:21:08 [cn.oldxu.dashboard.Main] - DAU|3232|領取優惠券|2020-01-15 15:21:06
[INFO] 2020-01-15 22:21:11 [cn.oldxu.dashboard.Main] - DAU|8655|使用優惠券|2020-01-15 10:05:10
[INFO] 2020-01-15 22:21:15 [cn.oldxu.dashboard.Main] - DAU|498|評論商品|2020-01-15 18:15:04
[INFO] 2020-01-15 22:21:18 [cn.oldxu.dashboard.Main] - DAU|1603|加入購物車|2020-01-15 16:13:03
[INFO] 2020-01-15 22:21:18 [cn.oldxu.dashboard.Main] - DAU|7085|提交訂單|2020-01-15 15:10:06
[INFO] 2020-01-15 22:21:21 [cn.oldxu.dashboard.Main] - DAU|5576|搜索|2020-01-15 09:06:06
[INFO] 2020-01-15 22:21:23 [cn.oldxu.dashboard.Main] - DAU|6309|搜索|2020-01-15 11:20:16
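Each line follows the pattern "[LEVEL] date [class] - DAU|userID|action|timestamp", so splitting message on "|" leaves the user ID, action, and event time at indices 1, 2, and 3; the pipeline below relies on exactly that layout.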
Edit the filebeat.yml configuration file:
[root@web01 filebeat]# cat filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/app.log
output.logstash:
  hosts: ["10.0.0.151:5044"]
#Note: if another machine also produces this log, filebeat.yml must be configured on that machine as well.
Configure the Logstash pipeline:
[root@logstash-node1 conf.d]# cat input_filebeat_app_output_es.conf
input {
beats {
port => 5044
}
}
filter {
mutate {
split => {"message" => "|"}
}
mutate {
add_field => {
"UserID" => "%{[message][1]}"
"Action" => "%{[message][2]}"
"Date" => "%{[message][3]}"
}
}
mutate {
#within one mutate block convert would run before add_field, so it gets its own block
convert => {
"UserID" => "integer"
"Action" => "string"
"Date" => "string"
}
}
#2020-01-15 17:04:15
date {
match => ["Date","yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Chongqing"
}
mutate {
#remove_field => ["message","Date"]
add_field => { "target_index" => "logstash-app-%{+YYYY.MM.dd}" }
}
}
output {
elasticsearch {
hosts => ["10.0.0.161:9200"]
index => "%{[target_index]}"
template_overwrite => true
}
stdout {
codec => "rubydebug"
}
}
[root@logstash-node1 conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/input_filebeat_app_output_es.conf -r
[root@web01 filebeat]# systemctl restart filebeat