Logstash Regex Extraction of Nginx Logs
Why extract? A whole log line cannot be analyzed as one string; individual fields must be extracted
Find which IPs generate the most requests
Analyze Nginx response status codes
Nginx log format
192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"
Nginx log format configuration
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
Grok is the extraction workhorse; it requires regular expressions. Use Kibana's Grok Debugger to verify patterns
Hand-written regex extraction (recommended)
Built-in pattern extraction (simpler): /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns
Grok hand-written regex syntax: (?<field_name>your_regex)
(?<remote_addr>\d+\.\d+\.\d+\.\d+)
Built-in pattern syntax: %{BUILTIN_PATTERN:field_name}
%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}
Mixed-syntax extraction
(?<remote_addr>\d+\.\d+\.\d+\.\d+) - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\]
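The grok patterns above compile down to ordinary regular expressions. As a sanity check, here is a minimal Python sketch applied to the sample log line; the raw regex and named groups are my own rough equivalent of the grok fields, not Logstash's exact compiled pattern:

```python
import re

# Sample access-log line from above
line = ('192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] '
        '"GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"')

# Rough hand-written equivalent of the grok pattern; named groups
# play the role of grok field names.
pattern = re.compile(
    r'(?P<remote_addr>\d+\.\d+\.\d+\.\d+) - (?P<remote_user>\S+) '
    r'\[(?P<time_local>[^\]]+)\] "(?P<method>\w+) (?P<request>\S+) '
    r'HTTP/[\d.]+" (?P<status>\d+) (?P<body_bytes_sent>\d+)'
)

fields = pattern.match(line).groupdict()
print(fields)
```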
Ordinary regular-expression symbols
. matches any single character; * means the preceding character occurs 0 or more times
[abc] matches any one character inside the brackets; [^abc] matches any character not in the brackets
[0-9] digits, [a-z] lowercase letters, [A-Z] uppercase letters, [a-zA-Z] any letter, [a-zA-Z0-9] any letter or digit
[^0-9] matches a non-digit
^xx matches a string starting with xx; xx$ matches a string ending with xx
\s matches a whitespace character, \S a non-whitespace character, \d a digit
Extended regular expressions, built on top of the ordinary ones
? means the preceding character occurs 0 or 1 times; + means 1 or more times
{a} matches the preceding character exactly a times; {a,b} matches it a to b times
{,b} matches it 0 to b times; {a,} matches it a or more times
string1|string2 matches either string1 or string2
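A few quick Python checks of the constructs listed above (Python's re engine supports all of them):

```python
import re

# Quick sanity checks of the ordinary and extended constructs above
assert re.fullmatch(r'a.c', 'abc')         # . = any single character
assert re.fullmatch(r'ab*', 'a')           # * = 0 or more of the previous char
assert re.fullmatch(r'[^0-9]+', 'abc')     # negated class: non-digits only
assert not re.fullmatch(r'[^0-9]+', 'a1')  # fails as soon as a digit appears
assert re.fullmatch(r'colou?r', 'color')   # ? = 0 or 1 occurrence
assert re.fullmatch(r'\d{1,3}', '192')     # {a,b} = bounded repetition
assert re.fullmatch(r'GET|POST', 'POST')   # alternation
print('all regex checks passed')
```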
Logstash Regex Extraction of Nginx Logs into ES
Logstash field-extraction configuration
input {
file {
path => "/var/log/nginx/access.log"
}
}
filter {
grok {
match => {
"message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
}
remove_field => ["message"]
}
}
output {
elasticsearch {
hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
user => "elastic"
password => "sjgpwd"
index => "sjgnginx-%{+YYYY.MM.dd}"
}
}
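The %{+YYYY.MM.dd} in the index name is a date pattern expanded per event from @timestamp, producing one index per day. A sketch of the naming scheme (daily_index is a hypothetical helper, not a Logstash API):

```python
from datetime import date

def daily_index(prefix: str, d: date) -> str:
    # Mirrors the "%{+YYYY.MM.dd}" suffix: one index per day
    return f"{prefix}-{d:%Y.%m.%d}"

print(daily_index("sjgnginx", date(2020, 8, 1)))  # sjgnginx-2020.08.01
```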
Handling the Exclamation-Mark Warning in Kibana
Refresh the Kibana index pattern
Operations on Kibana index patterns do not touch the underlying data; deleting and recreating one is safe
Special Field Handling in Logstash: Replacing Values and Converting Types
http_user_agent contains double quotes, which need to be removed
filter {
grok {
match => {
"message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
}
remove_field => ["message"]
}
mutate {
gsub => [ "http_user_agent",'"',"" ]
}
}
Converting a string to an integer in Logstash
mutate{
gsub => [ "http_user_agent",'"',"" ]
convert => { "status" => "integer" }
convert => { "body_bytes_sent" => "integer" }
}
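What gsub and convert do to a single event, sketched in plain Python (the event dict is illustrative):

```python
# Illustrative event, mimicking what the mutate filter does per document
event = {
    "http_user_agent": '"Chrome xxx"',
    "status": "404",
    "body_bytes_sent": "3650",
}

# gsub => [ "http_user_agent", '"', "" ]  -- strip the double quotes
event["http_user_agent"] = event["http_user_agent"].replace('"', '')
# convert => { "status" => "integer" }, same for body_bytes_sent
event["status"] = int(event["status"])
event["body_bytes_sent"] = int(event["body_bytes_sent"])

print(event)
```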
Replacing the @timestamp in Logstash
Simulating user traffic to Nginx
while true;do
curl 192.168.238.90/sjg666
curl 127.0.0.1
sleep 2
done
Scenario
Suppose we need to analyze yesterday's access logs
Have Logstash process the entire Nginx log, and notice the problem
input {
file {
path => "/var/log/nginx/access.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
Two kinds of time
The time the log was shipped, which is useless for analysis
The user's access time is inside the log line; analysis is only accurate when it is taken as authoritative
Use the user's access time, in the format 01/Aug/2020:10:34:20 +0800
filter {
grok {
match => {
"message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
}
remove_field => ["message"]
}
date {
match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
}
}
If the logs contain different time formats, the format string used for the override must match each one
20/Feb/2019:14:50:06 -> dd/MMM/yyyy:HH:mm:ss
2016-08-24 18:05:39,830 -> yyyy-MM-dd HH:mm:ss,SSS
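The Joda-style patterns above can be cross-checked with Python's strptime (the token syntax differs: dd/MMM/yyyy:HH:mm:ss Z corresponds roughly to %d/%b/%Y:%H:%M:%S %z):

```python
from datetime import datetime

# "dd/MMM/yyyy:HH:mm:ss Z" (Joda) ~ "%d/%b/%Y:%H:%M:%S %z" (strptime)
t1 = datetime.strptime('01/Aug/2020:10:34:20 +0800', '%d/%b/%Y:%H:%M:%S %z')

# "yyyy-MM-dd HH:mm:ss,SSS" (Joda) ~ "%Y-%m-%d %H:%M:%S,%f" (strptime)
t2 = datetime.strptime('2016-08-24 18:05:39,830', '%Y-%m-%d %H:%M:%S,%f')

print(t1.isoformat())
print(t2.isoformat())
```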
Manually count Nginx requests and compare against what the web UI shows
cat /var/log/nginx/access.log |awk '{print $4}'|sed 's/:[0-9][0-9]$//g'|sort |uniq -c
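The same per-minute count in Python, assuming the access-log format shown earlier (the sample lines are made up):

```python
from collections import Counter

# Made-up sample lines in the access-log format shown earlier
lines = [
    '192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650',
    '127.0.0.1 - - [01/Aug/2020:14:53:37 +0800] "GET / HTTP/1.1" 200 612',
    '127.0.0.1 - - [01/Aug/2020:14:54:01 +0800] "GET / HTTP/1.1" 200 612',
]

# Field 4 is "[01/Aug/2020:14:53:35"; stripping the trailing ":ss"
# leaves the minute, just like the sed expression in the pipeline.
per_minute = Counter(line.split()[3].rsplit(':', 1)[0] for line in lines)
print(per_minute)
```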
Remove time_local once the timestamp has been overridden
mutate {
gsub => [ "http_user_agent",'"',"" ]
convert => { "status" => "integer" }
convert => { "body_bytes_sent" => "integer" }
remove_field => ["time_local"]
}
Handling Grok Extraction Failures in Logstash
Switch Logstash back to analyzing only the latest log lines
input {
file {
path => "/var/log/nginx/access.log"
}
}
When regex extraction fails
echo "sjgmethods xxx xxx" >> /var/log/nginx/access.log
tags: _grokparsefailure
Send events whose extraction failed to a separate index
output {
if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
elasticsearch {
hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
user => "elastic"
password => "sjgpwd"
index => "sjgnginx-%{+YYYY.MM.dd}"
}
}
else{
elasticsearch {
hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
user => "elastic"
password => "sjgpwd"
index => "sjgfail-%{+YYYY.MM.dd}"
}
}
}
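The routing logic of that output block, sketched as a Python function (pick_index is a hypothetical name; the index strings come from the config above):

```python
# Hypothetical helper mirroring the conditional in the output block:
# parse failures go to the fail index, everything else to the main one.
def pick_index(event: dict) -> str:
    tags = event.get("tags", [])
    if "_grokparsefailure" not in tags and "_dateparsefailure" not in tags:
        return "sjgnginx-%{+YYYY.MM.dd}"
    return "sjgfail-%{+YYYY.MM.dd}"

print(pick_index({"tags": []}))
print(pick_index({"tags": ["_grokparsefailure"]}))
```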
Basic Use of Kibana Charts
Simulating data
while true;do
curl 192.168.238.90/sjg666;
curl 127.0.0.1;
sleep 2;
done
Home screen area
View traffic over time: requests per minute
Query by a specific field
View statistics for a single field
After a Kibana chart is created, choose a Terms aggregation to inspect the corresponding data
Creating a pie chart: pie_remote_addr
Creating a data table: table_remote_addr
Creating a Kibana dashboard: sjg_dash
Create the dashboard
Add the charts to the dashboard
Grafana is recommended for presentation
Analyzing Linux System Logs with Logstash
Default log format
Aug 3 18:37:57 sjg1 sshd[1318]: Accepted password for root from xxx port 49205 ssh2
The timestamp field has no year
System log configuration in /etc/rsyslog.conf; restart rsyslog after changing it
$template sjgformat,"%$NOW% %TIMESTAMP:8:15% %hostname% %syslogtag% %msg%\n"
$ActionFileDefaultTemplate sjgformat
Resulting log format
2020-08-03 18:47:34 sjg1 sshd[1522]: Accepted password for root from 58.101.14.103 port 49774 ssh2
%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)
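A rough Python equivalent of this grok pattern against the reformatted line above (the hostname group name is my addition; the grok pattern leaves that field unnamed):

```python
import re

line = ('2020-08-03 18:47:34 sjg1 sshd[1522]: '
        'Accepted password for root from 58.101.14.103 port 49774 ssh2')

# Rough regex equivalent; "hostname" names the field the grok
# pattern matches with an anonymous %{NOTSPACE}.
pattern = re.compile(
    r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) '
    r'(?P<hostname>\S+) (?P<procinfo>\S+): (?P<secinfo>.*)'
)

parsed = pattern.match(line).groupdict()
print(parsed)
```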
Add read permission
chmod +r secure
Extract the secure log; other logs such as messages are extracted the same way
input {
file {
path => "/var/log/secure"
}
}
filter {
grok {
match => {
"message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
}
remove_field => ["message"]
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
}
mutate {
remove_field => ["timestamp"]
}
}
output {
elasticsearch {
hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
user => "elastic"
password => "sjgpwd"
index => "sjgsecure-%{+YYYY.MM.dd}"
}
}

