ELK 介紹
ELK 最早是 Elasticsearch(以下簡稱ES)、Logstash、Kibana 三款開源軟件的簡稱,三款軟件后來被同一公司收購,並加入了Xpark、Beats等組件,改名為Elastic Stack,成為現在最流行的開源日志解決方案,雖然有了新名字但大家依然喜歡叫她ELK,現在所說的ELK就指的是基於這些開源軟件構建的日志系統。
我們收集mysql慢日志的方案如下:

-
mysql 服務器安裝 Filebeat 作為 agent 收集 slowLog
-
Filebeat 讀取 mysql 慢日志文件做簡單過濾傳給 Kafka 集群
-
Logstash 讀取 Kafka 集群數據並按字段拆分后轉成 JSON 格式存入 ES 集群
-
Kibana讀取ES集群數據展示到web頁面上
慢日志分類
目前主要使用的mysql版本有5.5、5.6 和 5.7,經過仔細對比發現每個版本的慢查詢日志都稍有不同,如下:
5.5 版本慢查詢日志

5.6 版本慢查詢日志

5.7 版本慢查詢日志

慢查詢日志異同點:
-
每個版本的Time字段格式都不一樣
-
相較於5.6、5.7版本,5.5版本少了Id字段
-
use db語句不是每條慢日志都有的
-
可能會出現像下邊這樣的情況,慢查詢塊# Time:下可能跟了多個慢查詢語句

處理思路
上邊我們已經分析了各個版本慢查詢語句的構成,接下來我們就要開始收集這些數據了,究竟應該怎么收集呢?
-
拼裝日志行:mysql 的慢查詢日志多行構成了一條完整的日志,日志收集時要把這些行拼裝成一條日志傳輸與存儲。
-
Time行處理:# Time: 開頭的行可能不存在,且我們可以通過SET timestamp這個值來確定SQL執行時間,所以選擇過濾丟棄Time行
-
一條完整的日志:最終將以# User@Host: 開始的行,和以SQL語句結尾的行合並為一條完整的慢日志語句
-
確定SQL對應的DB:use db這一行不是所有慢日志SQL都存在的,所以不能通過這個來確定SQL對應的DB,慢日志中也沒有字段記錄DB,所以這里建議為DB創建賬號時添加db name標識,例如我們的賬號命名方式為:projectName_dbName,這樣看到賬號名就知道是哪個DB了
-
確定SQL對應的主機:我想通過日志知道這條SQL對應的是哪台數據庫服務器怎么辦?
慢日志中同樣沒有字段記錄主機,可以通過filebeat注入字段來解決,例如我們給filebeat的name字段設置為服務器IP,這樣最終通過beat.name這個字段就可以確定SQL對應的主機了。
Filebeat配置
filebeat 完整的配置文件如下:

# mysql_slow.log
-
input_type: log
paths:
- /home/logs/mysql/mysqld_slow.log
document_type: mysqld-slow
exclude_lines: ['^\# Time']
multiline.pattern: '^\# Time|^\# User'
multiline.negate: true
multiline.match: after
tail_files: true
重要參數解釋:
-
input_type:指定輸入的類型是log或者是stdin
-
paths:慢日志路徑,支持正則,比如/data/*.log
-
exclude_lines:過濾掉# Time開頭的行
-
multiline.pattern:匹配多行時指定正則表達式,這里匹配以# Time或者# User開頭的行,Time行要先匹配再過濾
-
multiline.negate:定義上邊pattern匹配到的行是否用於多行合並,也就是定義是不是作為日志的一部分
-
multiline.match:定義如何將皮排行組合成時間,在之前或者之后
-
tail_files:定義是從文件開頭讀取日志還是結尾,這里定義為true,從現在開始收集,之前已存在的不管
-
name:設置filebeat的名字,如果為空則為服務器的主機名,這里我們定義為服務器IP
-
output.kafka:配置要接收日志的kafka集群地址可topic名稱
Kafka 接收到的日志格式:

{"@timestamp":"2018-08-07T09:36:00.140Z","beat":{"hostname":"db-7eb166d3","name":"10.63.144.71","version":"5.4.0"},"input_type":"log","message":"# User@Host: select[select] @ [10.63.144.16] Id: 23460596\n# Query_time: 0.155956 Lock_time: 0.000079 Rows_sent: 112 Rows_examined: 366458\nSET timestamp=1533634557;\nSELECT DISTINCT(uid) FROM common_member WHERE hideforum=-1 AND uid != 0;","offset":1753219021,"source":"/data/slow/mysql_slow.log","type":"log"}
Logstash配置
logstash完整的配置文件如下:

僅顯示filter信息
if [type] =~ "mysqld-slow" {
mutate {
add_field => {"line_message" => "%{message} %{offset}"}
}
ruby {
code => "
require 'digest/md5';
event.set('computed_id', Digest::MD5.hexdigest(event.get('line_message')))
"
}
#有ID有use
grok {
match => {
"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id:
int}\n\#\s+Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:
%{NUMBER:rows_examined:int}\nuse\s(?<dbname>\w+)\;\s+SET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)"
}
}
#有ID無use
grok {
match => {
"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\s+Id:\s+%{NUMBER:id
:int}\n\#\s+Query_time: %{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:
%{NUMBER:rows_examined:int}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)"
}
}
#無ID有use
grok {
match => {
"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\n\#\s+Query_time: %
{NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:i
nt}\nuse\s(?<dbname>\w+)\;\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)"
}
}
#無ID無use
grok {
match => {
"message" => "(?m)^# User@Host: %{USER:user}\[[^\]]+\] @\s+(?:(?<clienthost>\S*))?\s+\[(?:%{IP:clientip})?\]\n\#\s+Query_time: %{
NUMBER:query_time:float}\s+Lock_time: %{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined: %{NUMBER:rows_examined:in
t}\nSET\s+timestamp=%{NUMBER:timestamp_mysql:int}\;\s+(?<query>.*)"
}
}
date {
match => ["timestamp_mysql", "UNIX"]
target => "@timestamp"
}
mutate {
remove_field => ["line_message","message","kafka","tags"]
}
}
重要參數解釋:
-
input:配置 kafka 的集群地址和 topic 名字
-
filter:過濾日志文件,主要是對 message 信息(看前文 kafka 接收到的日志格式)進行拆分,拆分成一個一個易讀的字段,例如User、Host、Query_time、Lock_time、timestamp等。
grok段根據我們前文對mysql慢日志的分類分別寫不通的正則表達式去匹配,當有多條正則表達式存在時,logstash會從上到下依次匹配,匹配到一條后邊的則不再匹配。
date字段定義了讓SQL中的timestamp_mysql字段作為這條日志的時間字段,kibana上看到的實踐排序的數據依賴的就是這個時間
-
output:配置ES服務器集群的地址和index,index自動按天分割
ES 中mysqld-slow-*索引模板
{
"order": 0,
"template": "mysqld-slow-*",
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"mysqld-slow": {
"numeric_detection": true,
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"@version": {
"type": "string"
},
"query_time": {
"type": "double"
},
"row_sent": {
"type": "string"
},
"rows_examined": {
"type": "string"
},
"clientip": {
"type": "string"
},
"clienthost": {
"type": "string"
},
"id": {
"type": "integer"
},
"lock_time": {
"type": "string"
},
"dbname": {
"type": "keyword"
},
"user": {
"type": "keyword"
},
"query": {
"type": "string",
"index": "not_analyzed"
},
"tags": {
"type": "string"
},
"timestamp": {
"type": "string"
},
"type": {
"type": "string"
}
}
}
},
"aliases": {}
}
kibana查詢展示
-
打開Kibana添加
mysql-slowlog-*的Index,並選擇timestamp,創建Index Pattern
進入Discover頁面,可以很直觀的看到各個時間點慢日志的數量變化,可以根據左側Field實現簡單過濾,搜索框也方便搜索慢日志,例如我要找查詢時間大於2s的慢日志,直接在搜索框輸入
query_time: > 2回車即可。
點擊每一條日志起邊的很色箭頭能查看具體某一條日志的詳情。
如果你想做個大盤統計慢日志的整體情況,例如top 10 SQL等,也可以很方便的通過web界面配置。

-
總結
-
不要望而卻步,當你開始去做已經成功一半了
-
本篇文章詳細介紹了關於mysql慢日志的收集,收集之后的處理呢?我們目前是DBA每天花時間去Kibana上查看分析,有優化的空間就跟開發一起溝通優化,后邊達成默契之后考慮做成自動報警或處理
-
關於報警ELK生態的xpark已經提供,且最新版本也開源了,感興趣的可以先研究起來,歡迎一起交流
-
