Logstash:解析 JSON 文件並導入到 Elasticsearch 中


轉載自:https://elasticstack.blog.csdn.net/article/details/114383426

在今天的文章中,我們將詳述如何使用 Logstash 來解析 JSON 文件的日志,並把它導入到 Elasticsearch 中。在之前的文章 “Logstash:Data轉換,分析,提取,豐富及核心操作” 也有提到過,但是沒有具體的例子。總體說來解析 JSON 文件的日志有兩種方法:

    在 file input 里使用 JSON codec
    在 file input 里不使用 JSON codec,但是在 filter 的部分使用 JSON filter

我們把 JSON 格式的數據解析並導入到 Elasticsearch 的流程如下:


准備數據

我們准備了如下的數據:

sample.json

    {"id": 4,"timestamp":"2019-06-10T18:01:32Z","paymentType":"Visa","name":"Cary Boyes","gender":"Male","ip_address":"223.113.73.232","purpose":"Grocery","country":"Pakistan","pastEvents":[{"eventId":7,"transactionId":"63941-950"},{"eventId":8,"transactionId":"55926-0011"}],"age":46}
    {"id": 5,"timestamp":"2020-02-18T12:27:35Z","paymentType":"Visa","name":"Betteanne Diament","gender":"Female","ip_address":"159.148.102.98","purpose":"Computers","country":"Brazil","pastEvents":[{"eventId":9,"transactionId":"76436-101"},{"eventId":10,"transactionId":"55154-3330"}],"age":41}

 

構建 Logstash 配置文件
使用 json codec

    input {
      file {
        path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec   => "json"
      }
    }
     
    output {   
      stdout {
        codec => rubydebug
      }
    }

我們運行 Logstash:

sudo ./bin/logstash -f logstash_json.conf 

上面的命令輸出的結果為:

從上面的結果中,我們可以看出來文檔被正確地解析。


使用 JSON filter

我們可以在 file input 中不使用任何的 code,但是我們可以可以使用 JSON filter 來完成解析的工作:

logstash_json_fileter.conf

    input {
      file {
        path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
     
    filter {
      json {
        source => "message"
      }
     
    }
     
    output {   
      stdout {
        codec => rubydebug
      }
    }

在上面,我們添加了 filter 這個部分。我們使用了 json 這個過濾器來完成對 JSON 格式的解析。重新運行我們的 Logstash。我們可以看到如下的輸出:

在上面,我們可以看到一個叫做 message 的字段。這個字段顯然它會占存儲空間。我們可以把它刪除掉。同時,我們也可以去掉那些不需要的元字段以節省空間。

logstash_json_fileter.conf

    input {
      file {
        path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
     
    filter {
      json {
        source => "message"
      }
     
      if [paymentType] == "Mastercard" {
        drop{}
      }
     
      mutate {
        remove_field => ["message", "path", "host", "@version"]
      }
     
    }
     
    output {   
      stdout {
        codec => rubydebug
      }
    }

在上面,我們檢查 paymentType 是否為 Mastercard,如果是的話,我們把整個事件丟棄。同時我們刪除不需要的字段,比如 message, path 等。重新運行 Logstash。我們可以看到如下的輸出:

顯然這次的輸出比剛才的要干凈很多。你可能已經注意到 @timestamp 的值和 timestamp 的值不太一樣。在 Kibana 中,我們經常會使用 @timestamp 作為事件的時間標簽。我們可以做如下的處理:

logstash_json_fileter.conf

    input {
      file {
        path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
     
    filter {
      json {
        source => "message"
      }
     
      if [paymentType] == "Mastercard" {
        drop{}
      }
     
      date {
        match => [ "timestamp", "ISO8601" ]
        locale => en
      }
     
      mutate {
        remove_field => ["message", "path", "host", "@version", "timestamp"]
      }
     
    }
     
    output {   
      stdout {
        codec => rubydebug
      }
    }

在上面,我們添加了 date 過濾器來解析時間。同時我們也刪除 timestamp 這個字段。我們得到的結果是:

從上面我們可以看出來 @timestamp 的時間現在是時間的 timestamp 字段的時間。

在上面,我們看到 postEvent 是一個數組。如果我們想把這個數組拆分,並把其中的每一個事件作為一個分別的事件。我們可以使用 split 過濾器來完成。

logstash_json_fileter.conf

    input {
      file {
        path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
     
    filter {
      json {
        source => "message"
      }
     
      if [paymentType] == "Mastercard" {
        drop{}
      }
     
      date {
        match => [ "timestamp", "ISO8601" ]
        locale => en
      }
     
      mutate {
        remove_field => ["message", "path", "host", "@version", "timestamp"]
      }
     
      split {
        field => "[pastEvents]"
      }
     
    }
     
    output {   
      stdout {
        codec => rubydebug
      }
    }

從上面我們可以看出來 postEvents 數組被拆分,並形成多個文檔。上面的最終文檔還是有些美中不足:eventId 及 transactionId 還是處於 pastEvents 對象之下。我們想把它移到和 id 同一級的位置。為此,我們做如下的修改:

logstash_json_fileter.conf

    input {
      file {
        path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
        start_position => "beginning"
        sincedb_path => "/dev/null"
      }
    }
     
    filter {
      json {
        source => "message"
      }
     
      if [paymentType] == "Mastercard" {
        drop{}
      }
     
      date {
        match => [ "timestamp", "ISO8601" ]
        locale => en
      }
     
      split {
        field => "[pastEvents]"
      }
     
      mutate {
        add_field => {
          "eventId" => "%{[pastEvents][eventId]}"
          "transactionId" => "%{[pastEvents][transactionId]}"
        }
     
        remove_field => ["message", "path", "host", "@version", "timestamp", "pastEvents"]
      }
    }
     
    output {   
      stdout {
        codec => rubydebug
      }
     
      elasticsearch {
        index => "logstash_json"
      }
    }

重新運行 Logstash。我們可以看到如下的輸出:

在上面,我們把 eventId 及 transactionId 移到文檔的根下面,並刪除 pastEvents 這個字段。我們同時也把文檔導入到 Elasticsearch 中。

我們可以在 Elasticsearch 中對文檔進行搜索:

GET logstash_json/_search

    {
      "took" : 1,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 4,
          "relation" : "eq"
        },
        "max_score" : 1.0,
        "hits" : [
          {
            "_index" : "logstash_json",
            "_type" : "_doc",
            "_id" : "JXZRAHgBoLC90rTy6jNl",
            "_score" : 1.0,
            "_source" : {
              "gender" : "Female",
              "@timestamp" : "2020-02-18T12:27:35.000Z",
              "id" : 5,
              "country" : "Brazil",
              "name" : "Betteanne Diament",
              "paymentType" : "Visa",
              "transactionId" : "76436-101",
              "eventId" : "9",
              "ip_address" : "159.148.102.98",
              "age" : 41,
              "purpose" : "Computers"
            }
          },
          {
            "_index" : "logstash_json",
            "_type" : "_doc",
            "_id" : "KHZRAHgBoLC90rTy6jNl",
            "_score" : 1.0,
            "_source" : {
              "gender" : "Male",
              "@timestamp" : "2019-06-10T18:01:32.000Z",
              "id" : 4,
              "country" : "Pakistan",
              "name" : "Cary Boyes",
              "paymentType" : "Visa",
              "transactionId" : "55926-0011",
              "eventId" : "8",
              "ip_address" : "223.113.73.232",
              "age" : 46,
              "purpose" : "Grocery"
            }
          },
      ...


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM