import os
import json
from datetime import datetime

from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch import Transport
from elasticsearch.exceptions import NotFoundError


class ES(object):
    """Minimal REST-style helper that talks to Elasticsearch over pooled connections.

    ``_index`` and ``_type`` are read by :meth:`search` to build the search
    URL. Both default to the empty string; presumably subclasses override
    them (verify against callers).
    """

    _index = ""
    _type = ""

    def __init__(self, hosts):
        """Create the connection pool for the given hosts.

        @hosts  host definitions, passed unchanged to ``Transport``
        """
        # Build a requests-based Transport only to obtain its connection pool.
        self.conn_pool = Transport(
            hosts=hosts, connection_class=RequestsHttpConnection
        ).connection_pool

    def get_conn(self):
        """Return one connection taken from the connection pool."""
        return self.conn_pool.get_connection()

    def request(self, method, url, headers=None, params=None, body=None):
        """Send a single request to the Elasticsearch server.

        @method   HTTP method, e.g. "GET", "POST", "HEAD"
        @url      absolute URL path of the request, without the host part
        @headers  request headers
        @params   query-string parameters: dict
        @body     request body; a dict/list/tuple is JSON-encoded before
                  sending, any other value (str, bytes, None) is sent as-is

        # return  None when the server raises NotFoundError; the status code
                  for a "HEAD" request; otherwise the response body decoded
                  from JSON into built-in Python data structures
        """
        conn = self.get_conn()

        # The connection is called directly, so nothing upstream serializes
        # the body. A raw dict would be form-encoded by the HTTP layer rather
        # than sent as JSON, so structured bodies are encoded here.
        if isinstance(body, (dict, list, tuple)):
            body = json.dumps(body)

        try:
            # Separate names keep the response from overwriting the arguments.
            status, _response_headers, raw_body = conn.perform_request(
                method, url, headers=headers, params=params, body=body
            )
        except NotFoundError:
            return None

        # A HEAD response carries no body to decode; report the status instead.
        if method == "HEAD":
            return status
        return json.loads(raw_body)

    def search(self, query=None, method="GET"):
        """Run a ``_search`` request against ``_index`` / ``_type``.

        @query   sent as query-string parameters for "GET" and as the
                 request body for "POST"
        @method  "GET" or "POST"; any other value returns None without
                 sending a request

        # return  whatever :meth:`get` / :meth:`post` returns
        """
        # Skip empty segments so an unset _index/_type does not leave "//"
        # in the path; with both set this is "/<index>/<type>/_search".
        segments = [part for part in (self._index, self._type) if part]
        segments.append("_search")
        url = "/" + "/".join(segments)

        if method == "GET":
            return self.get(url, params=query)
        if method == "POST":
            return self.post(url, body=query)
        return None

    def get(self, url, params=None, method="GET"):
        """Access the server with a GET request (method overridable)."""
        return self.request(method, url, params=params)

    def put(self, url, body=None, method="PUT"):
        """Access the server with a PUT request (method overridable)."""
        return self.request(method, url, body=body)

    def post(self, url, body=None, method="POST"):
        """Access the server with a POST request (method overridable)."""
        return self.request(method, url, body=body)

    def head(self, url, *args, **kwargs):
        """Send a HEAD request; extra arguments are forwarded to :meth:`request`.

        # return  the response status code, or None on NotFoundError
        """
        return self.request("HEAD", url, *args, **kwargs)

    def delete(self, url, *args, **kwargs):
        """Send a DELETE request; extra arguments are forwarded to :meth:`request`."""
        return self.request("DELETE", url, *args, **kwargs)