一、介紹
ElasticSearch是一個基於Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。下面介紹了利用Python API接口進行數據查詢,方便其他系統的調用。
安裝API
pip3 install elasticsearch
建立es連接
無用戶名密碼狀態
from elasticsearch import Elasticsearch es = Elasticsearch([{'host':'10.10.13.12','port':9200}])
默認的超時時間是10秒,如果數據量很大,時間設置更長一些。如果端口是9200,直接寫IP即可。代碼如下:
es = Elasticsearch(['10.10.13.12'], timeout=3600)
用戶名密碼狀態
如果Elasticsearch開啟了驗證,需要用戶名和密碼
es = Elasticsearch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)
數據檢索功能
es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')
- index - 索引名
- q - 查詢指定匹配 使用Lucene查詢語法
- from_ - 查詢起始點 默認0
- doc_type - 文檔類型
- size - 指定查詢條數 默認10
- field - 指定字段 逗號分隔
- sort - 排序 字段:asc/desc
- body - 使用Query DSL
- scroll - 滾動查詢
統計查詢功能
語法同search大致一樣,但只輸出統計值
es.count(index='logstash-2015.08.21', q='http_status_code:500')
輸出:
{'_shards':{'failed':0, 'successful':5, 'total':5}, 'count':17042}
17042 就是統計值!
知識擴展
滾動demo
# Initialize the scroll page = es.search( index ='yourIndex', doc_type ='yourType', scroll ='2m', search_type ='scan', size =1000, body ={ # Your query's body }) sid = page['_scroll_id'] scroll_size = page['hits']['total'] # Start scrolling while(scroll_size >0): print "Scrolling..." page = es.scroll(scroll_id = sid, scroll ='2m') # Update the scroll ID sid = page['_scroll_id'] # Get the number of results that we returned in the last scroll scroll_size = len(page['hits']['hits']) print "scroll size: "+ str(scroll_size) # Do something with the obtained page
以上demo實現了一次取若干數據,數據取完之后結束,不會獲取到最新更新的數據。我們滾動完之后想獲取最新數據怎么辦?滾動的時候會有一個統計值,如total: 5。跳出循環之后,我們可以用_from參數定位到5開始滾動之后的數據。
但是我用的不是這個,用的是以下方法,鏈接如下:
https://www.cnblogs.com/blue163/p/8126156.html
在下面的內容中,我會詳細介紹此代碼如何使用!
二、Query DSL
range過濾器查詢范圍
gt: > 大於 lt: < 小於 gte: >= 大於或等於 lte: <= 小於或等於
示例代碼1
"range":{ "money":{ "gt":20, "lt":40 } }
時間范圍
最近時間段
比如我要查詢最近1分鍾的
"range": { '@timestamp': {'gt': 'now-1m'} }
最新1小時
"range": { '@timestamp': {'gt': 'now-1h'} }
最新1天的
"range": { '@timestamp': {'gt': 'now-1d'} }
指定時間段
那么問題來了,它是根據當前時間來計算最近的時間。但是有些情況下,我需要制定時間范圍,精確到分鍾
假設需要查詢早上8點到9點的數據,可以這樣
"range": { '@timestamp': { "gt" : "{}T{}:00:00".format("2018-12-17","08"), "lt": "{}T{}:59:59".format("2018-12-17","09"), "time_zone": "Asia/Shanghai" } }
注意:日期和小時之間,有一個字母T來間隔。不能用空格!
time_zone 表示時區,如果默認的時區不會,可能會影響查詢結果!
bool組合過濾器
must:所有分句都必須匹配,與 AND 相同。
must_not:所有分句都必須不匹配,與 NOT 相同。
should:至少有一個分句匹配,與 OR 相同。
示例代碼
{ "bool":{ "must":[], "should":[], "must_not":[], } }
term過濾器
term單過濾
{ "terms":{ "money":20 } }
表示money包含20的記錄
terms復數版本
允許多個匹配條件
{ "terms":{ "money": [20,30] } }
表示money包含20或者30的記錄
結合bool+term來舉一個實際的例子:
查詢path字段中包含applogs最近1分鍾的記錄
"bool": { "must": [ { "terms": { "path": [ "applogs", ] } }, { "range": { '@timestamp': {'gt': 'now-1m'} } } ] }
這里使用了terms復數版本,可以隨時添加多個條件!
正則查詢
{ "regexp": { "http_status_code": "5.*" } }
match查詢
match 精確匹配
{ "match":{ "email":"123456@qq.com" } }
multi_match 多字段搜索
{ "multi_match":{ "query":"11", "fields":["Tr","Tq"] } }
demo
獲取最近一小時的數據
{'query':
{'filtered':
{'filter':
{'range':
{'@timestamp':{'gt':'now-1h'}}
}
}
}
}
條件過濾查詢
{ "query":{ "filtered":{ "query":{"match":{"http_status_code":500}}, "filter":{"term":{"server_name":"vip03"}} } } }
Terms Facet 單字段統計
{'facets':
{'stat':
{'terms':
{'field':'http_status_code',
'order':'count',
'size':50}
}
},
'size':0
}
一次統計多個字段
{'facets':
{'cip':
{'terms':
{'fields':['client_ip']}},
'status_facets':{'terms':{'fields':['http_status_code'],
'order':'term',
'size':50}}},
'query':{'query_string':{'query':'*'}},
'size':0
}
多個字段一起統計
{'facets':
{'tag':
{'terms':
{'fields':['http_status_code','client_ip'],
'size':10
}
}
},
'query':
{'match_all':{}},
'size':0
}
數據組裝
以下是kibana首頁的demo,用來統計一段時間內的日志數量
{ "facets": { "0": { "date_histogram": { "field": "@timestamp", "interval": "5m" }, "facet_filter": { "fquery": { "query": { "filtered": { "query": { "query_string": { "query": "*" } }, "filter": { "bool": { "must": [ { "range": { "@timestamp": { 'gt': 'now-1h' } } }, { "exists": { "field": "http_status_code.raw" } }, # --------------- ------- # 此處加匹配條件 ] } } } } } } } }, "size": 0 }
如果想添加匹配條件,在以上代碼標識部分加上過濾條件,按照以下代碼格式即可
{ "query": { "query_string": {"query": "backend_name:baidu.com"} } },
先介紹到這里,后續會有Query DSL API介紹。
三、需求分析
需求
下面是kibana展示的日志

需要統計某一天的日志,統計每一個小時用戶數,要求用戶id不能重復。一個用戶id就是一個用戶,也稱之為一個PV。
看一段message字段信息
2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user] [Class=o.s.u.c.AccountController:1189] [Method=findCustomerByLoid]- Operation=find customer by loid,Params=loid:001,Content=start
其中有一個[r=9538381535],這個9538381535就是用戶id。那么用戶登錄手機APP操作,都會帶着這個id,產生一條日志。
比如user項目,那么最終要的數據格式如下:
"user":{ "00":1, "01":0, ... "22":3245, "23":765 }
這里使用24小時制來表示每一個時間段,有多個個用戶訪問了。注意:已經去重了用戶id,統計用戶數!
四、相關技術點
在放出最終代碼之前,先來介紹相關技術點,便於理解代碼。按照代碼從上到下原則,分別來介紹!
項目列表
project_list = ['user',...]
實際的項目是user,但是存儲到elasticsearch中,是userlogs,加了一個logs后綴。這個是java后端代碼定義的,便於識別!
判斷日期是否合法
def isVaildDate(self, date): try: if ":" in date: time.strptime(date, "%Y-%m-%d %H:%M:%S") else: time.strptime(date, "%Y-%m-%d") return True except: return False
因為需要統計一周的數據,所以腳本執行時,需要傳一個日期參數。那么日期參數,傳給程序是否合法呢?需要有一個函數來判斷!
記錄日志
def write_log(self, content): """ 寫入日志文件 :param path: :param content: :return: """ path = "print.log" with open(path, mode='a+', encoding='utf-8') as f: content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n" print(content) f.write(content)
為啥不用Python的日志模塊呢?因為測試發現,它寫入一些,我不想要的信息,太占用磁盤空間了。所以,我單獨寫了一個記錄日志方法。
獲取elasticsearch數據
def Get_Data_By_Body(self, project, fixed_date, hour): """ 獲取數據 :param project: 項目名 :param fixed_date: 指定日期 :param hour: 24小時制中的某一個小時 :return: object """ # 查詢條件,查詢項目最近1小時的數據。 doc = { "query": { "bool": { "must": [ { "terms": { "path": [ project + "logs", ] } }, { "range": { '@timestamp': { "gt": "{}T{}:00:00".format(fixed_date, hour), "lt": "{}T{}:59:59".format(fixed_date, hour), "time_zone": "Asia/Shanghai" } } } ] } } }
由於線上數據量過大,因此直接查詢一天的數據,會卡死。所以是切分為每一個小時查詢!
上面的query表示查詢語句,大概就是查詢指定項目(項目名+logs),1小時范圍內的數據
scroll獲取數據
由於1小時內的數據量,也很大。不能直接返回!默認不指定size,是返回10條數據!
size = 1000 # 指定返回1000條 queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )
參數解釋:
size 指定返回的條數,默認返回10條
index 指定索引名
body 查詢語句
scroll 告訴 Elasticsearch 把搜索上下文再保持一分鍾。1m表示1分鍾
返回結果
mdata = queryData.get("hits").get("hits") # 返回數據,它是一個列表類型 if not mdata: self.write_log('%s mdata is empty!' % project)
queryData 返回一個字典,那么真正的查詢結果在queryData['hits']['hits']中,如果這個值沒有,表示沒有查詢到數據!
注意:它並不是返回所有的結果,而是一頁的數據,是一個列表類型。因為我們使用了scroll獲取數據,只返回一頁!
分頁數據
上面只是返回了1頁,我要所有數據,怎么辦?需要使用分頁,先來看一下分頁公式
divmod(總條數, 每頁大小)
注意:divmod返回一個元祖,第一個元素,就是要分頁數
總條數,使用
total = queryData['hits']['total'] # 返回數據的總條數
每頁大小,就是上面指定的size
size = 1000 # 指定返回1000條
那么遍歷每一頁數據,需要這樣
scroll_id = queryData['_scroll_id'] # 獲取scrollID total = queryData['hits']['total'] # 返回數據的總條數 # 使用divmod設置分頁查詢 # divmod(total,1000)[0]+1 表示總條數除以1000,結果取整數加1 for i in range(divmod(total, size)[0] + 1): res = self.es.scroll(scroll_id=scroll_id, scroll='1m') # scroll參數必須指定否則會報錯 mdata += res["hits"]["hits"] # 擴展列表
scroll_id給es.scroll獲取數據使用,這個參數必須要有。
由於Python中的range是顧頭不顧尾,所以需要加1。使用for循環,就可以遍歷每一個分頁數
es.scroll(scroll_id=scroll_id, scroll='1m') 才是真正查詢每一頁的數據,必須要指定這2個參數。它的返回結果,就是查詢結果!返回一個列表
上面的mdata是一個列表,res也是列表。因此使用+=就可以擴展列表,得到所有數據!
創建年月日目錄
def create_folder(self, fixed_date): """ 創建年/月/日 文件夾 :return: path """ # 系統當前時間年份 # year = time.strftime('%Y', time.localtime(time.time())) # # 月份 # month = time.strftime('%m', time.localtime(time.time())) # # 日期 # day = time.strftime('%d', time.localtime(time.time())) # 年月日 year, month, day = fixed_date.split("-") # 具體時間 小時分鍾毫秒 # mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time())) # 判斷基礎目錄是否存在 if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')): os.mkdir(os.path.join(self.BASE_DIR, 'data_files')) # 年月日 fileYear = os.path.join(self.BASE_DIR, 'data_files', year) fileMonth = os.path.join(fileYear, month) fileDay = os.path.join(fileMonth, day) # 判斷目錄是否存在,否則創建 try: if not os.path.exists(fileYear): os.mkdir(fileYear) os.mkdir(fileMonth) os.mkdir(fileDay) else: if not os.path.exists(fileMonth): os.mkdir(fileMonth) os.mkdir(fileDay) else: if not os.path.exists(fileDay): os.mkdir(fileDay) return fileDay except Exception as e: print(e) return False
統計結果是最終寫入到一個txt里面,那么如何存儲呢?使用年月日目錄在區分,可以知道這個txt文件,是屬於哪一天的。到了一定時間后,可以定期清理,非常方便!
這里使用的傳參方式,傳入一個日期。所以使用"-"就可以切割出年月日
# 年月日 year, month, day = fixed_date.split("-")
輸出24小時
使用以下代碼就可以實現
hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
輸出:
['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23']
項目統計字典
需要統計每一個項目的每一個小時的用戶id,用戶id必須去重。既然要去重,我們首先會想到用集合。
但是還有一個辦法,使用字典,也可以去重。因為字典的key是唯一的。
構造24小時字典
先來構造項目user的數據,格式如下:
"basebusiness": { "00": {}, "01": {}, "02": {}, "03": {}, "04": {}, "05": {}, "06": {}, "07": {}, "08": {}, "09": {}, "10": {}, "11": {}, "12": {}, "13": {}, "14": {}, "15": {}, "16": {}, "17": {}, "18": {}, "19": {}, "20": {}, "21": {}, "22": {}, "23": {}, }
這只是一個項目,實際有很多項目。所以每一個字典,都有這樣的24小時數據。相關代碼如下:
project_dic = {} # 項目統計字典
# 24小時
hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
for hour in hour_list: # 遍歷24小時
# print("查詢{}點的數據###############".format(hour))
self.write_log("查詢{}點的數據###############".format(hour))
for project in project_list: # 遍歷項目列表
if not project_dic.get(project):
project_dic[project] = {} # 初始化項目字典
if not project_dic[project].get(hour):
project_dic[project][hour] = {} # 初始化項目小時字典
這里的每一個小時,都是空字典。還沒有添加數據,需要添加用戶id,下面會講到!
正則匹配用戶id
看這一點字符串
2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user]
需要提取出9538381535,思路就是:匹配中括號內容-->提取以r=開頭的內容-->使用等號切割,獲取用戶id
匹配中括號內容
p1 = re.compile(r'[[](.*?)[]]', re.S) # 最小匹配,匹配中括號的內容
注意:這里要使用最小匹配,不能使用貪婪匹配。這一段正則,我是用網上找的,測試ok
提取和切割,就比較簡單了。使用startswith和split方法,就可以了!
使用字典去重
接下來,需要將用戶id寫入到字典中,需要去重,否則字典添加時,會報錯!
那么如何使用字典去重呢?只需要遵循一個原則即可! 有則忽略,無則添加
# 判斷字典中rid不存在時,避免字典鍵值重復 if not project_dic[project][hour].get(rid): project_dic[project][hour][rid] = True # 添加值
生成器
這里主要在2個方法中,使用了生成器。生成器的優點,就是節省內容。
一處在是Get_Data_By_Body方法中,它需要返回所有查詢的數據。數據量非常大,因此必須要生成器,否則服務器內容就溢出!
還有一處,就main方法。它是返回項目的統計結果。注意,它不是最終結果。它里面保存了每一個項目,每一個小時中的用戶id,是已經去重的用戶id。
數據量也是比較大,當然,沒有Get_Data_By_Body方法返回的結果大。
統計每一個小時用戶數
main方法,返回的字典格式如下:
"user":{ "00":{ "242412":True, } "01":{ "":True, }, ... "22":{ "457577":True, "546583":True, }, "23":{ "457577":True, "546583":True, "765743":True, } }
我需要知道,每一個小時的用戶數。怎么統計呢?用2個方法
1. 遍歷字典的每一個小時,使用計數器
2. 使用len方法(推薦)
最簡單的方法,就是使用len方法,就可以知道每一個小時有多少個key
for i in dic: # 遍歷數據 if not final_dic.get(i): final_dic[i] = {} # 初始化字典 for h in sorted(dic[i]): # 遍歷項目的每一個小時 # 統計字典的長度 final_dic[i][h] = len(dic[i][h])
有序字典
看下面的數據

可以發現,24小時,排序是亂的。這樣給領導看時,不太美觀。所以需要對24小時進行排序!
在Python 3.6之前,字典的key是無序的。因此,需要定義一個有序字典,在寫入之前,要對字典的key做一次排序。
這樣順序寫入到有序字典之后,之后再次調用,依然是有序的!
order_dic = OrderedDict() # 實例化一個有序字典 final_dic = {} # 最終統計結果 for dic in data: # 遍歷生成器 for i in dic: # 遍歷數據 if not final_dic.get(i): final_dic[i] = order_dic # 初始化字典 # 有序字典必須先對普通字典key做排序 for h in sorted(dic[i]): # 遍歷項目的每一個小時 # 統計字典的長度 final_dic[i][h] = len(dic[i][h])
完整代碼
#!/usr/bin/env python3 # coding: utf-8 import re import os import sys import json import time from collections import OrderedDict from elasticsearch import Elasticsearch # 項目列表 project_list = ['usercenter', ['login']] # yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d") # today = datetime.datetime.now().strftime("%Y-%m-%d") class ElasticObj: def __init__(self, index_name, ip, fixed_date, timeout=3600): ''' :param index_name: 索引名稱 :param ip: elasticsearch地址 :param timeout: 設置超時間,默認是10秒的,如果數據量很大,時間要設置更長一些 ''' self.index_name = index_name self.ip = ip self.timeout = timeout # 無用戶名密碼狀態 # self.es = Elasticsearch([self.ip], timeout=self.timeout) # 用戶名密碼狀態 # self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200) self.es = Elasticsearch([self.ip], http_auth=('elastic', '123456'), timeout=self.timeout) self.fixed_date = fixed_date # 指定日期 # 當前py文件所在的文件夾 self.BASE_DIR = os.path.dirname(os.path.abspath(__file__)) self.fileDay = self.create_folder() # 創建日志和數據目錄 @staticmethod def isVaildDate(date): """ 判斷日期是否合法 :param date: 日期,比如: 2018-03-30 :return: bool """ try: if ":" in date: time.strptime(date, "%Y-%m-%d %H:%M:%S") else: time.strptime(date, "%Y-%m-%d") return True except: return False def write_log(self, content): """ 寫入日志文件 :param content: 寫入內容 :return: """ path = os.path.join(self.fileDay,"output_%s.log" %self.fixed_date) # path = "output_{}.log".format(self.fixed_date) with open(path, mode='a+', encoding='utf-8') as f: content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n" print(content) f.write(content) def Get_Data_By_Body(self, project, hour): """ 獲取數據 :param project: 項目名 :param hour: 24小時制中的某一個小時 :return: 生成器 """ # doc = {'query': {'match_all': {}}} # 查詢條件,查詢項目最近1小時的數據。now-1h表示最近1小時 # print(type(fixed_date)) # print("{date}T00:00:00".format(date=fixed_date)) # 24小時 doc = { "query": { "bool": { "must": [ { "terms": { "path": [ project + "logs", ] } }, { # "range": { # '@timestamp': {'gt': 'now-1m'} # } "range": { '@timestamp': { "gt": "{}T{}:00:00".format(self.fixed_date, hour), "lt": "{}T{}:59:59".format(self.fixed_date, hour), "time_zone": "Asia/Shanghai" } } } ] } } } # queryData = self.es.search(index=self.index_name, body=doc) # scroll 參數告訴 Elasticsearch 把搜索上下文再保持一分鍾,1m表示1分鍾 # size 參數允許我們配置沒匹配結果返回的最大命中數。每次調用 scroll API 都會返回下一批結果,直到不再有可以返回的結果,即命中數組為空。 size = 1000 # 指定返回1000條 queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', ) # print(queryData['hits']['total']) mdata = queryData.get("hits").get("hits") # 返回查詢的數據,不是所有數據,而是一頁的數據,它是一個列表類型 if not mdata: self.write_log('%s mdata is empty!' % project) # scroll_id 的值就是上一個請求中返回的 _scroll_id 的值 scroll_id = queryData['_scroll_id'] # 獲取scrollID total = queryData['hits']['total'] # 返回數據的總條數 # print("查詢項目{} {}點的數據,總共有{}條".format(project,hour,total)) self.write_log("查詢項目{} {}點的數據,總共有{}條".format(project, hour, total)) # 使用divmod設置分頁查詢 # divmod(total,1000)[0]+1 表示總條數除以1000,結果取整數加1 for i in range(divmod(total, size)[0] + 1): res = self.es.scroll(scroll_id=scroll_id, scroll='1m') # scroll參數必須指定否則會報錯 mdata += res["hits"]["hits"] # 擴展列表 # yield mdata # print(mdata) # return mdata yield mdata def create_folder(self): """ 創建年/月/日 文件夾 :return: path """ # 系統當前時間年份 # year = time.strftime('%Y', time.localtime(time.time())) # # 月份 # month = time.strftime('%m', time.localtime(time.time())) # # 日期 # day = time.strftime('%d', time.localtime(time.time())) # 年月日 year, month, day = self.fixed_date.split("-") # 具體時間 小時分鍾毫秒 # mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time())) # 判斷基礎目錄是否存在 if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')): os.mkdir(os.path.join(self.BASE_DIR, 'data_files')) # 年月日 fileYear = os.path.join(self.BASE_DIR, 'data_files', year) fileMonth = os.path.join(fileYear, month) fileDay = os.path.join(fileMonth, day) # 判斷目錄是否存在,否則創建 try: if not os.path.exists(fileYear): os.mkdir(fileYear) os.mkdir(fileMonth) os.mkdir(fileDay) else: if not os.path.exists(fileMonth): os.mkdir(fileMonth) os.mkdir(fileDay) else: if not os.path.exists(fileDay): os.mkdir(fileDay) return fileDay except Exception as e: print(e) return False def main(self): """ 主要處理邏輯 :return: 生成器 """ project_dic = {} # 項目統計字典 # fixed_date = datetime.datetime.strptime(fixed_date, "%Y-%m-%d") # strftime("%Y-%m-%d") # conv_date = fixed_date.strftime("%Y-%m-%d") # print(conv_date, type(conv_date)) # exit() # now_hour = fixed_date.strftime('%H') # 當前時間的小時 # print(now_hour) # 24小時 hour_list = ['{num:02d}'.format(num=i) for i in range(24)] # hour_list = ['{num:02d}'.format(num=i) for i in range(2)] # project="usercenter" # project_dic[project] = {now_hour: {}} # 初始化字典 for hour in hour_list: # 遍歷24小時 # print("查詢{}點的數據###############".format(hour)) self.write_log("查詢{}點的數據###############".format(hour)) for project in project_list: # 遍歷項目列表 if not project_dic.get(project): project_dic[project] = {} # 初始化項目字典 if not project_dic[project].get(hour): project_dic[project][hour] = {} # 初始化項目小時字典 mdata = self.Get_Data_By_Body(project, hour) # 獲取數據 for item in mdata: # 遍歷生成器 for hit in item: # 遍歷返回數據 # hit是一個字典 str1 = hit['_source']['message'] # 查詢指定字典 p1 = re.compile(r'[[](.*?)[]]', re.S) # 最小匹配,匹配中括號的內容 for i in re.findall(p1, str1): # 遍歷結果 if i.startswith('r='): # 判斷以r=開頭的 rid = i.split("=")[1] # 獲取rid # print("rid",rid) # 判斷字典中rid不存在時,避免字典鍵值重復 if not project_dic[project][hour].get(rid): project_dic[project][hour][rid] = True # 添加值 time.sleep(1) # 休眠1秒鍾 # return project_dic yield project_dic if __name__ == '__main__': # fixed_date = "2018-12-16" fixed_date = sys.argv[1] # 日期參數 if not ElasticObj.isVaildDate(fixed_date): print("日期不合法!") exit() startime = time.time() # 開始時間 index_name = "common-*" es_server = "192.168.92.131" obj = ElasticObj(index_name, es_server, fixed_date) # 連接elasticsearch print("正在查詢日期%s這一天的數據" % fixed_date) obj.write_log("###########################################") obj.write_log("正在查詢日期%s這一天的數據" % fixed_date) data = obj.main() # print("初步結果",data) # fileDay = obj.create_folder() # 創建目錄 # if not fileDay: # # print("創建目錄失敗!") # obj.write_log("創建目錄失敗!") # exit() order_dic = OrderedDict() # 實例化一個有序字典 final_dic = {} # 最終統計結果 for dic in data: # 遍歷生成器 for i in dic: # 遍歷數據 if not final_dic.get(i): final_dic[i] = order_dic # 初始化字典 # 有序字典必須先對普通字典key做排序 for h in sorted(dic[i]): # 遍歷項目的每一個小時 # 統計字典的長度 final_dic[i][h] = len(dic[i][h]) # print("最終結果",final_dic) # 統計結果 obj.write_log("最終結果執行完畢!") # 寫入文件 with open(os.path.join(obj.fileDay, 'access_data.txt'), encoding='utf-8', mode='a') as f: f.write(json.dumps(final_dic) + "\n") endtime = time.time() take_time = endtime - startime if take_time < 1: # 判斷不足1秒時 take_time = 1 # 設置為1秒 # 計算花費時間 m, s = divmod(take_time, 60) h, m = divmod(m, 60) # print("本次花費時間 %02d:%02d:%02d" % (h, m, s)) obj.write_log("統計日期%s這一天的數據完成!請查閱data_files目錄的日志和數據文件" % fixed_date) obj.write_log("本次花費時間 %02d:%02d:%02d" % (h, m, s))
日志文件和數據文件,都在年月日目錄里面!
本文參考鏈接:
http://www.cnblogs.com/letong/p/4749234.html
http://www.linuxyw.com/790.html
https://www.cnblogs.com/blue163/p/8126156.html
