模擬測試
測試
POST _ingest/pipeline/_simulate
{
"pipeline": {
"processors": [
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"script": {
"lang": "painless",
"source": """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 = System.currentTimeMillis()/1000; """
}
}
]
},
"docs": [
{
"_source": {
"message": "測試"
}
}
]
}
我們使用_ingest.timestamp 與painless 多種方式設置了數據最新更新時間
返回結果
newdate2 為數據更新時間秒,newdate為格式轉換后的數據,timestamp 為 _ingest.timestamp 獲取到的時間
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_type" : "_doc",
"_id" : "_id",
"_source" : {
"newdate2" : 1624848304,
"message" : "測試",
"newdate" : "2021-06-28 02:45:04",
"timestamp" : "2021-06-28T02:45:04.759053131Z"
},
"_ingest" : {
"timestamp" : "2021-06-28T02:45:04.759053131Z"
}
}
}
]
}
實際應用
創建Pipeline
PUT _ingest/pipeline/add_timestamp
{
"processors": [
{
"set": {
"field": "timestamp",
"value": "{{_ingest.timestamp}}"
}
},
{
"script": {
"lang": "painless",
"source": """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 = System.currentTimeMillis()/1000; """
}
}
]
}
查看創建Pipeline
GET _ingest/pipeline/add_timestamp
{
"add_timestamp" : {
"processors" : [
{
"set" : {
"field" : "timestamp",
"value" : "{{_ingest.timestamp}}"
}
},
{
"script" : {
"lang" : "painless",
"source" : """ ZonedDateTime zdt = ZonedDateTime.parse(ctx.timestamp); DateTimeFormatter dtf = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss"); String datetime = zdt.format(dtf); ctx.newdate = datetime; ctx.newdate2 = System.currentTimeMillis()/1000; """
}
}
]
}
}
新增數據測試
PUT test_index_20210628/_doc/1?pipeline=add_timestamp
{
"test":"測試數據"
}
查看新增數據
GET test_index_20210628/_doc/1
{
"_index" : "test_index_20210628",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"newdate2" : 1624849340,
"test" : "測試數據",
"newdate" : "2021-06-28 03:02:20",
"timestamp" : "2021-06-28T03:02:20.252887295Z"
}
}
創建索引時直接設置Pipeline
我們也可以在創建索引時設置Pipeline,這時就不需要每次添加數據時指定Pipeline
# 創建索引指定pipeline
PUT test_index_20210628_02
{
"settings": {
"default_pipeline": "add_timestamp"
}
}
# 添加測試數據
PUT test_index_20210628_02/_doc/1
{
"test":"測試數據"
}
# 獲取數據
GET test_index_20210628_02/_doc/1
# 返回結果
{
"_index" : "test_index_20210628_02",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"newdate2" : 1624849478,
"test" : "測試數據",
"newdate" : "2021-06-28 03:04:38",
"timestamp" : "2021-06-28T03:04:38.940542643Z"
}
}
個人公眾號(大數據學習交流): hadoopwiki