python 中ELasticsearch 連接池


import os
import json
from datetime import datetime
from elasticsearch import Elasticsearch, RequestsHttpConnection
from elasticsearch import Transport
from elasticsearch.exceptions import NotFoundError
 
 
class ES(object):
 
    _index = ""
    _type = ""
    
    def __init__(self, hosts):
        # 基於requests實例化es連接池
        self.conn_pool = Transport(hosts=hosts, connection_class=RequestsHttpConnection).connection_pool
 
    def get_conn(self):
        """
        從連接池獲取一個連接
        """
        conn = self.conn_pool.get_connection()
        return conn
 
    def request(self, method, url, headers=None, params=None, body=None):
        """
        想es服務器發送一個求情
        @method     請求方式
        @url        請求的絕對url  不包括域名
        @headers    請求頭信息
        @params     請求的參數:dict
        @body       請求體:json對象(headers默認Content-Type為application/json)
        # return    返回體:python內置數據結構
        """
        conn = self.get_conn()
        try:
            status, headers, body = conn.perform_request(method, url, headers=headers, params=params, body=body)
        except NotFoundError as e:
            return None
        if method == "HEAD":
            return status
        return json.loads(body)
 
    def search(self, query=None, method="GET"):
        url = "/%s/%s/_search" % (self._index, self._type)
        if method == "GET":
            data = self.get(url, params=query)
        elif method == "POST":
            data = self.post(url, body=query)
        else:
            return None
        return data
 
    def get(self, url, params=None, method="GET"):
        """
        使用get請求訪問es服務器
        """
        data = self.request(method, url, params=params)
        return data
 
    def put(self, url, body=None, method="PUT"):
        """
        使用put請求訪問es
        """
        data = self.request(method, url, body=body)
        return data
 
    def post(self, url, body=None, method="POST"):
        """使用post請求訪問服務器"""
        data = self.request(method, url, body=body)
        return data
 
    def head(self, url, *args, **kwargs):
        status = self.request("HEAD", url, *args, **kwargs)
        return status
 
    def delete(self, url, *args, **kwargs):
        ret = self.request("DELETE", url, *args, **kwargs)
        return ret

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM