描述:ElasticSearch是一個基於Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。下面介紹了利用Python API接口進行數據查詢,方便其他系統的調用。
安裝API
pip install elasticsearch
建立es連接
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])
數據檢索功能
es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')
常用參數
- index - 索引名
- q - 查詢指定匹配 使用Lucene查詢語法
- from_ - 查詢起始點 默認0
- doc_type - 文檔類型
- size - 指定查詢條數 默認10
- field - 指定字段 逗號分隔
- sort - 排序 字段:asc/desc
- body - 使用Query DSL
- scroll - 滾動查詢
統計查詢功能
# 語法同search大致一樣,但只輸出統計值
In[52]: es.count(index='logstash-2015.08.21', q='http_status_code:500')
Out[52]:{u'_shards':{u'failed':0, u'successful':5, u'total':5}, u'count':17042}
知識擴展
- 滾動demo
# Initialize the scroll
page = es.search(
index ='yourIndex',
doc_type ='yourType',
scroll ='2m',
search_type ='scan',
size =1000,
body ={
# Your query's body
})
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while(scroll_size >0):
print "Scrolling..."
page = es.scroll(scroll_id = sid, scroll ='2m')
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print "scroll size: "+ str(scroll_size)
# Do something with the obtained page
以上demo實現了一次取若干數據,數據取完之后結束,不會獲取到最新更新的數據。我們滾動完之后想獲取最新數據怎么辦?滾動的時候會有一個統計值,如total: 5。跳出循環之后,我們可以用_from參數定位到5開始滾動之后的數據。
- Query DSL
range過濾器查詢范圍
gt: > 大於
lt: < 小於
gte: >= 大於或等於
lte: <= 小於或等於
"range":{
"money":{
"gt":20,
"lt":40
}
}
bool組合過濾器
must:所有分句都必須匹配,與 AND 相同。
must_not:所有分句都必須不匹配,與 NOT 相同。
should:至少有一個分句匹配,與 OR 相同。
{
"bool":{
"must":[],
"should":[],
"must_not":[],
}
}
term過濾器
- term單過濾
{
"terms":{
"money":20
}
}
- terms復數版本,允許多個匹配條件
{
"terms":{
"money": [20,30]
}
}
正則查詢
{
"regexp": {
"http_status_code": "5.*"
}
}
match查詢
- match 精確匹配
{
"match":{
"email":"123456@qq.com"
}
}
- multi_match 多字段搜索
{
"multi_match":{
"query":"11",
"fields":["Tr","Tq"]
}
}
demo
- 獲取最近一小時的數據
{'query':
{'filtered':
{'filter':
{'range':
{'@timestamp':{'gt':'now-1h'}}
}
}
}
}
- 條件過濾查詢
{
"query":{
"filtered":{
"query":{"match":{"http_status_code":500}},
"filter":{"term":{"server_name":"vip03"}}
}
}
}
- Terms Facet 單字段統計
{'facets':
{'stat':
{'terms':
{'field':'http_status_code',
'order':'count',
'size':50}
}
},
'size':0
}
- 一次統計多個字段
{'facets':
{'cip':
{'terms':
{'fields':['client_ip']}},
'status_facets':{'terms':{'fields':['http_status_code'],
'order':'term',
'size':50}}},
'query':{'query_string':{'query':'*'}},
'size':0
}
- 多個字段一起統計
{'facets':
{'tag':
{'terms':
{'fields':['http_status_code','client_ip'],
'size':10
}
}
},
'query':
{'match_all':{}},
'size':0
}
數據組裝
以下是kibana首頁的demo,用來統計一段時間內的日志數量
{
"facets": {
"0": {
"date_histogram": {
"field": "@timestamp",
"interval": "5m"
},
"facet_filter": {
"fquery": {
"query": {
"filtered": {
"query": {
"query_string": {
"query": "*"
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
'gt': 'now-1h'
}
}
},
{
"exists": {
"field": "http_status_code.raw"
}
},
# --------------- -------
# 此處加匹配條件
]
}
}
}
}
}
}
}
},
"size": 0
}
如果想添加匹配條件,在以上代碼標識部分加上過濾條件,按照以下代碼格式即可
{
"query": {
"query_string": {"query": "backend_name:baidu.com"}
}
},
先介紹到這里,后續會有Query DSL API介紹。
