Elasticsearch Pipeline: Setting an Update Time on Newly Added Data


Simulating the pipeline

Test request

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "timestamp",
          "value": "{{_ingest.timestamp}}"
        }
      },
      {
        "script": {
          "lang": "painless",
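          "source": """
            // Parse the ISO-8601 string written by the set processor, e.g. 2021-06-28T02:45:04.759053131Z
            ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp);
            // Format it as "yyyy-MM-dd HH:mm:ss"; the zone of the input (UTC) is kept
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            String datetime = zdt.format(dtf);
            ctx.newdate = datetime;
            // Read the clock again: Unix time in seconds (long integer division)
            ctx.newdate2 = System.currentTimeMillis() / 1000;
          """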
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "測試"
      }
    }
  ]
}

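Here we record the document's latest update time in several ways: a set processor copies _ingest.timestamp into the timestamp field, and a Painless script derives a formatted date (newdate) and a Unix time in seconds (newdate2).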

Response

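In the response, newdate2 is the update time as a Unix timestamp in seconds, newdate is the same moment formatted as yyyy-MM-dd HH:mm:ss, and timestamp is the value obtained from _ingest.timestamp. Because the parsed value keeps the "Z" (UTC) zone, newdate is in UTC, not in the server's local time. newdate2 comes from a separate read of the system clock, so it can differ slightly from timestamp.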

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "newdate2" : 1624848304,
          "message" : "測試",
          "newdate" : "2021-06-28 02:45:04",
          "timestamp" : "2021-06-28T02:45:04.759053131Z"
        },
        "_ingest" : {
          "timestamp" : "2021-06-28T02:45:04.759053131Z"
        }
      }
    }
  ]
}
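The Painless script uses the java.time classes ZonedDateTime and DateTimeFormatter, so its conversion can be checked in plain Java. The following is a minimal sketch and not part of the original setup. The class and the helper names toNewdate and toNewdate2 are hypothetical. toNewdate2 also swaps in a different technique: it takes the epoch seconds from the timestamp itself instead of calling System.currentTimeMillis(), which keeps newdate2 and timestamp on exactly the same instant.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Plain-Java sketch of the java.time calls made by the Painless script above.
final class IngestTimestampSketch {

    // Same pattern as the script; the output carries no zone information.
    private static final DateTimeFormatter DISPLAY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Mirrors ZonedDateTime.parse(ctx.timestamp).format(dtf) from the script.
    // The "Z" suffix makes the parsed zone UTC, so the formatted text is UTC.
    static String toNewdate(String ingestTimestamp) {
        return ZonedDateTime.parse(ingestTimestamp).format(DISPLAY);
    }

    // Hypothetical alternative to System.currentTimeMillis() / 1000: take the
    // epoch seconds from the same timestamp so both fields describe one instant.
    static long toNewdate2(String ingestTimestamp) {
        return ZonedDateTime.parse(ingestTimestamp).toEpochSecond();
    }

    public static void main(String[] args) {
        String ts = "2021-06-28T02:45:04.759053131Z"; // timestamp from the simulate response
        System.out.println(toNewdate(ts));  // prints 2021-06-28 02:45:04
        System.out.println(toNewdate2(ts)); // prints 1624848304
    }
}
```

The result 1624848304 equals newdate2 in the response above because both fields fall in the same second. A Painless version of this alternative would be something like ctx.newdate2 = zdt.toEpochSecond();. This assumes that method is on the Painless allowlist in your Elasticsearch version, so check before relying on it.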

Applying it in practice

Creating the pipeline

PUT _ingest/pipeline/add_timestamp
{
  "processors": [
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 =         System.currentTimeMillis()/1000; """
      }
    }
  ]
}

Viewing the created pipeline

GET _ingest/pipeline/add_timestamp

{
  "add_timestamp" : {
    "processors" : [
      {
        "set" : {
          "field" : "timestamp",
          "value" : "{{_ingest.timestamp}}"
        }
      },
      {
        "script" : {
          "lang" : "painless",
          "source" : """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 =         System.currentTimeMillis()/1000; """
        }
      }
    ]
  }
}

Testing with a new document

PUT test_index_20210628/_doc/1?pipeline=add_timestamp
{
  "test":"測試數據"
}
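Outside Kibana, this indexing request is an ordinary HTTP PUT that carries the pipeline query parameter. The sketch below uses the Java 11 java.net.http client. It assumes an unsecured cluster at http://localhost:9200, and the class and method names are hypothetical. The document body uses an ASCII test value. buildRequest only constructs the request, so it can be checked without a cluster. main actually sends it.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: "PUT test_index_20210628/_doc/1?pipeline=add_timestamp" sent from Java.
// Assumption: Elasticsearch at http://localhost:9200 with no authentication.
final class IndexWithPipelineSketch {

    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder()
                // The pipeline query parameter selects the ingest pipeline for this request
                .uri(URI.create(baseUrl + "/test_index_20210628/_doc/1?pipeline=add_timestamp"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"test\":\"test data\"}"))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest("http://localhost:9200");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```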

Viewing the new document

GET test_index_20210628/_doc/1

{
  "_index" : "test_index_20210628",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "newdate2" : 1624849340,
    "test" : "測試數據",
    "newdate" : "2021-06-28 03:02:20",
    "timestamp" : "2021-06-28T03:02:20.252887295Z"
  }
}

Setting the pipeline when creating the index

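We can also set the pipeline when creating the index, through the default_pipeline index setting. Documents indexed into that index then pass through the pipeline automatically, so there is no need to specify the pipeline every time data is added. A pipeline passed explicitly with ?pipeline= on a request still takes precedence over the default.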

# Create the index with a default pipeline
PUT test_index_20210628_02
{
  "settings": {
    "default_pipeline": "add_timestamp"
  }
}
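For reference, here is a sketch of the two requests of this approach built with the Java 11 java.net.http client. It assumes an unsecured cluster at http://localhost:9200, and the class and method names are hypothetical. The index is created with default_pipeline in its settings. The document request then carries no pipeline parameter, and the body uses an ASCII test value.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: create an index whose default ingest pipeline is add_timestamp,
// then index a document WITHOUT a ?pipeline= parameter.
// Assumption: Elasticsearch at http://localhost:9200 with no authentication.
final class DefaultPipelineSketch {

    static HttpRequest createIndex(String baseUrl) {
        String settings = "{\"settings\":{\"default_pipeline\":\"add_timestamp\"}}";
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/test_index_20210628_02"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(settings))
                .build();
    }

    static HttpRequest indexDocument(String baseUrl) {
        return HttpRequest.newBuilder()
                // No pipeline parameter: the index's default_pipeline is applied
                .uri(URI.create(baseUrl + "/test_index_20210628_02/_doc/1"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString("{\"test\":\"test data\"}"))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://localhost:9200";
        // Send the two requests in order: create the index first, then add the document
        for (HttpRequest request : new HttpRequest[] {createIndex(base), indexDocument(base)}) {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
```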

# Add a test document
PUT test_index_20210628_02/_doc/1
{
  "test":"測試數據"
}

# Retrieve the document
GET test_index_20210628_02/_doc/1

# Response
{
  "_index" : "test_index_20210628_02",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "newdate2" : 1624849478,
    "test" : "測試數據",
    "newdate" : "2021-06-28 03:04:38",
    "timestamp" : "2021-06-28T03:04:38.940542643Z"
  }
}

Author's WeChat official account (big data learning and exchange): hadoopwiki

