使用Python對ElasticSearch獲取數據及操作


#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
    @Time    : 2018/7/4
    @Author  : LiuXueWen
    @Site    : 
    @File    : ElasticSearchOperation.py
    @Software: PyCharm
    @Description: 對elasticsearch數據的操作,包括獲取數據,發送數據
"""
import elasticsearch
import json

import Util_Ini_Operation

class elasticsearch_data():
    def __init__(self,hosts,username,password,maxsize,is_ssl):
        # 初始化ini操作腳本,獲取配置文件
        try:
            # 判斷請求方式是否ssl加密
            if is_ssl == "true":
                # 獲取證書地址
                cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs")
                es_ssl = elasticsearch.Elasticsearch(
                    # 地址
                    hosts=hosts,
                    # 用戶名密碼
                    http_auth=(username,password),
                    # 開啟ssl
                    use_ssl=True,
                    # 確認有加密證書
                    verify_certs=True,
                    # 對應的加密證書地址
                    client_cert=cert_pem
                )
                self.es = es_ssl
            elif is_ssl == "false":
                # 創建普通類型的ES客戶端
                es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize))
                self.es = es_ordinary
        except Exception as e:
            print(e)


    def query_data(self,keywords_list,date):
        gte = "now-"+str(date)
        query_data = {
            # 查詢語句
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": keywords_list,
                                "analyze_wildcard": True
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": gte,
                                    "lte": "now",
                                    "format": "epoch_millis"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        return query_data

    # 從es獲取數據
    def get_datas_by_query(self,index_name,keywords,param,date):
        '''
        :param index_name: 索引名稱
        :param keywords: 關鍵字詞,數組
        :param param: 需要數據條件,例如_source
        :param date: 過去時間范圍,字符串格式,例如過去30分鍾內數據,"30m"
        :return: all_datas 返回查詢到的所有數據(已經過param過濾)
        '''

        all_datas = []
        # 遍歷所有的查詢條件
        for keywords_list in keywords:
            # DSL語句
            query_data = self.query_data(keywords_list,date)
            res = self.es.search(
                index=index_name,
                body=query_data
            )
            for hit in res['hits']['hits']:
                # 獲取指定的內容
                response = hit[param]
                # 添加所有數據到數據集中
                all_datas.append(response)
        # 返回所有數據內容
        return all_datas

    # 當索引不存在創建索引
    def create_index(self,index_name):
        '''
        :param index_name: 索引名稱
        :return:如果創建成功返回創建結果信息,試過已經存在創建新的index失敗返回index的名稱
        '''
        # 獲取索引的映射
        # index_mapping = IndexMapping.index_mapping
        # # 判斷索引是否存在
        # if self.es.indices.exists(index=index_name) is not True:
        #     # 創建索引
        #     res = self.es.indices.create(index=index_name,body=index_mapping)
        #     # 返回結果
        #     return res
        # else:
        #     # 返回索引名稱
        #     return index_name
        pass

    # 插入指定的單條數據內容
    def insert_single_data(self,index_name,doc_type,data):
        '''
        :param index_name: 索引名稱
        :param doc_type: 文檔類型
        :param data: 需要插入的數據內容
        :return: 執行結果
        '''
        res = self.es.index(index=index_name,doc_type=doc_type,body=data)
        return res

    # 向ES中新增數據,批量插入
    def insert_datas(self,index_name):
        '''
        :desc 通過讀取指定的文件內容獲取需要插入的數據集
        :param index_name: 索引名稱
        :return: 插入成功的數據條數
        '''
        insert_datas = []
        # 判斷插入數據的索引是否存在
        self.createIndex(index_name=index_name)
        # 獲取插入數據的文件地址
        data_file_path = self.ini.get_key_value("datafile","datafilepath")
        # 獲取需要插入的數據集
        with open(data_file_path,"r+") as data_file:
            # 獲取文件所有數據
            data_lines = data_file.readlines()
            for data_line in data_lines:
                # string to json
                data_line = json.loads(data_line)
                insert_datas.append(data_line)
        # 批量處理
        res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True)
        return res

    # 從ES中在指定的索引中刪除指定數據(根據id判斷)
    def delete_data_by_id(self,index_name,doc_type,id):
        '''
        :param index_name: 索引名稱
        :param index_type: 文檔類型
        :param id: 唯一標識id
        :return: 刪除結果信息
        '''
        res = self.es.delete(index=index_name,doc_type=doc_type,id=id)
        return res

    # 根據條件刪除數據
    def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time):
        '''
        :param index_name:索引名稱,為空查詢所有索引
        :param doc_type:文檔類型,為空查詢所有文檔類型
        :param param:過濾條件值
        :param gt_time:時間范圍,大於該時間
        :param lt_time:時間范圍,小於該時間
        :return:執行條件刪除后的結果信息
        '''
        # DSL語句
        query_data = {
            # 查詢語句
            "query": {
                "bool": {
                    "must": [
                        {
                            "query_string": {
                                "query": param,
                                "analyze_wildcard": True
                            }
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": gt_time,
                                    "lte": lt_time,
                                    "format": "epoch_millis"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True)
        return res

    # 指定index中刪除指定時間段內的全部數據
    def delete_all_datas(self,index_name,doc_type,gt_time,lt_time):
        '''
        :param index_name:索引名稱,為空查詢所有索引
        :param doc_type:文檔類型,為空查詢所有文檔類型
        :param gt_time:時間范圍,大於該時間
        :param lt_time:時間范圍,小於該時間
        :return:執行條件刪除后的結果信息
        '''
        # DSL語句
        query_data = {
            # 查詢語句
            "query": {
                "bool": {
                    "must": [
                        {
                            "match_all": {}
                        },
                        {
                            "range": {
                                "@timestamp": {
                                    "gte": gt_time,
                                    "lte": lt_time,
                                    "format": "epoch_millis"
                                }
                            }
                        }
                    ],
                    "must_not": []
                }
            }
        }
        res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True)
        return res

    # 修改ES中指定的數據
    def update_data_by_id(self,index_name,doc_type,id,data):
        '''
        :param index_name: 索引名稱
        :param doc_type: 文檔類型,為空表示所有類型
        :param id: 文檔唯一標識編號
        :param data: 更新的數據
        :return: 更新結果信息
        '''
        res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data)
        return res

轉載自https://blog.csdn.net/carolcoral/article/details/80984712


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM