1.說明
在前面的分享《通過Python將監控數據由influxdb寫入到MySQL》一文中,主要介紹了influxdb-->MySQL。InfluxDB主要存儲的由telegraf收集的DB性能數據,此外還有資源、主從、集群等數據。而 Server Log、DB Log(Error Log 和 Slow Log)則是通過filebeat 和 Logstash收集、過濾保存到elasticsearch中。所以,有必要實現通過Python讀取elasticsearch中的數據(寫入到MySQL)的功能。
此處實現的功能是讀取index中的host字段,將數值保存到MySQL中;換言之,通過Python查看那些機器已經部署了收集log的程序,並將查詢出的server IP保存到MySQL數據庫中。
2.在MySQL庫存創建表host_dblog_collector
腳本如下
CREATE TABLE `host_dblog_collector` ( `id` int(11) NOT NULL AUTO_INCREMENT, `ip_address` varchar(255) NOT NULL DEFAULT '', `datetime_created` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '數據行創建時間', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=30 DEFAULT CHARSET=utf8mb4;
3.用來收集的python代碼
#coding:utf8 import os import time from os import walk from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk import db_conn mysqldb = db_conn.db # use cursor cursor = mysqldb.cursor() ###數據收集前,清除之前收集的數據 sql_delete = "delete from host_dblog_collector " cursor.execute(sql_delete) mysqldb.commit() class ElasticObj: def __init__(self, index_name,index_type,ip ="ES的Server IP"): ''' :param index_name: 索引名稱 :param index_type: 索引類型,默認為_doc ''' self.index_name =index_name self.index_type = index_type # 無用戶名密碼狀態 #self.es = Elasticsearch([ip]) #用戶名密碼狀態 self.es = Elasticsearch([ip],http_auth=('ES的賬號', 'ES的密碼'),port=ES端口) #### 獲取已部署日志收集的server host def get_deploymentlog_serverhost(self): doc = { "size": 0, ###此處的sieze為0,表示不取文檔的數據,只取聚合結果數據 "aggs": { "db_hosts": { ##"cardinality":{"field": "fields.db_host.keyword"} ## 這個是先top size 這個數據量的記錄,再去distnct "terms":{ "field": "fields.db_host.keyword", "size": 1000 ##此處的size 可以理解為分組后取多少組數據 } } } ###"_source":"fields.db_host", ##"size": 1500 ###如果沒有size的話,默認顯示10行 } _searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc) print(_searched) for agg_bucket_hosts in _searched['aggregations']['db_hosts']['buckets']: # print hit['_source'] # print (hit['_source']['fields']['db_host']) server_ip = agg_bucket_hosts['key'] sql_insert = "insert into host_dblog_collector(ip_address) " \ "values('%s')" % \ (server_ip) cursor.execute(sql_insert) mysqldb.commit() ### 以mysql-開頭的所有的index,索引的類型為_doc obj =ElasticObj("mysql-*","_doc",ip ="ES服務器的IP") obj.get_deploymentlog_serverhost()
補充說明:代碼中引用了db_conn模塊,相應的代碼請在《通過Python將監控數據由influxdb寫入到MySQL》一文中查看,在此不再贅述。
