elasticsearch的python增刪查改實例分析


Reference:  http://bigg.top/2015/11/29/elasticsearch%E7%9A%84python%E5%A2%9E%E5%88%A0%E6%9F%A5%E6%94%B9%E5%AE%9E%E4%BE%8B%E5%88%86%E6%9E%90/

 


  • ES的部署請查看相關文檔,我這里就不在贅敘。提醒,官方建議ES的在60G以上內存的環境下運行,如果你的服務器的內存是16G,建議至少需要4台機器。
  • ES連接到服務器比較容易,如下:
import elasticsearch

class ES(object):
@classmethod
def connect_host(cls):
hosts=[{"host": "xx.xxx.x.xx"},
{"host": "xx.xxx.x.xx"},
{"host": "xx.xxx.x.xx"},
{"host": "xx.xxx.x.xx"},]
es = elasticsearch.Elasticsearch(
hosts,
sniff_on_start=True,
sniff_on_connection_fail=True,
sniffer_timeout=600
)
return es

查詢操作

  • 通過對RESTAPI的改造,可以很容易實現查詢功能。如下,實現了對一個domain相關doc的查詢,篩選條件包括起止時間,數據排列順序和限制查詢數據的個數。
def es_query(domain="", start=None, end=None, reverse=False, limit_cnt=20, category=0):
es = ES.connect_host()
now = datetime.datetime.now()
if reverse:
order = "desc"
else:
order = "asc"
if not start:
start = now - datetime.timedelta(weeks=2000)
if not end:
end = now
range_body = {
"range": {
"time": {
"gte": start,
"lte": end
}
}
}
and_list = [range_body]
domain_body = {
"term": {
"domain": domain
}
}
category_body = {
"term": {
"category": category
}
}
if domain:
and_list.append(domain_body)
if category:
and_list.append(category_body)
q_body = {
"size": limit_cnt,
"sort": [
{
"time": {
"order": order
}
}
],
"query": {
"filtered": {
"query": {"matchAll": {}},
"filter": {
"and": and_list
}
}
}
}
res = es.search(body=q_body)
ret = []
for hit in res["hits"]["hits"]:
value = {}
src = hit["_source"]
if src:
try:
the_time = src["time"]
if len(the_time) < 20:
value["time"] = datetime.datetime.strptime(the_time, "%Y-%m-%dT%H:%M:%S")
else:
value["time"] = datetime.datetime.strptime(the_time, "%Y-%m-%dT%H:%M:%S.%f")
ret.append(value)
except Exception as e:
print str(e)
ret = []
print "Query xxxxx data failed!"
return ret
  • 其中,reverse表示數據排列的順序,linit_cnt表示限制數量。其中涉及range,sort,size,filter,and等來執行es.search操作。最后一個for循環是一個取數據的過程。
  • 在實際應用過程中,對於一個復雜的查詢,第一次操作失敗率很高,如果查詢結果有幾千個,第一次的query查詢到的success個數通常只有1/3左右。當然,當你用該查詢條件再次查詢時,可以瞬間得到完全成功的結果,所以在你對查詢成功個數要求比較高的情況下,建議多次發起請求,這樣可以得到比較完整的結果。

刪除操作

  • ES的查詢分為按index刪除和按doc刪除。按index查詢相對比較容易理解,即刪除該索引下的所有數據,刪除之后該索引就不存在了。但是有時我們會碰到一些按照doc的情況,即按照一定的query條件查詢到相關的doc,然后刪除相關的所有記錄。ES官方不推薦進行這種操作,而且還有一定的失敗率。如果一定需要這方面的功能,證明你的數據不適合用ES進行存儲。
  • 由於我當時對ES的認識不夠,把大量的數據存儲在了ES,因此對doc的刪除操作需求比較大,寫了一個刪除操作功能(僅供參考,不建議使用,如果需要刪除,建議存儲數據之前設計好數據結構,方便以index為單位刪除)
def es_delete(domain, m_type="xxxx"):
m_data = {
"query": {
"query_string": {
"query": "domain: %s AND type: %s" % (domain, m_type)
}
}
}
data = json.dumps(m_data)
request = urllib2.Request(QUERY_URI, data)
request.get_method = lambda: "DELETE"
urllib2.urlopen(request)
print "Deleted the data!"

更新操作

  • ES不適合對大量的數據(doc)進行修改,與刪除一樣,這是官方極度不推薦的。當然,按照一定的查詢條件更新某些doc也是可以實現的。如果你和我一樣,遇到了比較極端的情形或是一個強迫症患者。請組合以上兩個操作,寫一個比較復雜的query執行刪除操作,然后把新的數據(doc)插入到對應的索引和類型中。

插入操作

  • 插入操作是ES的最基本操作,ES提供了最基本的插入功能,ES入庫時需要批量的插入操作。舉個簡單的插入操作例子:
es = ES.connect_host()
es.index(index=data_index, doc_type="xxxx", body=data, request_timeout=10000)
  • 其中,index表示索引,doc_type表示數據類型,body表示具體的doc數據,最后一個參數表示超時時間。如果是日志文件或其它記錄內容,建議index設置為時間或時間的組合體,如log_2015_11_29。數據類型即當前索引下數據的分類名稱,可以把當前的數據按照不同的類型分類,同時也方便了查詢,查詢時可以很方便的過濾需要的類型。

相關參考

之前時間比較閑,翻譯了部分與Python相關的ElasticSearch文檔,如有疑問,歡迎回復評論,相互討論學習。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM