Python Elasticsearch DSL 使用簡介
連接 Es:
import elasticsearch
clinet = elasticsearch.Elasticsearch([{"host": "10.44.99.102", "port": 9200}])
or
clinet = Elasticsearch(["10.44.99.102:9200"])
先看一下搜索,q 是指搜索內容,空格對 q 查詢結果沒有影響,size 指定個數,from_ 指定起始位置,filter_path 可以指定需要顯示的數據,如本例中顯示在最后的結果中的只有 _id 和 _type。
res_3 = clinet .search(index="bank", q="Holmes", size=1, from_=1)
res_4 = clinet .search(index="bank", q=" 39225 5686 ", size=1000, filter_path=["hits.hits._id", "hits.hits._type"])
查詢指定索引的所有數據:
其中,index 指定索引,字符串表示一個索引;列表表示多個索引,如 index=["bank", "banner", "country"];正則形式表示符合條件的多個索引,如 index=["apple*"],表示以 apple 開頭的全部索引。
search 中同樣可以指定具體 doc-type。
from elasticsearch_dsl import Search
s = Search(using=clinet,index="situation-event").execute()
logging.warning(s.to_dict())
忽略不可用的
from elasticsearch_dsl import Search
s = Search(using=clinet,index="situation-event")
s = s.params(ignore_unavailable=True)
根據某個字段查詢,可以多個查詢條件疊加:
s = Search(using=clinet, index="situation-event").query("match", event_type="002")
s.query("match", event_title="aaa")
logging.warning(s.execute().to_dict())
多字段查詢:
from elasticsearch_dsl.query import MultiMatch
multi_match = MultiMatch(query="aaa",fields=["event_type","event_title"])
s = Search(using=clinet,index="situation-event").query(multi_match)
s = s.execute()
logging.warning(s.to_dict())
還可以用 Q() 對象進行多字段查詢,fields 是一個列表,query 為所要查詢的值。
q = Q("multi_match",query="aaa",fields=["event_type","event_title"])
s = Search(using=clinet, index="situation-event").query(q)
s = s.execute()
logging.warning(s.to_dict())
Q() 第一個參數是查詢方法,還可以是 bool。
q = Q("bool",must=[Q("match",event_type="002"),Q("match",event_title="aaa")])
s = Search(using=clinet,index="situation-event").query(q)
s = s.execute()
logging.warning(s.to_dict())
通過 Q() 進行組合查詢,相當於上面查詢的另一種寫法。
q = Q("match", event_type="002") | Q("match", event_type="003")
s = Search(using=clinet,index="situation-event").query(q).execute()
logging.warning(s.to_dict())
# {"query": {"bool": {"should": [{"match": {"event_type": "002"}}, {"match": {"event_type": "003"}}]}}}
q = Q("match", event_type="002") & Q("match", event_type="003")
s = Search(using=clinet,index="situation-event").query(q)
logging.warning(s.to_dict())
# {"query": {"bool": {"must": [{"match": {"event_type": "002"}}, {"match": {"event_type": "003"}}]}}}
q = ~Q("match", event_type="002")
s = Search(using=clinet,index="situation-event").query(q).execute()
logging.warning(s.to_dict())
# {"query": {"bool": {"must_not": [{"match": {"event_type": "002"}}]}}}
過濾,在此為范圍過濾,range 是方法,timestamp 是所要查詢的 field 名字,gte 為大於等於,lt 為小於,根據需要設定即可。
關於 term 和 match 的區別,term 是精確匹配,match 會模糊化,會進行分詞,返回匹配度分數,(term 如果查詢小寫字母的字符串,有大寫會返回空即沒有命中,match 則是不區分大小寫都可以進行查詢,返回結果也一樣)
# 范圍查詢
s = Search(using=clinet, index="situation-event").filter("range", update_time={"gte": 0, "lt": time.time()}) \
.query("match", event_type="003")
logging.warning(s.to_dict())
#
# 普通查詢
s = Search(using=clinet, index="situation-event").filter("terms", event_type=["002", "003"]).execute()
logging.warning(s.to_dict())
# {"query": {"bool": {"filter": [{"terms": {"event_type": ["002", "003"]}}]}}}
其他寫法
s = Search(using=clinet, index="situation-event").filter("terms", event_type=["002", "003"])
logging.warning(s.to_dict())
# {"query": {"bool": {"filter": [{"terms": {"event_type": ["002", "003"]}}]}}}
s = Search(using=clinet, index="situation-event").query("bool", filter=[Q("terms", event_type=["002", "003"])])
logging.warning(s.to_dict())
# {"query": {"bool": {"filter": [{"terms": {"event_type": ["002", "003"]}}]}}}
s = Search(using=clinet, index="situation-event").query("bool", filter=[~Q("terms", event_type=["002", "003"])])
logging.warning(s.to_dict())
# {"query": {"bool": {"filter": [{"bool": {"must_not": [{"terms": {"event_type": ["002", "003"]}}]}}]}}}
聚合可以放在查詢,過濾等操作的后面疊加,需要加 aggs。
bucket 即為分組,其中第一個參數是分組的名字,自己指定即可,第二個參數是方法,第三個是指定的 field。
metric 也是同樣,metric 的方法有 sum、avg、max、min 等,但是需要指出的是,有兩個方法可以一次性返回這些值,stats 和 extended_stats,后者還可以返回方差等值。
# 單層聚合
s = Search(using=clinet, index="situation-event")
s.aggs.bucket("per_one","terms",field="event_type")
res = s.execute()
logging.warning(res.to_dict())
# {"query": {"match_all": {}}, "aggs": {"per_one": {"terms": {"field": "event_type"}}}}
#雙層聚合
s = Search(using=clinet, index="situation-event")
s.aggs.bucket("per_one", "terms", field="event_type").bucket("per_two", "terms", field="event_level")
res = s.execute()
logging.warning(res.to_dict())
# {"query": {"match_all": {}}, "aggs": {"per_one": {"terms": {"field": "event_type"}, "aggs": {"per_two": {"terms": {"field": "event_level"}}}}}}
# 單層聚合
s = Search(using=clinet,index="situation-event")
s.aggs.metric("sum_system_id","stats",field="system_id")
res = s.execute()
logging.warning(res.to_dict())
# {"query": {"match_all": {}}, "aggs": {"sum_system_id": {"stats": {"field": "system_id"}}}}
# 雙層聚合
s = Search(using=clinet,index="situation-event")
s.aggs.bucket("pre_one","terms",field="system_id").metric("sum_system_id","stats",field="system_id")
res = s.execute()
logging.warning(res.to_dict())
# {"query": {"match_all": {}}, "aggs": {"pre_one": {"terms": {"field": "system_id"}, "aggs": {"sum_system_id": {"stats": {"field": "system_id"}}}}}}
最后依然要執行 execute(),此處需要注意,s.aggs 操作不能用變量接收(如 res=s.aggs,這個操作是錯誤的),聚合的結果會保存到 res 中顯示。
排序
s = Search().sort(
"category",
"-title",
{"lines" : {"order" : "asc", "mode" : "avg"}}
)
分頁
s = s[10:20]
# {"from": 10, "size": 10}
一些擴展方法,感興趣的同學可以看看:
s = Search()
# 設置擴展屬性使用`.extra()`方法
s = s.extra(explain=True)
# 設置參數使用`.params()`
s = s.params(search_type="count")
# 如要要限制返回字段,可以使用`source()`方法
# only return the selected fields
s = s.source(["title", "body"])
# don"t return any fields, just the metadata
s = s.source(False)
# explicitly include/exclude fields
s = s.source(include=["title"], exclude=["user.*"])
# reset the field selection
s = s.source(None)
# 使用dict序列化一個查詢
s = Search.from_dict({"query": {"match": {"title": "python"}}})
# 修改已經存在的查詢
s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})