Reposted from: https://elasticstack.blog.csdn.net/article/details/114383426

In today's article, we describe in detail how to use Logstash to parse logs in JSON files and import them into Elasticsearch. This was mentioned in the earlier article "Logstash: Data conversion, analysis, extraction, enrichment and core operations", but without a concrete example. Broadly speaking, there are two ways to parse JSON log files:
Use the JSON codec in the file input
Use no JSON codec in the file input, and instead use a JSON filter in the filter section
The flow for parsing the JSON data and importing it into Elasticsearch is: prepare the data, build the Logstash configuration file, run Logstash, and query the result in Elasticsearch.
Prepare the data
We have prepared the following data:
sample.json
{"id": 4,"timestamp":"2019-06-10T18:01:32Z","paymentType":"Visa","name":"Cary Boyes","gender":"Male","ip_address":"223.113.73.232","purpose":"Grocery","country":"Pakistan","pastEvents":[{"eventId":7,"transactionId":"63941-950"},{"eventId":8,"transactionId":"55926-0011"}],"age":46}
{"id": 5,"timestamp":"2020-02-18T12:27:35Z","paymentType":"Visa","name":"Betteanne Diament","gender":"Female","ip_address":"159.148.102.98","purpose":"Computers","country":"Brazil","pastEvents":[{"eventId":9,"transactionId":"76436-101"},{"eventId":10,"transactionId":"55154-3330"}],"age":41}
Build the Logstash configuration file
Using the json codec
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => "json"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
We run Logstash:
sudo ./bin/logstash -f logstash_json.conf
The command above produces output like the following.
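Here is a sketch of the rubydebug output for the first sample document; the host, path, and @timestamp values are illustrative and will differ on your machine and with each run:

{
    "paymentType" => "Visa",
             "id" => 4,
      "timestamp" => "2019-06-10T18:01:32Z",
           "name" => "Cary Boyes",
         "gender" => "Male",
     "ip_address" => "223.113.73.232",
        "purpose" => "Grocery",
        "country" => "Pakistan",
     "pastEvents" => [
        [0] {
                  "eventId" => 7,
            "transactionId" => "63941-950"
        },
        [1] {
                  "eventId" => 8,
            "transactionId" => "55926-0011"
        }
    ],
            "age" => 46,
       "@version" => "1",
     "@timestamp" => 2021-03-04T10:00:00.000Z,
           "path" => "/Users/liuxg/data/logstash_json/sample.json",
           "host" => "liuxg"
}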
From this result, we can see that the documents are parsed correctly.
Using the JSON filter
We can use the file input without any codec and instead have a JSON filter do the parsing:
logstash_json_filter.conf
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
Above, we added a filter section and used the json filter to parse the JSON. Re-run Logstash, and we see output like the following.
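Because there is no codec on the input, each event now also carries the raw line in a message field alongside the fields the json filter extracted. A sketch (metadata values again illustrative):

{
        "message" => "{\"id\": 4,\"timestamp\":\"2019-06-10T18:01:32Z\",\"paymentType\":\"Visa\", ...}",
             "id" => 4,
    "paymentType" => "Visa",
           "name" => "Cary Boyes",
       "@version" => "1",
     "@timestamp" => 2021-03-04T10:02:00.000Z,
           "path" => "/Users/liuxg/data/logstash_json/sample.json",
           "host" => "liuxg",
    ...
}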
Notice the field called message in the output. It clearly takes up storage space, so we can remove it, and we can also drop the metadata fields we don't need to save space.
logstash_json_filter.conf
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  mutate {
    remove_field => ["message", "path", "host", "@version"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
Above, we check whether paymentType is Mastercard and, if so, drop the entire event. We also remove the fields we don't need, such as message and path. Re-run Logstash, and we see output like the following.
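A sketch of an event after the cleanup; note that @timestamp (the ingestion time, illustrative here) still differs from the timestamp field:

{
             "id" => 4,
      "timestamp" => "2019-06-10T18:01:32Z",
     "@timestamp" => 2021-03-04T10:05:00.000Z,
    "paymentType" => "Visa",
           "name" => "Cary Boyes",
     "pastEvents" => [ ... ],
    ...
}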
This output is clearly much cleaner than before. You may have noticed that the value of @timestamp differs from that of timestamp. In Kibana, we usually use @timestamp as the event's time label, so we handle it as follows:
logstash_json_filter.conf
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    locale => en
  }

  mutate {
    remove_field => ["message", "path", "host", "@version", "timestamp"]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
Above, we added a date filter to parse the time, and we also removed the timestamp field. The result we get is shown below.
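A sketch of an event after the date filter; @timestamp now takes its value from the parsed timestamp field:

{
             "id" => 4,
     "@timestamp" => 2019-06-10T18:01:32.000Z,
    "paymentType" => "Visa",
           "name" => "Cary Boyes",
     "pastEvents" => [ ... ],
    ...
}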
From the above, we can see that @timestamp now carries the time that was in the timestamp field.
Notice also that pastEvents is an array. If we want to split this array so that each of its elements becomes a separate event, we can use the split filter.
logstash_json_filter.conf
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    locale => en
  }

  mutate {
    remove_field => ["message", "path", "host", "@version", "timestamp"]
  }

  split {
    field => "[pastEvents]"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
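With the split filter in place, each element of pastEvents becomes its own event. For the document with id 4, we should get two events, roughly like this:

{
             "id" => 4,
     "@timestamp" => 2019-06-10T18:01:32.000Z,
     "pastEvents" => {
              "eventId" => 7,
        "transactionId" => "63941-950"
    },
    ...
}
{
             "id" => 4,
     "@timestamp" => 2019-06-10T18:01:32.000Z,
     "pastEvents" => {
              "eventId" => 8,
        "transactionId" => "55926-0011"
    },
    ...
}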
From the above, we can see that the pastEvents array is split, producing multiple documents. The final documents still leave something to be desired: eventId and transactionId sit under the pastEvents object. We would like to move them up to the same level as id. To do that, we make the following changes:
logstash_json_filter.conf
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    locale => en
  }

  # split must run before mutate, so that %{[pastEvents][eventId]}
  # refers to a single object rather than the whole array
  split {
    field => "[pastEvents]"
  }

  mutate {
    add_field => {
      "eventId" => "%{[pastEvents][eventId]}"
      "transactionId" => "%{[pastEvents][transactionId]}"
    }
    remove_field => ["message", "path", "host", "@version", "timestamp", "pastEvents"]
  }
}

output {
  stdout {
    codec => rubydebug
  }

  # with no hosts set, this writes to Elasticsearch at the default
  # http://localhost:9200
  elasticsearch {
    index => "logstash_json"
  }
}
Re-run Logstash, and we see output like the following.
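A sketch of one of the final events. Note that add_field renders values through sprintf, so eventId and transactionId become strings, which matches the _search response further below:

{
               "id" => 4,
       "@timestamp" => 2019-06-10T18:01:32.000Z,
          "eventId" => "7",
    "transactionId" => "63941-950",
      "paymentType" => "Visa",
             "name" => "Cary Boyes",
           "gender" => "Male",
       "ip_address" => "223.113.73.232",
          "purpose" => "Grocery",
          "country" => "Pakistan",
              "age" => 46
}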
Above, we moved eventId and transactionId up to the root of the document and removed the pastEvents field. We also index the documents into Elasticsearch.
We can now search the documents in Elasticsearch:
GET logstash_json/_search
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash_json",
        "_type" : "_doc",
        "_id" : "JXZRAHgBoLC90rTy6jNl",
        "_score" : 1.0,
        "_source" : {
          "gender" : "Female",
          "@timestamp" : "2020-02-18T12:27:35.000Z",
          "id" : 5,
          "country" : "Brazil",
          "name" : "Betteanne Diament",
          "paymentType" : "Visa",
          "transactionId" : "76436-101",
          "eventId" : "9",
          "ip_address" : "159.148.102.98",
          "age" : 41,
          "purpose" : "Computers"
        }
      },
      {
        "_index" : "logstash_json",
        "_type" : "_doc",
        "_id" : "KHZRAHgBoLC90rTy6jNl",
        "_score" : 1.0,
        "_source" : {
          "gender" : "Male",
          "@timestamp" : "2019-06-10T18:01:32.000Z",
          "id" : 4,
          "country" : "Pakistan",
          "name" : "Cary Boyes",
          "paymentType" : "Visa",
          "transactionId" : "55926-0011",
          "eventId" : "8",
          "ip_address" : "223.113.73.232",
          "age" : 46,
          "purpose" : "Grocery"
        }
      },
      ...
