在Elasticsearch 5.0之前,如果我們想在將文檔索引到Elasticsearch之前預處理文檔,那么唯一的方法是使用Logstash或以編程方式/手動預處理它們,然后將它們索引到Elasticsearch。 Elasticsearch缺乏預處理/轉換文檔的能力,它只是按原樣索引文檔。 但是,在Elasticsearch 5.x之后引入一個名為ingest node的功能,為Elasticsearch本身的文檔預處理和豐富之前提供了一個輕量級的解決方案。
當我們的數據進入到Elastic集群中,並指定需要用到的Pipeline,那么Elasticsearch中的ingest node將會幫我們安裝規定的processor順序來執行對數據的操作和處理。這在某種程度上方便了我們許多對集群的部署。如果我們單獨部署一個Logstash有時沒有那么多的靈活性。我們可以通過編程的方式隨時修改這個pipeline
當我們的數據進入到Elastic集群中,並指定需要用到的Pipeline,那么Elasticsearch中的ingest node將會幫我們安裝規定的processor順序來執行對數據的操作和處理。這在某種程度上方便了我們許多對集群的部署。如果我們單獨部署一個Logstash有時沒有那么多的靈活性。我們可以通過編程的方式隨時修改這個pipeline。
如果使用默認配置實現Elasticsearch節點,則默認情況下將啟用master,data和ingest(即,它將充當主節點,數據節點和提取節點)。 要在節點上禁用ingest,請在elasticsearch.yml文件中配置以下設置:
node.ingest: false
ingest節點可用於在對文檔執行實際索引之前預處理文檔。 此預處理通過截取批量和索引請求的攝取節點執行,它將轉換應用於數據,然后將文檔傳遞回索引或批量API。 隨着新的攝取功能的發布,Elasticsearch已經取出了Logstash的過濾器部分,以便我們可以在Elasticsearch中處理原始日志和豐富。
要在索引之前預處理文檔,我們必須定義pipeline(其中包含稱為處理器的步驟序列,用於轉換傳入文檔)。 要使用pipeline,我們只需在索引或批量請求上指定pipeline參數,以告訴攝取節點使用哪個pipeliPOST my_index/my_type?pipeline=my_pipeline_id
POST my_index/my_type?pipeline=my_pipeline_id { "key": "value" }
定義一個pipeline
pipeline定義了一系列處理器。 每個處理器以某種方式轉換文檔。 每個處理器按照在pipeline中定義的順序執行。 pipeline由兩個主要字段組成:description和processor列表。
description參數是一個非必需字段,用於存儲一些描述/管道的用法; 使用processor參數,可以列出處理器以轉換文檔。processor的典型結構如下所示:
{ "description" : "...", "processors" : [ ... ] }
inget節點有大約20個內置processor,包括gsub,grok,轉換,刪除,重命名等。 這些可以在構建管道時使用。 除了內置processor外,還可以使用攝取附件(如ingest attachment,ingetst geo-ip和ingest user-agent)等攝取插件,並可在構建pipeline時使用。 這些插件在默認情況下不可用,可以像任何其他Elasticsearch插件一樣進行安裝。
Pipeline以cluster狀態存儲,並且立即傳播到所有ingest node。 當ingest node接收到新pipeline時,它們將以內存pipeline表示形式更新其節點,並且pipeline更改將立即生效。
Ingest APIs
ingest節點提供一組稱為inget API的API,可用於定義,模擬,刪除或查找有關pipeline的信息。 攝取API端點是_ingest。
Put pipeline API
此API用於定義新pipeline。 此API還用於添加新pipeline或更新現有pipeline。
我們來看一個例子吧。 如下面的代碼所示,我們定義了一個名為firstpipeline的新pipeline,它將消息字段中的值轉換為大寫
PUT _ingest/pipeline/firstpipeline { "description": "uppsercase the incoming vlaue in the message filed", "processors": [ { "uppercase": { "field": "message" } } ] }
我們在Kibana中執行上面的命令,我們可以看到成功返回:
讓我們來測試一下我們新建立的pipeline:
PUT myindex/_doc/1?pipeline=firstpipeline { "name": "pipeline", "message": "this is so cool!" }

我們看見我們的文檔印被成功創建並存於一個叫做myindex的index里。下面我們來查看一下,我們剛才定義的pipeline是否已經起作用了。
GET myindex/_doc/1
我們可以看到我們的message已經都變成大寫的了。
創建管道時,可以定義多個處理器,執行順序取決於定義中定義的順序。 讓我們看一個這樣的例子。 如下面的代碼所示,我們創建了一個名為secondpipeline的新管道,它轉換“message”字段中存在的大寫值,並將“message”字段重命名為“data”。 它創建一個名為“label”的新字段,其值為testlabel:
PUT _ingest/pipeline/secondpipeline { "description": "uppercase the incoming value in the message field", "processors": [ { "uppercase": { "field": "message", "ignore_failure": true } }, { "rename": { "field": "message", "target_field": "data", "ignore_failure": true } }, { "set": { "field": "label", "value": "testlabel", "override": false } } ] }

我們的第二個被叫做secondpipeline的也已經創建好了。接下來,讓我們來利用這個pipeline來對我們的文檔進行處理。我們在Kibnana中輸入:
PUT myindex/_doc/1?pipeline=secondpipeline { "name": "pipeline", "message": "this is so cool!" }
然后,我們使用如下的命令來查詢我們剛才輸入的文檔:
GET myindex/_doc/1
顯示的結果如下:

通過上面的例子,我們可以看到我們之前的message項不見了,取而代之的是data,同時它里面的字符都變成大寫的了。另外,它也新增加了一個叫做label的項,並且它的值被設置為testlabel。
提示:如果缺少處理器中使用的字段,則處理器將拋出異常,並且不會對文檔編制索引。 為了防止處理器拋出異常,我們可以利用
“ignore_failure”:true參數。
獲取 pipeline API
此API用於檢索現有pipeline的定義。 使用此API,可以找到單個pipeline定義的詳細信息或查找所有pipeline的定義。
查找所有pipeline定義的命令是:
GET _ingest/pipeline

要查找現有pipeline的定義,請將管道ID傳遞給get管道.api。 以下是查找名為secondpipeline的pipeline定義的示例:
GET _ingest/pipeline/secondpipeline

我們也可以使用filter_path來獲取pipeline的部分內容,比如:
GET _ingest/pipeline/secondpipeline?filter_path=*.processors.uppercase
上面將返回如下的結果:
{ "secondpipeline" : { "processors" : [ { "uppercase" : { "field" : "message", "ignore_failure" : true } } ] } }
上面返回processors中的uppercase內容。
刪除 pipeline API
刪除管道API按ID或通配符匹配刪除pipeline。 以下是刪除名為firstpipeline的pipeline的示例:
DELETE _ingest/pipeline/firstpipeline

這樣firstpipeline就被刪除了。
由於pipeline是群集級存儲而被保存在每個節點的內存中,並且pipeline始終在ingest node中運行,因此最好在群集中保留需要的pipeline,而刪除那些不需要的pipeline。
設置默認pipeline API

模擬 pipeline API
此pipeline API可用於根據請求正文中提供的文檔集模擬pipeline的執行。 可以指定要對提供的文檔執行的現有pipeline,或者在請求的主體中提供pipeline定義。 要模擬ingest pipeline,請將“_simulate”端點添加到pipeline API。
以下是模擬現有pipeline的示例:
POST _ingest/pipeline/secondpipeline/_simulate { "docs": [ { "_source": { "name": "pipeline", "message": "this is so cool!" } }, { "_source": { "name": "nice", "message": "this is nice!" } } ] }
執行的結果是:
我們可以在右邊看出來執行pipeline所顯示的結果。
內置processors
默認情況下,Elasticsearch提供大量的ingest處理器。 我們可以在地址https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html 找到已經為我設計好的內置的processors。下面是一些常見的一些processor的列表:
