Elasticsearch Ingest Pipeline: Setting an Update Timestamp on Newly Indexed Data


Simulating the pipeline

Test request

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "timestamp",
          "value": "{{_ingest.timestamp}}"
        }
      },
      {
        "script": {
          "lang": "painless",
          "source": """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 =         System.currentTimeMillis()/1000; """
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "测试"
      }
    }
  ]
}

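The request above records the document's latest update time in more than one way: the set processor copies _ingest.timestamp into a field, and a Painless script derives two further fields from it and from the system clock.

Response

newdate2 is the update time in epoch seconds, newdate is the timestamp reformatted as yyyy-MM-dd HH:mm:ss, and timestamp is the value obtained from _ingest.timestamp. Both timestamp and newdate are in UTC.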

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "newdate2" : 1624848304,
          "message" : "测试",
          "newdate" : "2021-06-28 02:45:04",
          "timestamp" : "2021-06-28T02:45:04.759053131Z"
        },
        "_ingest" : {
          "timestamp" : "2021-06-28T02:45:04.759053131Z"
        }
      }
    }
  ]
}
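
To make the relationship between the three fields concrete, here is a minimal Python sketch that reproduces the derivation outside Elasticsearch. The function name derive_fields is hypothetical, and the sketch simplifies one thing: in the pipeline, newdate2 comes from a separate System.currentTimeMillis() call, whereas here it is computed from the same timestamp string.

```python
from datetime import datetime, timezone


def derive_fields(ingest_timestamp: str) -> dict:
    """Mimic the pipeline's output fields for one _ingest.timestamp string."""
    # Drop the trailing "Z" and the fractional seconds: Elasticsearch emits up to
    # nine fractional digits, which strptime's %f (six digits at most) cannot parse.
    seconds_part = ingest_timestamp.rstrip("Z").split(".")[0]
    parsed = datetime.strptime(seconds_part, "%Y-%m-%dT%H:%M:%S").replace(
        tzinfo=timezone.utc
    )
    return {
        "timestamp": ingest_timestamp,
        # Same pattern as "yyyy-MM-dd HH:mm:ss" in the Painless script.
        "newdate": parsed.strftime("%Y-%m-%d %H:%M:%S"),
        # Assumption: derived from the timestamp rather than a second clock read.
        "newdate2": int(parsed.timestamp()),
    }


fields = derive_fields("2021-06-28T02:45:04.759053131Z")
print(fields["newdate"])   # 2021-06-28 02:45:04
print(fields["newdate2"])  # 1624848304
```

Both printed values match the simulated response above, which confirms that newdate is a UTC wall-clock time and not a local one.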

Practical use

Create the pipeline

PUT _ingest/pipeline/add_timestamp
{
  "processors": [
    {
      "set": {
        "field": "timestamp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 =         System.currentTimeMillis()/1000; """
      }
    }
  ]
}

View the created pipeline

GET _ingest/pipeline/add_timestamp

{
  "add_timestamp" : {
    "processors" : [
      {
        "set" : {
          "field" : "timestamp",
          "value" : "{{_ingest.timestamp}}"
        }
      },
      {
        "script" : {
          "lang" : "painless",
          "source" : """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 =         System.currentTimeMillis()/1000; """
        }
      }
    ]
  }
}

Test by indexing a document

PUT test_index_20210628/_doc/1?pipeline=add_timestamp
{
  "test":"测试数据"
}

View the indexed document

GET test_index_20210628/_doc/1	

{
  "_index" : "test_index_20210628",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "newdate2" : 1624849340,
    "test" : "测试数据",
    "newdate" : "2021-06-28 03:02:20",
    "timestamp" : "2021-06-28T03:02:20.252887295Z"
  }
}

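Setting the pipeline when creating the index

We can also attach the pipeline when the index is created, through the index.default_pipeline setting. The pipeline then no longer has to be specified on every indexing request.

# Create the index with a default pipeline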
PUT test_index_20210628_02
{
  "settings": {
    "default_pipeline": "add_timestamp"
  }
}

# Index a test document
PUT test_index_20210628_02/_doc/1
{
  "test":"测试数据"
}

# Fetch the document
GET test_index_20210628_02/_doc/1

# Response
{
  "_index" : "test_index_20210628_02",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "newdate2" : 1624849478,
    "test" : "测试数据",
    "newdate" : "2021-06-28 03:04:38",
    "timestamp" : "2021-06-28T03:04:38.940542643Z"
  }
}
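
From a client's point of view, the two approaches differ only in the request URL. The following Python sketch builds both requests without sending them. The base URL http://localhost:9200 and the helper name build_index_request are assumptions for illustration, not part of the original setup.

```python
import json
from typing import Optional
from urllib.parse import quote, urlencode
from urllib.request import Request

BASE_URL = "http://localhost:9200"  # assumption: a local, unsecured cluster


def build_index_request(
    index: str, doc_id: str, doc: dict, pipeline: Optional[str] = None
) -> Request:
    """Build (but do not send) a PUT <index>/_doc/<id> request."""
    url = f"{BASE_URL}/{quote(index)}/_doc/{quote(doc_id)}"
    if pipeline is not None:
        # Explicit per-request pipeline, as in the first example.
        url += "?" + urlencode({"pipeline": pipeline})
    return Request(
        url,
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Index without a default pipeline: the pipeline is named in the URL.
explicit = build_index_request(
    "test_index_20210628", "1", {"test": "测试数据"}, pipeline="add_timestamp"
)
# Index created with default_pipeline: no query parameter is needed.
implicit = build_index_request("test_index_20210628_02", "1", {"test": "测试数据"})

print(explicit.full_url)
print(implicit.full_url)
```

Passing either object to urllib.request.urlopen would send it, which requires a running cluster at the assumed address.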

Personal WeChat official account (big data learning and discussion): hadoopwiki

