我們之前看見了在Elasticsearch里的ingest node里,我們可以通過以下processor的處理幫我們處理我們的一些數據。它們的功能是非常具體而明確的。那么在Elasticsearch里,有沒有一種更加靈活的方式可供我們來進行編程處理呢?如果有,它使用的語言是什么呢?
在Elasticsearc中,它使用了一個叫做Painless的語言。它是專門為Elasticsearch而建立的。Painless是一種簡單,安全的腳本語言,專為與Elasticsearch一起使用而設計。 它是Elasticsearch的默認腳本語言,可以安全地用於inline和stored腳本。它具有像Groovy那樣的語法。自Elasticsearch 6.0以后的版本不再支持Groovy,Javascript及Python語言。
如何使用腳本
腳本的語法為:
"script": {
"lang": "...",
"source" | "id": "...",
"params": { ... }
}
- 這里lang默認的值為"painless"。在實際的使用中可以不設置,除非有第二種語言供使用
- source可以為inline腳本,或者是一個id,那么這個id對應於一個stored腳本
- 任何有名字的參數,可以被用於腳本的輸入參數
Painless的簡單使用例子
inline 腳本
首先我們來創建一個簡單的文檔:
PUT twitter/_doc/1
{
"user" : "雙榆樹-張三",
"message" : "今兒天氣不錯啊,出去轉轉去",
"uid" : 2,
"age" : 20,
"city" : "北京",
"province" : "北京",
"country" : "中國",
"address" : "中國北京市海淀區",
"location" : {
"lat" : "39.970718",
"lon" : "116.325747"
}
}
在這個文檔里,我們現在想把age修改為30,那么一種辦法就是把所有的文檔內容都讀出來,讓修改其中的age想為30,再重新用同樣的方法寫進去。首先這里需要有幾個動作:先讀出數據,然后修改,再次寫入數據。顯然這樣比較麻煩。在這里我們可以直接使用Painless語言直接進行修改:
POST twitter/_update/1
{
"script": {
"source": "ctx._source.age = 30"
}
}
這里的source表明是我們的Painless代碼。這里我們只寫了很少的代碼在DSL之中。這種代碼稱之為inline。在這里我們直接通過ctx._source.age
來訪問 _souce
里的age。這樣我們通過編程的辦法直接對年齡進行了修改。運行的結果是:
{
"_index" : "twitter",
"_type" : "_doc",
"_id" : "1",
"_version" : 16,
"_seq_no" : 20,
"_primary_term" : 1,
"found" : true,
"_source" : {
"user" : "雙榆樹-張三",
"message" : "今兒天氣不錯啊,出去轉轉去",
"uid" : 2,
"age" : 30,
"city" : "北京",
"province" : "北京",
"country" : "中國",
"address" : "中國北京市海淀區",
"location" : {
"lat" : "39.970718",
"lon" : "116.325747"
}
}
}
顯然這個age已經改變為30。上面的方法固然好,但是每次執行scripts都是需要重新進行編譯的。編譯好的script可以cache並供以后使用。上面的script如果是改變年齡的話,需要重新進行編譯。一種更好的方法是改為這樣的:
POST twitter/_update/1
{
"script": {
"source": "ctx._source.age = params.value",
"params": {
"value": 34
}
}
}
這樣,我們的script的source是不用改變的,只需要編譯一次。下次調用的時候,只需要修改params里的參數即可。
在Elasticsearch里:
"script": {
"source": "ctx._source.num_of_views += 2"
}
和
"script": {
"source": "ctx._source.num_of_views += 3"
}
被視為兩個不同的腳本,需要分別進行編譯,所以最好的辦法是使用params來傳入參數。
存儲的腳本 (stored script)
在這種情況下,scripts可以被存放於一個集群的狀態中。它之后可以通過ID進行調用:
PUT _scripts/add_age
{
"script": {
"lang": "painless",
"source": "ctx._source.age += params.value"
}
}
在這里,我們定義了一個叫做add_age的script。它的作用就是幫我們把source里的age加上一個數值。我們可以在之后調用它:
POST twitter/_update/1
{
"script": {
"id": "add_age",
"params": {
"value": 2
}
}
}
通過上面的執行,我們可以看到,age將會被加上2。
訪問source里的字段
Painless中用於訪問字段值的語法取決於上下文。在Elasticsearch中,有許多不同的Plainless上下文。就像那個鏈接顯示的那樣,Plainless上下文包括:ingest processor, update, update by query, sort,filter等等。
Context 訪問字段
Ingest node: 訪問字段使用ctx ctx.field_name
Updates: 使用_source
字段 ctx._source.field_name
這里的updates包括_update
,_reindex
以及update_by_query。這里,我們對於context(上下文的理解)非常重要。它的意思是針對不同的API,在使用中ctx所包含的字段是不一樣的。在下面的例子中,我們針對一些情況來做具體的分析。
Painless腳本例子
首先我們創建一個叫做add_field_c的pipeline。關於如何創建一個pipleline,大家可以參考我之前寫過的一個文章“如何在Elasticsearch中使用pipeline API來對事件進行處理”。
例子1
PUT _ingest/pipeline/add_field_c
{
"processors": [
{
"script": {
"lang": "painless",
"source": "ctx.field_c = (ctx.field_a + ctx.field_b) * params.value",
"params": {
"value": 2
}
}
}
]
}
這個pipepline的作用是創建一個新的field:field_c。它的結果是field_a及field_b的和,並乘以2。那么我們創建一個如下的文檔:
PUT test_script/_doc/1?pipeline=add_field_c
{
"field_a": 10,
"field_b": 20
}
在這里,我們使用了pipleline add_field_c。執行后的結果是:
{
"took" : 147,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "test_script",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"field_c" : 60,
"field_a" : 10,
"field_b" : 20
}
}
]
}
}
顯然,我們可以看到field_c被成功創建了。
例子2
在ingest過程中,可以使用腳本處理器來處理metadata,如_index和_type。 下面是一個Ingest Pipeline的示例,無論原始索引請求中提供了什么,它都會將索引和類型重命名為my_index:
PUT _ingest/pipeline/my_index
{
"description": "use index:my_index and type:_doc",
"processors": [
{
"script": {
"source": """
ctx._index = 'my_index';
ctx._type = '_doc';
"""
}
}
]
}
使用上面的pipeline,我們可以嘗試index一個文檔到any_index:
PUT any_index/_doc/1?pipeline=my_index
{
"message": "text"
}
顯示的結果是:
{
"_index": "my_index",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 89,
"_primary_term": 1,
}
也就是說真正的文檔時存到my_index之中,而不是any_index。
例子3
PUT _ingest/pipeline/blogs_pipeline
{
"processors": [
{
"script": {
"source": """
if (ctx.category == "") {
ctx.category = "None"
}
"""
}
}
]
}
我們上面定義了一個pipeline,它可以幫我們檢查如果 category字段是否為空,如果是,就修改為“None”。還是以之前的那個test_script索引為例:
PUT test_script/_doc/2?pipeline=blogs_pipeline
{
"field_a": 5,
"field_b": 10,
"category": ""
}
GET test_script/_doc/2
顯示的結果是:
{
"_index" : "test_script",
"_type" : "_doc",
"_id" : "2",
"_version" : 2,
"_seq_no" : 6,
"_primary_term" : 1,
"found" : true,
"_source" : {
"field_a" : 5,
"field_b" : 10,
"category" : "None"
}
}
顯然,它把category為“”的字段變為“None”了。
例子4
POST _reindex
{
"source": {
"index": "blogs"
},
"dest": {
"index": "blogs_fixed"
},
"script": {
"source": """
if (ctx._source.category == "") {
ctx._source.category = "None"
}
"""
}
}
上面的這個例子在reindex時,如果category為空時,寫入“None”。我們可以從上面的兩個例子中看出來,針對pipeline,我們可以直接對cxt.field進行操作,而針對update來說,我們可以對cxt._source下的字段進行操作。這也是之前提到的上下文的區別。
例子5
PUT test/_doc/1
{
"counter" : 1,
"tags" : ["red"]
}
您可以使用和update腳本將tag添加到tags列表(這只是一個列表,因此即使存在標記也會添加):
POST test/_update/1
{
"script" : {
"source": "ctx._source.tags.add(params.tag)",
"lang": "painless",
"params" : {
"tag" : "blue"
}
}
}
顯示結果:
GET test/_doc/1
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 4,
"_seq_no" : 3,
"_primary_term" : 11,
"found" : true,
"_source" : {
"counter" : 1,
"tags" : [
"red",
"blue"
]
}
}
顯示“blue”,已經被成功加入到tags列表之中了。
您還可以從tags列表中刪除tag。 刪除tag的Painless函數采用要刪除的元素的數組索引。 為避免可能的運行時錯誤,首先需要確保tag存在。 如果列表包含tag的重復項,則此腳本只刪除一個匹配項。
POST test/_update/1
{
"script": {
"source": "if (ctx._source.tags.contains(params.tag)) { ctx._source.tags.remove(ctx._source.tags.indexOf(params.tag)) }",
"lang": "painless",
"params": {
"tag": "blue"
}
}
}
GET test/_doc/1
顯示結果:
{
"_index" : "test",
"_type" : "_doc",
"_id" : "1",
"_version" : 5,
"_seq_no" : 4,
"_primary_term" : 11,
"found" : true,
"_source" : {
"counter" : 1,
"tags" : [
"red"
]
}
}
“blue”顯然已經被刪除了。
Painless腳本簡單的操練
為了說明Painless的工作原理,讓我們將一些曲棍球統計數據加載到Elasticsearch索引中:
PUT hockey/_bulk?refresh
{"index":{"_id":1}}
{"first":"johnny","last":"gaudreau","goals":[9,27,1],"assists":[17,46,0],"gp":[26,82,1],"born":"1993/08/13"}
{"index":{"_id":2}}
{"first":"sean","last":"monohan","goals":[7,54,26],"assists":[11,26,13],"gp":[26,82,82],"born":"1994/10/12"}
{"index":{"_id":3}}
{"first":"jiri","last":"hudler","goals":[5,34,36],"assists":[11,62,42],"gp":[24,80,79],"born":"1984/01/04"}
{"index":{"_id":4}}
{"first":"micheal","last":"frolik","goals":[4,6,15],"assists":[8,23,15],"gp":[26,82,82],"born":"1988/02/17"}
{"index":{"_id":5}}
{"first":"sam","last":"bennett","goals":[5,0,0],"assists":[8,1,0],"gp":[26,1,0],"born":"1996/06/20"}
{"index":{"_id":6}}
{"first":"dennis","last":"wideman","goals":[0,26,15],"assists":[11,30,24],"gp":[26,81,82],"born":"1983/03/20"}
{"index":{"_id":7}}
{"first":"david","last":"jones","goals":[7,19,5],"assists":[3,17,4],"gp":[26,45,34],"born":"1984/08/10"}
{"index":{"_id":8}}
{"first":"tj","last":"brodie","goals":[2,14,7],"assists":[8,42,30],"gp":[26,82,82],"born":"1990/06/07"}
{"index":{"_id":39}}
{"first":"mark","last":"giordano","goals":[6,30,15],"assists":[3,30,24],"gp":[26,60,63],"born":"1983/10/03"}
{"index":{"_id":10}}
{"first":"mikael","last":"backlund","goals":[3,15,13],"assists":[6,24,18],"gp":[26,82,82],"born":"1989/03/17"}
{"index":{"_id":11}}
{"first":"joe","last":"colborne","goals":[3,18,13],"assists":[6,20,24],"gp":[26,67,82],"born":"1990/01/30"}
使用Painless訪問Doc里的值
文檔里的值可以通過一個叫做doc的Map值來訪問。例如,以下腳本計算玩家的總進球數。 此示例使用類型int和for循環。
GET hockey/_search
{
"query": {
"function_score": {
"script_score": {
"script": {
"lang": "painless",
"source": """
int total = 0;
for (int i = 0; i < doc['goals'].length; ++i) {
total += doc['goals'][i];
}
return total;
"""
}
}
}
}
}
這里我們通過script來計算每個文檔的_score。通過script把每個運動員的goal都加起來,並形成最終的_score。這里我們通過doc['goals']這個Map類型來訪問我們的字段值。顯示的結果為:
{
"took" : 25,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 11,
"relation" : "eq"
},
"max_score" : 87.0,
"hits" : [
{
"_index" : "hockey",
"_type" : "_doc",
"_id" : "2",
"_score" : 87.0,
"_source" : {
"first" : "sean",
"last" : "monohan",
"goals" : [
7,
54,
26
],
"assists" : [
11,
26,
13
],
"gp" : [
26,
82,
82
],
"born" : "1994/10/12"
}
},
...
或者,您可以使用script_fields而不是function_score執行相同的操作:
GET hockey/_search
{
"query": {
"match_all": {}
},
"script_fields": {
"total_goals": {
"script": {
"lang": "painless",
"source": """
int total = 0;
for (int i = 0; i < doc['goals'].length; ++i) {
total += doc['goals'][i];
}
return total;
"""
}
}
}
}
顯示的結果為:
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 11,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "hockey",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"fields" : {
"total_goals" : [
37
]
}
},
{
"_index" : "hockey",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"fields" : {
"total_goals" : [
87
]
}
},
...
以下示例使用Painless腳本按其組合的名字和姓氏對玩家進行排序。 使用doc ['first']。value和doc ['last']。value訪問名稱。
GET hockey/_search
{
"query": {
"match_all": {}
},
"sort": {
"_script": {
"type": "string",
"order": "asc",
"script": {
"lang": "painless",
"source": "doc['first.keyword'].value + ' ' + doc['last.keyword'].value"
}
}
}
}
檢查缺失項
doc ['field'].value。如果文檔中缺少該字段,則拋出異常。
要檢查文檔是否缺少值,可以調用doc ['field'] .size()== 0
。
使用Painless更新字段
您還可以輕松更新字段。 您可以使用ctx._source.<field-name>
訪問字段的原始源。
首先,讓我們通過提交以下請求來查看玩家的源數據:
GET hockey/_search
{
"stored_fields": [
"_id",
"_source"
],
"query": {
"term": {
"_id": 1
}
}
}
顯示的結果為:
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "hockey",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"first" : "johnny",
"last" : "gaudreau",
"goals" : [
9,
27,
1
],
"assists" : [
17,
46,
0
],
"gp" : [
26,
82,
1
],
"born" : "1993/08/13"
}
}
]
}
}
要將玩家1的姓氏更改為hockey,只需將ctx._source.last設置為新值:
POST hockey/_update/1
{
"script": {
"lang": "painless",
"source": "ctx._source.last = params.last",
"params": {
"last": "hockey"
}
}
}
您還可以向文檔添加字段。 例如,此腳本添加一個包含玩家nickname為hockey的新字段。
POST hockey/_update/1
{
"script": {
"lang": "painless",
"source": """
ctx._source.last = params.last;
ctx._source.nick = params.nick
""",
"params": {
"last": "gaudreau",
"nick": "hockey"
}
}
}
顯示的結果為:
GET hockey/_doc/1
{
"_index" : "hockey",
"_type" : "_doc",
"_id" : "1",
"_version" : 2,
"_seq_no" : 11,
"_primary_term" : 1,
"found" : true,
"_source" : {
"first" : "johnny",
"last" : "gaudreau",
"goals" : [
9,
27,
1
],
"assists" : [
17,
46,
0
],
"gp" : [
26,
82,
1
],
"born" : "1993/08/13",
"nick" : "hockey"
}
}
有一個叫做 “nick”的新字段被加入了。
我們甚至可以對日期類型來進行操作從而得到年月等信息:
GET hockey/_search
{
"script_fields": {
"birth_year": {
"script": {
"source": "doc.born.value.year"
}
}
}
}
顯示結果:
{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 11,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "hockey",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"fields" : {
"birth_year" : [
1994
]
}
},
...
Script Caching
Elasticsearch第一次看到一個新腳本,它會編譯它並將編譯后的版本存儲在緩存中。無論是inline或是stored腳本都存儲在緩存中。新腳本可以驅逐緩存的腳本。默認的情況下是可以存儲100個腳本。我們可以通過設置script.cache.max_size來改變其大小,或者通過script.cache.expire來設置過期的時間。這些設置需要在config/elasticsearch.yml里設置。
Script 調試
不能調試的腳本是非常難的。有一個好的調試手段無疑對我們的腳本編程是非常有用的。
Debug.explain
Painless沒有REPL,雖然有一天它很好,但它不會告訴你關於調試Elasticsearch中嵌入的Painless腳本的全部故事,因為腳本可以訪問的數據或“上下文” 是如此重要。 目前,調試嵌入式腳本的最佳方法是在選擇位置拋出異常。 雖然您可以拋出自己的異常(throw new exception('whatever'),但Painless的沙箱會阻止您訪問有用的信息,如對象的類型。 所以Painless有一個實用工具方法Debug.explain,它會為你拋出異常。 例如,您可以使用_explain來探索script query可用的上下文。
PUT /hockey/_doc/1?refresh
{"first":"johnny","last":"gaudreau","goals":[9,27,1],"assists":[17,46,0],"gp":[26,82,1]}
POST /hockey/_explain/1
{
"query": {
"script": {
"script": "Debug.explain(doc.goals)"
}
}
}
這表明doc.goals類是org.elasticsearch.index.fielddata.ScriptDocValues.Longs通過響應:
{
"error": {
"root_cause": [
{
"type": "script_exception",
"reason": "runtime error",
"painless_class": "org.elasticsearch.index.fielddata.ScriptDocValues.Longs",
"to_string": "[1, 9, 27]",
"java_class": "org.elasticsearch.index.fielddata.ScriptDocValues$Longs",
"script_stack": [
"Debug.explain(doc.goals)",
" ^---- HERE"
],
"script": "Debug.explain(doc.goals)",
"lang": "painless"
}
],
"type": "script_exception",
"reason": "runtime error",
"painless_class": "org.elasticsearch.index.fielddata.ScriptDocValues.Longs",
"to_string": "[1, 9, 27]",
"java_class": "org.elasticsearch.index.fielddata.ScriptDocValues$Longs",
"script_stack": [
"Debug.explain(doc.goals)",
" ^---- HERE"
],
"script": "Debug.explain(doc.goals)",
"lang": "painless",
"caused_by": {
"type": "painless_explain_error",
"reason": null
}
},
"status": 400
}
您可以使用相同的技巧來查看_source是_update API中的LinkedHashMap:
POST /hockey/_update/1
{
"script": "Debug.explain(ctx._source)"
}
顯示的結果是:
{
"error": {
"root_cause": [
{
"type": "remote_transport_exception",
"reason": "[localhost][127.0.0.1:9300][indices:data/write/update[s]]"
}
],
"type": "illegal_argument_exception",
"reason": "failed to execute script",
"caused_by": {
"type": "script_exception",
"reason": "runtime error",
"painless_class": "java.util.LinkedHashMap",
"to_string": "{first=johnny, last=gaudreau, goals=[9, 27, 1], assists=[17, 46, 0], gp=[26, 82, 1], born=1993/08/13, nick=hockey}",
"java_class": "java.util.LinkedHashMap",
"script_stack": [
"Debug.explain(ctx._source)",
" ^---- HERE"
],
"script": "Debug.explain(ctx._source)",
"lang": "painless",
"caused_by": {
"type": "painless_explain_error",
"reason": null
}
}
},
"status": 400
}
參考:
【1】https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-walkthrough.html
【2】https://www.elastic.co/guide/en/elasticsearch/painless/current/painless-debugging.html