概述
logstash 之所以強大和流行,與其豐富的過濾器插件是分不開的
過濾器提供的並不單單是過濾的功能,還可以對進入過濾器的原始數據進行復雜的邏輯處理,甚至添加獨特的新事件到后續流程中
強大的文本解析工具 -- Grok
grok 是一個十分強大的 logstash filter 插件,他可以解析任何格式的文本,他是目前 logstash 中解析非結構化日志數據最好的方式
基本用法
Grok 的語法規則是:
%{語法 : 語義}
“語法”指的就是匹配的模式,例如使用 NUMBER 模式可以匹配出數字,IP 則會匹配出 127.0.0.1 這樣的 IP 地址:
%{NUMBER:lasttime}%{IP:client}
默認情況下,所有“語義”都被保存成字符串,你也可以添加轉換到的數據類型
%{NUMBER:lasttime:int}%{IP:client}
目前轉換類型只支持 int 和 float
覆蓋 -- overwrite
使用 Grok 的 overwrite 參數也可以覆蓋日志中的信息
filter {
grok {
match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
overwrite => [ "message" ]
}
}
日志中的 message 字段將會被覆蓋
示例
對於下面的log,事實上是一個 HTTP 請求行:
55.3.244.1 GET /index.html 15824 0.043
我們可以使用下面的 logstash 配置:
input {
file {
path => "/var/log/http.log"
}
}
filter {
grok {
match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
可以看到收集結果:
client: 55.3.244.1 method: GET request: /index.html bytes: 15824 duration: 0.043
將無結構的數據通過這樣的方式實現了結構化輸出
Grok 使用正則表達式
grok 是在正則表達式的基礎上實現的(使用 Oniguruma 庫),因此他可以解析任何正則表達式
創建模式
提取日志字段和正則表達式提取字段的規則一樣:
(?<field_name>the pattern here)
首先,創建一個模式文件,寫入你需要的正則表達式:
# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}
然后配置你的 Logstash:
filter {
grok {
patterns_dir => "./patterns"
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
}
針對日志:
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
可以匹配出:
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
IP 位置插件 -- Geoip
Logstash 1.3.0 以上可以使用 geoip 插件獲取 IP 對應的地理位置,對於 accesslog 等的統計來說,IP 來源是非常有用的一個信息
使用方法
geoip {
source => ...
}
示例
filter {
geoip {
source => "message"
}
}
運行結果:
{
"message" => "183.60.92.253",
"@version" => "1",
"@timestamp" => "2014-08-07T10:32:55.610Z",
"host" => "raochenlindeMacBook-Air.local",
"geoip" => {
"ip" => "183.60.92.253",
"country_code2" => "CN",
"country_code3" => "CHN",
"country_name" => "China",
"continent_code" => "AS",
"region_name" => "30",
"city_name" => "Guangzhou",
"latitude" => 23.11670000000001,
"longitude" => 113.25,
"timezone" => "Asia/Chongqing",
"real_region_name" => "Guangdong",
"location" => [
[0] 113.25,
[1] 23.11670000000001
]
}
}
可以看到,logstash 通過提取到的 message 字段中的 IP,解析到了地理位置相關的一系列信息
當然,對於解析出的眾多數據,你也可以通過 fields 選項進行篩選
filter {
geoip {
fields => ["city_name", "continent_code", "country_code2", "country_code3", "country_name", "dma_code", "ip", "latitude", "longitude", "postal_code", "region_name", "timezone"]
}
}
選項
上面我們看到了 source 和 fields 兩個選項,geoip 還提供了下列選項:
| 選項 | 類型 | 是否必須 | 默認值 | 意義 |
| add_field | hash | 否 | {} | 為當前事件增加一個字段 |
| add_tag | array | 否 | [] | 為當前事件增加一個用於標識的tag |
| database | path | 否 | 無 | 位置信息庫所在文件 |
| fields | array | 否 | 無 | 在 geoip 的返回結果中篩選部分字段 |
| lru_cashe_size | int | 1000 | geoip 占用的緩存大小 | |
| periodic_flush | bool | 否 | false | 是否定期調用刷新方e |
| remove_field | array | 否 | [] | 從結果集中刪除字段 |
| remove_tag | array | 否 | [] | 從結果集中刪除tag |
| source | string | 是 | 無 | 需要解析的存有 IP 的字段名稱 |
| target | string | 否 | "geoip" | 返回的結果中保存 geoip 解析結果的字段名 |
json
對於 json 格式的 log,可以通過 codec 的 json 編碼進行解析,但是如果記錄中只有一部分是 json,這時候就需要在 filter 中使用 json 解碼插件
示例
filter {
json {
source => "message"
target => "jsoncontent"
}
}
運行結果:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"jsoncontent": {
"uid": 3081609001,
"type": "signal"
}
}
上面的例子中,解析結果被放到了 target 所指向的節點下,如果希望將解析結果與 log 中其他字段保持在同一層級輸出,那么只需要去掉 target 即可:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "{\"uid\":3081609001,\"type\":\"signal\"}",
"uid": 3081609001,
"type": "signal"
}
時間分割 -- split
mutiline 讓 logstash 將多行數據變成一個事件,當然了,logstash 同樣支持將一行數據變成多個事件
logstash 提供了 split 插件,用來把一行數據拆分成多個事件
示例:
filter {
split {
field => "message"
terminator => "#"
}
}
運行結果:
對於 "test1#test2",上述 logstash 配置將其變成了下面兩個事件:
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "test1"
}
{
"@version": "1",
"@timestamp": "2014-11-18T08:11:33.000Z",
"host": "web121.mweibo.tc.sinanode.com",
"message": "test2"
}
需要注意的是,當 split 插件執行結束后,會直接進入 output 階段,其后的所有 filter 都將不會被執行
數據修改 -- mutate
logstash 還支持在 filter 中對事件中的數據進行修改
重命名 -- rename
對於已經存在的字段,重命名其字段名稱
filter {
mutate {
rename => ["syslog_host", "host"]
}
}
更新字段內容 -- update
更新字段內容,如果字段不存在,不會新建
filter {
mutate {
update => { "sample" => "My new message" }
}
}
替換字段內容 -- replace
與 update 功能相同,區別在於如果字段不存在則會新建字段
filter {
mutate {
replace => { "message" => "%{source_host}: My new message" }
}
}
數據類型轉換 -- convert
filter {
mutate {
convert => ["request_time", "float"]
}
}
文本替換 -- gsub
gsub 提供了通過正則表達式實現文本替換的功能
filter {
mutate {
gsub => [
# replace all forward slashes with underscore
"fieldname", "/", "_",
# replace backslashes, question marks, hashes, and minuses
# with a dot "."
"fieldname2", "[\\?#-]", "."
]
}
}
大小寫轉換 -- uppercase、lowercase
filter {
mutate {
uppercase => [ "fieldname" ]
}
}
去除空白字符 -- strip
類似 php 中的 trim,只去除首尾的空白字符
filter {
mutate {
strip => ["field1", "field2"]
}
}
刪除字段 -- remove、remove_field
remove 不推薦使用,推薦使用 remove_field
filter {
mutate {
remove_field => [ "foo_%{somefield}" ]
}
}
刪除字段 -- remove、remove_field
remove 不推薦使用,推薦使用 remove_field
filter {
mutate {
remove_field => [ "foo_%{somefield}" ]
}
}
分割字段 -- split
將提取到的某個字段按照某個字符分割
filter {
mutate {
split => ["message", "|"]
}
}
針對字符串 "123|321|adfd|dfjld*=123",可以看到輸出結果:
{
"message" => [
[0] "123",
[1] "321",
[2] "adfd",
[3] "dfjld*=123"
],
"@version" => "1",
"@timestamp" => "2014-08-20T15:58:23.120Z",
"host" => "raochenlindeMacBook-Air.local"
}
聚合數組 -- join
將類型為 array 的字段中的 array 元素使用指定字符為分隔符聚合成一個字符串
如我們可以將 split 分割的結果再重新聚合起來:
filter {
mutate {
split => ["message", "|"]
}
mutate {
join => ["message", ","]
}
}
輸出:
{
"message" => "123,321,adfd,dfjld*=123",
"@version" => "1",
"@timestamp" => "2014-08-20T16:01:33.972Z",
"host" => "raochenlindeMacBook-Air.local"
}
合並數組 -- merge
對於幾個類型為 array 或 hash 或 string 的字段,我們可以使用 merge 合並
filter {
mutate {
merge => [ "dest_field", "added_field" ]
}
}
需要注意的是,array 和 hash 兩個字段是不能 merge 的

