Elasticsearch中使用pipeline API來對事件進行處理


在Elasticsearch 5.0之前,如果我們想在將文檔索引到Elasticsearch之前預處理文檔,那么唯一的方法是使用Logstash或以編程方式/手動預處理它們,然后將它們索引到Elasticsearch。 Elasticsearch缺乏預處理/轉換文檔的能力,它只是按原樣索引文檔。 但是,在Elasticsearch 5.x之后引入一個名為ingest node的功能,為Elasticsearch本身的文檔預處理和豐富之前提供了一個輕量級的解決方案。
當我們的數據進入到Elastic集群中,並指定需要用到的Pipeline,那么Elasticsearch中的ingest node將會幫我們安裝規定的processor順序來執行對數據的操作和處理。這在某種程度上方便了我們許多對集群的部署。如果我們單獨部署一個Logstash有時沒有那么多的靈活性。我們可以通過編程的方式隨時修改這個pipeline

 當我們的數據進入到Elastic集群中,並指定需要用到的Pipeline,那么Elasticsearch中的ingest node將會幫我們安裝規定的processor順序來執行對數據的操作和處理。這在某種程度上方便了我們許多對集群的部署。如果我們單獨部署一個Logstash有時沒有那么多的靈活性。我們可以通過編程的方式隨時修改這個pipeline。

 如果使用默認配置實現Elasticsearch節點,則默認情況下將啟用master,data和ingest(即,它將充當主節點,數據節點和提取節點)。 要在節點上禁用ingest,請在elasticsearch.yml文件中配置以下設置:

node.ingest: false

ingest節點可用於在對文檔執行實際索引之前預處理文檔。 此預處理通過截取批量和索引請求的攝取節點執行,它將轉換應用於數據,然后將文檔傳遞回索引或批量API。 隨着新的攝取功能的發布,Elasticsearch已經取出了Logstash的過濾器部分,以便我們可以在Elasticsearch中處理原始日志和豐富。
要在索引之前預處理文檔,我們必須定義pipeline(其中包含稱為處理器的步驟序列,用於轉換傳入文檔)。 要使用pipeline,我們只需在索引或批量請求上指定pipeline參數,以告訴攝取節點使用哪個pipeliPOST my_index/my_type?pipeline=my_pipeline_id

POST my_index/my_type?pipeline=my_pipeline_id 
{
     "key": "value"
}

定義一個pipeline
pipeline定義了一系列處理器。 每個處理器以某種方式轉換文檔。 每個處理器按照在pipeline中定義的順序執行。 pipeline由兩個主要字段組成:description和processor列表。
description參數是一個非必需字段,用於存儲一些描述/管道的用法; 使用processor參數,可以列出處理器以轉換文檔。processor的典型結構如下所示:

{
     "description" : "...",
     "processors" : [ ... ]
}

inget節點有大約20個內置processor,包括gsub,grok,轉換,刪除,重命名等。 這些可以在構建管道時使用。 除了內置processor外,還可以使用攝取附件(如ingest attachment,ingetst geo-ip和ingest user-agent)等攝取插件,並可在構建pipeline時使用。 這些插件在默認情況下不可用,可以像任何其他Elasticsearch插件一樣進行安裝。
Pipeline以cluster狀態存儲,並且立即傳播到所有ingest node。 當ingest node接收到新pipeline時,它們將以內存pipeline表示形式更新其節點,並且pipeline更改將立即生效。


Ingest APIs


 ingest節點提供一組稱為inget API的API,可用於定義,模擬,刪除或查找有關pipeline的信息。 攝取API端點是_ingest。


Put pipeline API

此API用於定義新pipeline。 此API還用於添加新pipeline或更新現有pipeline。
我們來看一個例子吧。 如下面的代碼所示,我們定義了一個名為firstpipeline的新pipeline,它將消息字段中的值轉換為大寫

PUT _ingest/pipeline/firstpipeline
{
  "description": "uppsercase the incoming vlaue in the message filed",
  "processors": [
    {
      "uppercase": {
        "field": "message"
      }
    }
  ]
}

我們在Kibana中執行上面的命令,我們可以看到成功返回:

讓我們來測試一下我們新建立的pipeline:

PUT myindex/_doc/1?pipeline=firstpipeline
{
  "name": "pipeline",
  "message": "this is so cool!"
}

我們看見我們的文檔印被成功創建並存於一個叫做myindex的index里。下面我們來查看一下,我們剛才定義的pipeline是否已經起作用了。

GET myindex/_doc/1

我們可以看到我們的message已經都變成大寫的了。

創建管道時,可以定義多個處理器,執行順序取決於定義中定義的順序。 讓我們看一個這樣的例子。 如下面的代碼所示,我們創建了一個名為secondpipeline的新管道,它轉換“message”字段中存在的大寫值,並將“message”字段重命名為“data”。 它創建一個名為“label”的新字段,其值為testlabel:

PUT _ingest/pipeline/secondpipeline
{
  "description": "uppercase the incoming value in the message field",
  "processors": [
    {
      "uppercase": {
        "field": "message",
        "ignore_failure": true
      }
    },
    {
      "rename": {
        "field": "message",
        "target_field": "data",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "label",
        "value": "testlabel",
        "override": false
      }
    }
  ]
}

我們的第二個被叫做secondpipeline的也已經創建好了。接下來,讓我們來利用這個pipeline來對我們的文檔進行處理。我們在Kibnana中輸入:

PUT myindex/_doc/1?pipeline=secondpipeline
{
  "name": "pipeline",
  "message": "this is so cool!"
}

然后,我們使用如下的命令來查詢我們剛才輸入的文檔:

GET myindex/_doc/1

顯示的結果如下:

通過上面的例子,我們可以看到我們之前的message項不見了,取而代之的是data,同時它里面的字符都變成大寫的了。另外,它也新增加了一個叫做label的項,並且它的值被設置為testlabel。
提示:如果缺少處理器中使用的字段,則處理器將拋出異常,並且不會對文檔編制索引。 為了防止處理器拋出異常,我們可以利用
“ignore_failure”:true參數。


獲取 pipeline API

此API用於檢索現有pipeline的定義。 使用此API,可以找到單個pipeline定義的詳細信息或查找所有pipeline的定義。
查找所有pipeline定義的命令是:

GET _ingest/pipeline

要查找現有pipeline的定義,請將管道ID傳遞給get管道.api。 以下是查找名為secondpipeline的pipeline定義的示例:

GET _ingest/pipeline/secondpipeline

我們也可以使用filter_path來獲取pipeline的部分內容,比如:

GET _ingest/pipeline/secondpipeline?filter_path=*.processors.uppercase

上面將返回如下的結果:

{
  "secondpipeline" : {
    "processors" : [
      {
        "uppercase" : {
          "field" : "message",
          "ignore_failure" : true
        }
      }
    ]
  }
}

上面返回processors中的uppercase內容。

刪除 pipeline API

刪除管道API按ID或通配符匹配刪除pipeline。 以下是刪除名為firstpipeline的pipeline的示例:

DELETE _ingest/pipeline/firstpipeline

這樣firstpipeline就被刪除了。
由於pipeline是群集級存儲而被保存在每個節點的內存中,並且pipeline始終在ingest node中運行,因此最好在群集中保留需要的pipeline,而刪除那些不需要的pipeline。

 

設置默認pipeline API


模擬 pipeline API

此pipeline API可用於根據請求正文中提供的文檔集模擬pipeline的執行。 可以指定要對提供的文檔執行的現有pipeline,或者在請求的主體中提供pipeline定義。 要模擬ingest pipeline,請將“_simulate”端點添加到pipeline API。
以下是模擬現有pipeline的示例:

POST _ingest/pipeline/secondpipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "name": "pipeline",
        "message": "this is so cool!"
      }
    },
    {
      "_source": {
        "name": "nice",
        "message": "this is nice!"
      }
    }
  ]
}

執行的結果是:

我們可以在右邊看出來執行pipeline所顯示的結果。

內置processors

默認情況下,Elasticsearch提供大量的ingest處理器。 我們可以在地址https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html 找到已經為我設計好的內置的processors。下面是一些常見的一些processor的列表:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM