使用Elasticsearch的processors來對csv格式數據進行解析


來源數據是一個csv文件,具體內容如下圖所示:

導入數據到es中

有兩種辦法,第一種是在kibana界面直接上傳文件導入

第二種方法是使用filebeat讀取文件導入
這里采用第二種辦法

配置文件名:filebeat_covid19.yml

filebeat.inputs:
- type: log
  paths:
    - /covid19/covid19.csv # 文件路徑根據實際情況修改
  exclude_lines: ['^Lat'] # 去掉csv文件的第一行數據header
 
output.elasticsearch:
  hosts: ["http://localhost:9200"]
  index: covid19 # 設置索引名
 
setup.ilm.enabled: false # 不使用索引生命周期管理
setup.template.name: covid19
setup.template.pattern: covid19

注意:csv文件的第一行是數據的header,需要去掉這一行。為此,采用了exclude_lines: ['^Lat']來去掉第一行。

執行如下命令導入數據:./filebeat -e -c filebeat_covid19.yml

Filebeat的registry文件存儲Filebeat用於跟蹤上次讀取位置的狀態和位置信息。如果由於某種原因,我們想重復對這個csv文件的處理,我們可以刪除如下的目錄:

  • data/registry 針對 .tar.gz and .tgz 歸檔文件安裝
  • /var/lib/filebeat/registry 針對 DEB 及 RPM 安裝包
  • c:\ProgramData\filebeat\registry 針對 Windows zip 文件

對數據進行查詢:GET covid19/_search
其中一條數據格式:

{
    "_index" : "covid19",
    "_type" : "_doc",
    "_id" : "udJG93EB9vfbZvWY2eEV",
    "_score" : 1.0,
    "_source" : {
      "@timestamp" : "2020-05-09T02:32:26.345Z",
      "log" : {
        "file" : {
          "path" : "/usr/local/src/covid19.csv"
        },
        "offset" : 3308
      },
      "message" : """37.1232245,-78.4927721,"Virginia, US",Virginia,",",US,2640,0,0""",
      "input" : {
        "type" : "log"
      },
      "ecs" : {
        "version" : "1.1.0"
      },
      "host" : {
        "os" : {
          "codename" : "Core",
          "platform" : "centos",
          "version" : "7 (Core)",
          "family" : "redhat",
          "name" : "CentOS Linux",
          "kernel" : "4.4.196-1.el7.elrepo.x86_64"
        },
        "id" : "8e40a96218dc4a3db226ae44244c0b26",
        "containerized" : false,
        "name" : "bogon",
        "hostname" : "bogon",
        "architecture" : "x86_64"
      },
      "agent" : {
        "ephemeral_id" : "4d3cb998-1a1c-4545-8c60-ab0ddd135d86",
        "hostname" : "bogon",
        "id" : "147d9456-23f4-470e-94cb-fdddab45f5a6",
        "version" : "7.5.0",
        "type" : "filebeat"
      }
    }
  }

利用Processors來加工數據

去掉無用的字段

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    }
  ]
}

上面的pipeline定義了一個叫做remove的processor。它檢查log,input, ecs, host及agent都不為空的情況下,刪除字段log, input,ecs, host及agent。

應用pipleline,執行如下命令:POST covid19/_update_by_query?pipeline=covid19_parser

替換引號

導入的message數據為:

"""37.1232245,-78.4927721,"Virginia, US",Virginia,",",US,221,0,0"""

這里的數據有很多的引號"字符,想把這些字符替換為符號'。為此,需要gsub processors來幫我們處理。重新修改我們的pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    }    
  ]
}

注意:上述語句在kibana的Dev Tools中不能執行“自動縮進”命令,否則“gsub”中的“pattern”,會由"pattern": "\"",變成"pattern": """"""",

應用pipleline,執行如下命令:POST covid19/_update_by_query?pipeline=covid19_parser

看出來我們已經成功都去掉了引號。我們的message的信息如下:

"37.1232245,-78.4927721,'Virginia, US',Virginia,',',US,221,0,0"

解析信息

這一步的操作具體來說是把message的信息,由一行信息轉換成json樣式的鍵值對數據

首先使用Kibana所提供的Grok Debugger來幫助我們分析數據。我們將使用如下的grok pattern來解析我們的message:

%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}

重新修改pipeline:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    },
    {
     "grok": {
        "field": "message",
        "patterns": [
          "%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
        ]
      }
    }        
  ]
}

使用如下的命令來重新對數據進行分析:

POST covid19/_update_by_query?pipeline=covid19_parser

可以看到新增加的country,infected,address等等的字段。

添加location字段

需要創建一個新的location字段,把原先表示經緯度的lon及lat字段給概括進去
更新pipeline為:

PUT _ingest/pipeline/covid19_parser
{
  "processors": [
    {
      "remove": {
        "field": ["log", "input", "ecs", "host", "agent"],
        "if": "ctx.log != null && ctx.input != null && ctx.ecs != null && ctx.host != null && ctx.agent != null"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "\"",
        "replacement": "'"
      }
    },
    {
     "grok": {
        "field": "message",
        "patterns": [
          "%{NUMBER:lat:float},%{NUMBER:lon:float},'%{DATA:address}',%{DATA:city},',',%{DATA:country},%{NUMBER:infected:int},%{NUMBER:death:int}"
        ]
      }
    },
    {
      "set": {
        "field": "location.lat",
        "value": "{{lat}}"
      }
    },
    {
      "set": {
        "field": "location.lon",
        "value": "{{lon}}"
      }
    }              
  ]
}

設置了一個叫做location.lat及location.lon的兩個字段。它們的值分別是{{lat}}{{lon}}

由於location是一個新增加的字段,在默認的情況下,它的兩個字段都會被Elasticsearch設置為text的類型。為了能夠讓我們的數據在地圖中進行顯示,它必須是一個geo_point的數據類型。為此,我們必須通過如下命令來設置它的數據類型:

PUT covid19/_mapping
{
  "properties": {
    "location": {
      "type": "geo_point"
    }
  }
}

再使用如下的命令來對我們的數據重新進行處理:

POST covid19/_update_by_query?pipeline=covid19_parser

同時也可以查看covid19的mapping。

GET covid19/_mapping

我們可以發現location的數據類型為:

"location" : {
     "type" : "geo_point"
 },

它顯示location的數據類型是對的。

到目前為止,已經夠把數據處理成所需要的數據了,可以用來進一步展示使用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM