攝取節點(ingest)
ES集群中存在一個攝取節點,在數據保存到文檔索引之前,我們可能需要對文檔進行預處理,而攝取節點會攔截這些請求,根據需要將文檔中的內容進行處理然后傳遞回索引或者API中。
例如,您可以使用管道來移除字段、從文本中提取值以及豐富數據。
攝取節點的配置:
默認配置下所有節點都啟用了ingest。因此任何一個ES節點都可以處理ingest任務。就像之前ES集群中描述的,我們可以創建一個專門處理相關業務的ingest節點。
控制節點的ingest開關在其elasticsearch.yml中的參數
## 關閉ingest
node.ingest: false
管道(pipeline)
攝取節點節點對數據的處理主要是通過管道(pipeline),在索引和請求中指定管道參數,這樣ingest節點在攔截請求后就指定使用哪條管道進行處理。
創建管道
PUT _ingest/pipeline/test_pipeline
{
"description": "測試管道",
"processors": [
{
"set": {
"field": "des",
"value": "管道默認數據"
}
}
]
}
查詢管道
# 查詢test_pipeline管道
GET _ingest/pipeline/test_pipeline
# 查詢全部管道
GET _ingest/pipeline/*
刪除管道
DELETE _ingest/pipeline/test_pipeline
DELETE _ingest/pipeline/*
使用管道
創建索引:
PUT index10
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0
},
"mappings": {
"properties": {
"id":{
"type":"long"
},
"description":{
"type":"text"
},
"name":{
"properties": {
"firstname":{"type":"keyword"},
"lastname":{"type":"keyword"}
}
}
}
}
}
插入數據,使用已創建的管道
PUT index10/_doc/1?pipeline=test_pipeline
{
"id": 1,
"description": "描述1"
}
查看數據:
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "index10",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"des" : "管道默認數據",
"description" : "描述1",
"id" : 1
}
}
]
}
}
會發現其des內容並非我們插入的數據,而是管道設置的參數。
從上面的內容可以發現,管道其實很像是我們平時使用的攔截器操作,會攔截一些操作然后對其進行修改。
模擬管道
GET _ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
},
"docs": [
{
"_source": {
"foo": "bar"
}
},
{
"_source": {
"foo": "rab"
}
}
]
}
結果:
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"field2" : "_value",
"foo" : "bar"
},
"_ingest" : {
"timestamp" : "2022-02-15T08:38:42.604065Z"
}
}
},
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"field2" : "_value",
"foo" : "rab"
},
"_ingest" : {
"timestamp" : "2022-02-15T08:38:42.604094Z"
}
}
}
]
}
管道中對文檔數據操作
操作文檔數據
假如需要對文檔中某些字段進行操作,只需要在field中指定字段名稱或者通過_source前綴進行訪問。
GET _ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "foo",
"value" : "foo_value"
}
},
{"set" : {
"field" : "_source.bar",
"value" : "bar_value"
}
}
]
},
"docs": [
{
"_source": {
"foo": "foo1",
"bar":"bar1"
}
},
{
"_source": {
"foo": "foo2",
"bar":"bar2"
}
}
]
}
結果可以發現使用這兩個方式都可以修改對應內容。
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"bar" : "bar_value",
"foo" : "foo_value"
},
"_ingest" : {
"timestamp" : "2022-02-15T08:54:16.390878Z"
}
}
},
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"bar" : "bar_value",
"foo" : "foo_value"
},
"_ingest" : {
"timestamp" : "2022-02-15T08:54:16.390885Z"
}
}
}
]
}
除了文檔中的字段,我們也可以使用_index、_type、_id、_routing對ES的元數據進行訪問。(無需擔心此操作會影響文檔數據,因為ES限制映射中的字段不能存在和元數據相同的字段名)
在上面例子的返回內容中可以看到有一個_ingest屬性。_ingest的數據同樣可以使用_ingest.timestamp來提取其時間戳。但是需要注意的是ingest元數據並不是持久的數據,其在管道處理完畢之后就被丟棄。
使用文檔中的數據
有些時候我們使用管道只是進行時間戳的賦值或者字段拼接,其內容根據數據內容的變化而變化。這個時候我們需要獲取到文檔或者ES的元素進行操作。這個情況下我們可以使用{{field}}的方式進行訪問。
獲取_ingest中的時間戳賦值給received
{
"set": {
"field": "received",
"value": "{{_ingest.timestamp}}"
}
}
簡單的字段拼接
{
"set": {
"field": "field_c",
"value": "{{field_a}} {{field_b}}"
}
}
動態參數設置
同樣我們可以在field字段上使用{{}}使其可以根據實際數據對不同的字段進行賦值。
{
"set": {
"field": "{{service}}",
"value": "{{code}}"
}
}