5,Logstash正则提取Nginx日志


									Logstash正则提取Nginx日志
为什么需要提取?使用一整行日志无法分析,需要提取单独的字段
	分析哪个IP访问量大
	分析Nginx的响应状态码

Nginx日志格式
	192.168.238.90 - - [01/Aug/2020:14:53:35 +0800] "GET /sjg666 HTTP/1.1" 404 3650 "-" "Chrome xxx" "-"

Nginx日志格式配置
	log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';


Grok提取利器,需要掌握正则表达式。借助Kibana的Grok工具验证提取
	自写正则提取(建议)
	内置规则提取(简化):/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.1.2/patterns/grok-patterns

Grok自写正则提取语法:(?<字段名>自写正则表达式)
	(?<remote_addr>\d+\.\d+\.\d+\.\d+)

内置正则提取语法:%{内置正则表达式:字段名}
	%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}

混合语法提取
	(?<remote_addr>\d+\.\d+\.\d+\.\d+) - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\]

普通正则表达式符号
	. 表示任意一个字符,* 表示前面一个字符出现0次或者多次
	[abc]表示中括号内任意一个字符,[^abc]表示非中括号内的字符
	[0-9]表示数字,[a-z]表示小写字母,[A-Z]表示大写字母,[a-zA-Z]表示所有字母,[a-zA-Z0-9]表示所有字母+数字
	[^0-9]表示非数字
	^xx表示以xx开头,xx$表示以xx结尾
	\s表示空白字符,\S表示非空白字符,\d表示数字

扩展正则表达式,在普通正则基础上再进行扩展
	?表示前面字符出现0或者1次,+前面字符出现1或者多次
	{a}表示前面字符匹配a次,{a,b}表示前面字符匹配a到b次
	{,b}表示前面字符匹配0次到b次,{a,}前面字符匹配a或a+次
	string1|string2表示匹配string1或者string2

		
		Logstash正则提取Nginx写入ES
Logstash提取字段配置
input {
	file {
		path => "/var/log/nginx/access.log"
	}
}
filter {
	grok {
		match => {
			"message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
		}
		remove_field => ["message"]
	}
}
output {
	elasticsearch {
		hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
		user => "elastic"
		password => "sjgpwd"
		index => "sjgnginx-%{+YYYY.MM.dd}"
	}
}

Kibana显示感叹号问题处理
	Kibana索引刷新
	Kibana索引的操作并不会影响到数据,删除重建也没问题

		
		Logstash字段特殊处理-替换或转类型
http_user_agent包含双引号,需要去除
filter {
	grok {
		match => {
			"message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
		}
		remove_field => ["message"]
	}
	mutate {
		gsub => [ "http_user_agent",'"',"" ]
	}
}

Logstash字符串转整形
	mutate{
		gsub => [ "http_user_agent",'"',"" ]
		convert => { "status" => "integer" }
		convert => { "body_bytes_sent" => "integer" }
	}

		Logstash替换时间戳@timestamp
Nginx模拟用户访问
while true;do 
	curl 192.168.238.90/sjg666
	curl 127.0.0.1
	sleep 2
done


场景假设
  假设我们要分析用户昨天的访问日志

Logstash分析所有Nginx日志,发现问题
input {
	file {
		path => "/var/log/nginx/access.log"
		start_position => "beginning"
		sincedb_path => "/dev/null"
	}
}

两种时间
	发送日志时间,无法分析日志
	用户的访问时间在日志里,需要以日志里的为准,分析的结果才准确

以用户访问时间为准,格式为01/Aug/2020:10:34:20 +0800
filter {
	grok {
		match => {
			"message" => '%{IP:remote_addr} - (%{WORD:remote_user}|-) \[%{HTTPDATE:time_local}\] "%{WORD:method} %{NOTSPACE:request} HTTP/%{NUMBER}" %{NUMBER:status} %{NUMBER:body_bytes_sent} %{QS} %{QS:http_user_agent}'
		}
    remove_field => ["message"]
	}
  
	date {
		match => ["time_local", "dd/MMM/yyyy:HH:mm:ss Z"]
		target => "@timestamp"
	}
}

日志里如果有不同的时间格式,覆盖的时候格式要对应
20/Feb/2019:14:50:06 -> dd/MMM/yyyy:HH:mm:ss
2016-08-24 18:05:39,830 -> yyyy-MM-dd HH:mm:ss,SSS

手动统计Nginx的请求和网页显示进行对比
cat /var/log/nginx/access.log |awk '{print $4}'|sed 's/:[0-9][0-9]$//g'|sort |uniq -c

时间戳覆盖后删除
mutate {
    gsub => [ "http_user_agent",'"',"" ]
    convert => { "status" => "integer" }
    convert => { "body_bytes_sent" => "integer" }
    remove_field => ["time_local"]
}

		Logstash正则提取异常处理
Logstash改成分析最新日志
input {
	file {
		path => "/var/log/nginx/access.log"
	}
}

正则提取有异常的情况
echo "sjgmethods xxx xxx" >> /var/log/nginx/access.log
tags: _grokparsefailure

设置正则出错提取到另外的索引里
output {
	if "_grokparsefailure" not in [tags] and "_dateparsefailure" not in [tags] {
		elasticsearch {
			hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
			user => "elastic"
			password => "sjgpwd"
			index => "sjgnginx-%{+YYYY.MM.dd}"
		}
	}
  
	else{
		elasticsearch {
			hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
			user => "elastic"
			password => "sjgpwd"
			index => "sjgfail-%{+YYYY.MM.dd}"
		}
	}
}


		Kibana图形简单使用
模拟数据
while true;do 
	curl 192.168.238.90/sjg666; 
	curl 127.0.0.1; 
	sleep 2; 
done

首页区域
	可以根据时间查看访问量:每分钟访问量
	可以根据某个字段查询
	可以单独看某个字段的统计

Kibana图形有建立,选择terms去查看对应的数据
	饼图的创建 pie_remote_addr
	表的创建 table_remote_addr

Kibana面板的创建sjg_dash
	创建面板
	在面板上添加图形

建议采用Grafana展示

		Logstash分析Linux系统日志
默认的日志格式
	Aug  3 18:37:57 sjg1 sshd[1318]: Accepted password for root from xxx port 49205 ssh2
	无年份的字段

系统日志配置/etc/rsyslog.conf,重启rsyslog
$template sjgformat,"%$NOW% %TIMESTAMP:8:15% %hostname% %syslogtag% %msg%\n"
$ActionFileDefaultTemplate sjgformat

日志格式
2020-08-03 18:47:34 sjg1 sshd[1522]: Accepted password for root from 58.101.14.103 port 49774 ssh2
%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)

只读权限添加
chmod +r secure

提取secure日志,messages等其它日志提取原理类似
input {
	file {
		path => "/var/log/secure"
	}
}
filter {
	grok {
		match => {
			"message" => '%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE} %{NOTSPACE:procinfo}: (?<secinfo>.*)'
		}
	remove_field => ["message"]
	}

	date {
		match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
		target => "@timestamp"
	}
	mutate {
		remove_field => ["timestamp"]
	}
}

output {
	elasticsearch {
		hosts => ["http://192.168.238.90:9200", "http://192.168.238.92:9200"]
		user => "elastic"
		password => "sjgpwd"
		index => "sjgsecure-%{+YYYY.MM.dd}"
	}
}

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM