ElasticSearch筆記-管道(pipeline)


攝取節點(ingest)

ES集群中存在一個攝取節點,在數據保存到文檔索引之前,我們可能需要對文檔進行預處理,而攝取節點會攔截這些請求,根據需要將文檔中的內容進行處理然后傳遞回索引或者API中。
例如,您可以使用管道來移除字段、從文本中提取值以及豐富數據。

攝取節點的配置:

默認配置下所有節點都啟用了ingest。因此任何一個ES節點都可以處理ingest任務。就像之前ES集群中描述的,我們可以創建一個專門處理相關業務的ingest節點。
控制節點的ingest開關在其elasticsearch.yml中的參數

## 關閉ingest
node.ingest: false

管道(pipeline)

攝取節點節點對數據的處理主要是通過管道(pipeline),在索引和請求中指定管道參數,這樣ingest節點在攔截請求后就指定使用哪條管道進行處理。

創建管道

PUT _ingest/pipeline/test_pipeline
{
  "description": "測試管道",
  "processors": [
    {
      "set": {
        "field": "des",
        "value": "管道默認數據"
      }
    }
  ]
}

查詢管道

# 查詢test_pipeline管道
GET _ingest/pipeline/test_pipeline
# 查詢全部管道
GET _ingest/pipeline/*

刪除管道

DELETE _ingest/pipeline/test_pipeline
DELETE _ingest/pipeline/*

使用管道

創建索引:

PUT index10
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id":{
        "type":"long"
      },
      "description":{
        "type":"text"
      },
      "name":{
        "properties": {
          "firstname":{"type":"keyword"},
          "lastname":{"type":"keyword"}
        }
      }
    }
  }
}

插入數據,使用已創建的管道

PUT index10/_doc/1?pipeline=test_pipeline
{
    "id": 1,
    "description": "描述1"
}

查看數據:

{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "index10",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "des" : "管道默認數據",
          "description" : "描述1",
          "id" : 1
        }
      }
    ]
  }
}

會發現其des內容並非我們插入的數據,而是管道設置的參數。
從上面的內容可以發現,管道其實很像是我們平時使用的攔截器操作,會攔截一些操作然后對其進行修改。

模擬管道

GET _ingest/pipeline/_simulate
  {
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "field2",
          "value" : "_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "foo": "bar"
      }
    },
    {
      "_source": {
        "foo": "rab"
      }
    }
  ]
}

結果:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "field2" : "_value",
          "foo" : "bar"
        },
        "_ingest" : {
          "timestamp" : "2022-02-15T08:38:42.604065Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "field2" : "_value",
          "foo" : "rab"
        },
        "_ingest" : {
          "timestamp" : "2022-02-15T08:38:42.604094Z"
        }
      }
    }
  ]
}

管道中對文檔數據操作

操作文檔數據

假如需要對文檔中某些字段進行操作,只需要在field中指定字段名稱或者通過_source前綴進行訪問。

GET _ingest/pipeline/_simulate
  {
  "pipeline" :
  {
    "description": "_description",
    "processors": [
      {
        "set" : {
          "field" : "foo",
          "value" : "foo_value"
        }
      },
        {"set" : {
          "field" : "_source.bar",
          "value" : "bar_value"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "foo": "foo1",
        "bar":"bar1"
      }
    },
    {
      "_source": {
        "foo": "foo2",
        "bar":"bar2"
      }
    }
  ]
}

結果可以發現使用這兩個方式都可以修改對應內容。

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "bar" : "bar_value",
          "foo" : "foo_value"
        },
        "_ingest" : {
          "timestamp" : "2022-02-15T08:54:16.390878Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "bar" : "bar_value",
          "foo" : "foo_value"
        },
        "_ingest" : {
          "timestamp" : "2022-02-15T08:54:16.390885Z"
        }
      }
    }
  ]
}

除了文檔中的字段,我們也可以使用_index、_type、_id、_routing對ES的元數據進行訪問。(無需擔心此操作會影響文檔數據,因為ES限制映射中的字段不能存在和元數據相同的字段名)
在上面例子的返回內容中可以看到有一個_ingest屬性。_ingest的數據同樣可以使用_ingest.timestamp來提取其時間戳。但是需要注意的是ingest元數據並不是持久的數據,其在管道處理完畢之后就被丟棄。

使用文檔中的數據

有些時候我們使用管道只是進行時間戳的賦值或者字段拼接,其內容根據數據內容的變化而變化。這個時候我們需要獲取到文檔或者ES的元素進行操作。這個情況下我們可以使用{{field}}的方式進行訪問。
獲取_ingest中的時間戳賦值給received

{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}

簡單的字段拼接

{
  "set": {
    "field": "field_c",
    "value": "{{field_a}} {{field_b}}"
  }
}

動態參數設置

同樣我們可以在field字段上使用{{}}使其可以根據實際數據對不同的字段進行賦值。

{
  "set": {
    "field": "{{service}}",
    "value": "{{code}}"
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM