通過Python讀取elasticsearch中的數據


1.說明

在前面的分享《通過Python將監控數據由influxdb寫入到MySQL》一文中,主要介紹了influxdb-->MySQL。InfluxDB主要存儲的由telegraf收集的DB性能數據,此外還有資源、主從、集群等數據。而 Server Log、DB Log(Error Log 和 Slow Log)則是通過filebeat 和 Logstash收集、過濾保存到elasticsearch中。所以,有必要實現通過Python讀取elasticsearch中的數據(寫入到MySQL)的功能。

此處實現的功能是讀取index中的host字段,將數值保存到MySQL中;換言之,通過Python查看那些機器已經部署了收集log的程序,並將查詢出的server IP保存到MySQL數據庫中。 

2.在MySQL庫存創建表host_dblog_collector

 腳本如下

CREATE TABLE `host_dblog_collector` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `ip_address` varchar(255) NOT NULL DEFAULT '',
  `datetime_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '數據行創建時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=30 DEFAULT CHARSET=utf8mb4;

 3.用來收集的python代碼

#coding:utf8
import os
import time
from os import walk
from datetime import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

import db_conn
mysqldb = db_conn.db
# use cursor
cursor = mysqldb.cursor()

###數據收集前,清除之前收集的數據
sql_delete = "delete from host_dblog_collector "
cursor.execute(sql_delete)
mysqldb.commit()

class ElasticObj:
    def __init__(self, index_name,index_type,ip ="ES的Server IP"):
        '''

        :param index_name: 索引名稱
        :param index_type: 索引類型,默認為_doc
        '''
        self.index_name =index_name
        self.index_type = index_type
        # 無用戶名密碼狀態
        #self.es = Elasticsearch([ip])
        #用戶名密碼狀態
        self.es = Elasticsearch([ip],http_auth=('ES的賬號', 'ES的密碼'),port=ES端口)

    #### 獲取已部署日志收集的server host
    def get_deploymentlog_serverhost(self):
        doc = {
              "size": 0,  ###此處的sieze為0,表示不取文檔的數據,只取聚合結果數據
              "aggs": {
                    "db_hosts": {
                        ##"cardinality":{"field": "fields.db_host.keyword"} ## 這個是先top size 這個數據量的記錄,再去distnct
                        "terms":{
                                  "field": "fields.db_host.keyword",
                                  "size": 1000   ##此處的size 可以理解為分組后取多少組數據
                                   }

                        }
                    }
              ###"_source":"fields.db_host",
              ##"size": 1500  ###如果沒有size的話,默認顯示10行
        }

        _searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc)
        print(_searched)

        for agg_bucket_hosts in _searched['aggregations']['db_hosts']['buckets']:
            # print hit['_source']
            # print (hit['_source']['fields']['db_host'])
            server_ip = agg_bucket_hosts['key']
            sql_insert = "insert into host_dblog_collector(ip_address) " \
                   "values('%s')" % \
                   (server_ip)

            cursor.execute(sql_insert)
            mysqldb.commit()

### 以mysql-開頭的所有的index,索引的類型為_doc
obj =ElasticObj("mysql-*","_doc",ip ="ES服務器的IP")

obj.get_deploymentlog_serverhost()

 補充說明:代碼中引用了db_conn模塊,相應的代碼請在《通過Python將監控數據由influxdb寫入到MySQL》一文中查看,在此不再贅述。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM