大佬請自覺路過~ ~ ~
1、背景介紹
本文以我的博客站點其中一段時間的訪問日志為例進行分析
-
用到的知識點
基本數據類型列表,基本數據類型字典,re
模塊正則匹配,pandas
模塊數據處理,xlwt
模塊excel
寫入等 -
最終實現的功能
分析得到日志中訪問ip
的top20
,訪問地址的top20
,訪問客戶端ua
的排名,並且生成excel
報表
2、思路演進
2.1、第一步讀取日志
對nginx
進行日志分析,首先拿到需要分析的nginx
日志文件,日志文件的內容具有固定的定義方法,每一行的日志中每一個特殊的字段都代表着具體的含義,例如:
95.143.192.110 - - [15/Dec/2019:10:22:00 +0800] "GET /post/pou-xi-he-jie-jue-python-zhong-wang-luo-nian-bao-de-zheng-que-zi-shi/ HTTP/1.1" 304 0 "https://www.ssgeek.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
上面的日志內容的字段信息依次代表着:訪問者來源ip
、訪問時間、http
請求方法、請求地址、http
狀態碼、本次請求的字節大小、refer
信息、客戶端ua
標識
因此,首先提煉出一行內容,對這行內容進行分組統計並記錄每個字段的具體信息,然后把對這一行的分析手段去對整個日志文件進行分析,為了匹配日志中的每個字段,需要用到re
模塊進行正則匹配,代碼如下:
import re
obj = re.compile(r'(?P<ip>.*?)- - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)"')
def load_log(path):
with open(path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
parse(line)
def parse(line):
# 解析單行nginx日志
try:
result = obj.match(line)
print(result.group("ip"))
except:
pass
if __name__ == '__main__':
load_log("nginx_access.log")
通過re
模塊依次分組匹配為:ip
、time
、request
、status
、bytes
、referer
、ua
上面的內容最終打印出來了所有的訪問者來源ip
進一步加強,輸出所有字段,直接打印print(result.groupdict())
即可,輸出結果是多個字典,如下所示:
{'ip': '46.229.168.150 ', 'time': '24/Dec/2019:13:21:39 +0800', 'request': 'GET /post/zabbix-web-qie-huan-wei-nginx-ji-https HTTP/1.1', 'status': '301', 'bytes': '178', 'referer': '-', 'ua': 'Mozilla/5.0 (compatible; SemrushBot/6~bl; +http://www.semrush.com/bot.html)'}
2.2、第二步解析日志
精准分析單行日志,並且加入一些格式化輸出和過濾的手段
load_log()
函數:
在load_log()
函數中,為了避免有錯誤的日志(類似於“臟數據”),因此定義了兩個空列表lst
和error_lst
用來記錄匹配的結果,列表中的每一個元素表示匹配的一行日志,最后打印了總行數,匹配到的行數,不能匹配到的行數(錯誤日志行數)
parse()
函數:
在parse()
函數中,傳入參數line
,一次對每行中分組匹配到的每一個字段進行處理,處理完成后賦值到列表元素,其中客戶端ua標識僅僅列出了一些常見的,如果想要匹配的更為精確,可以參考常用瀏覽器(PC/移動)user-agent參考對照表,把匹配規則寫的更精確即可
import re
import datetime
obj = re.compile(
r'(?P<ip>.*?)- - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)"')
def load_log(path):
lst = []
error_lst = []
i = 0
with open(path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
dic = parse(line)
if dic: # 正確的數據添加到lst列表中
lst.append(dic)
else:
error_lst.append(line) # 臟數據添加到error_lst列表中
i += 1
print(i)
print(len(error_lst))
print(len(lst))
def parse(line):
# 解析單行nginx日志
dic = {}
try:
result = obj.match(line)
# ip處理
ip = result.group("ip")
if ip.strip() == '-' or ip.strip() == "": # 如果是匹配到沒有ip就把這條數據丟棄
return False
dic['ip'] = ip.split(",")[0] # 如果有兩個ip,取第一個ip
# 狀態碼處理
status = result.group("status") # 狀態碼
dic['status'] = status
# 時間處理
time = result.group("time") # 21/Dec/2019:21:45:31 +0800
time = time.replace(" +0800", "") # 替換+0800為空
t = datetime.datetime.strptime(time, "%d/%b/%Y:%H:%M:%S") # 將時間格式化成友好的格式
dic['time'] = t
# request處理
request = result.group(
"request") # GET /post/pou-xi-he-jie-jue-python-zhong-wang-luo-nian-bao-de-zheng-que-zi-shi/ HTTP/1.1
a = request.split()[1].split("?")[0] # 往往url后面會有一些參數,url和參數之間用?分隔,取出不帶參數的url
dic['request'] = a
# user_agent處理
ua = result.group("ua")
if "Windows NT" in ua:
u = "windows"
elif "iPad" in ua:
u = "ipad"
elif "Android" in ua:
u = "android"
elif "Macintosh" in ua:
u = "mac"
elif "iPhone" in ua:
u = "iphone"
else:
u = "其他設備"
dic['ua'] = u
# refer處理
referer = result.group("referer")
dic['referer'] = referer
return dic
except:
return False
if __name__ == '__main__':
load_log("nginx_access.log")
執行代碼,查看打印的結果,控制台輸出:
9692
542
9150
依次表示日志文件中的總行數、匹配錯誤(沒有匹配到的)的行數、匹配正確的行數
2.3、第三步分析日志
利用pandas
模塊進行日志的分析
analyse()
函數:
將解析過濾得到的lst
列表作為參數傳入,列表中的數據格式形如[{ip:xxx, api:xxx, status:xxxx, ua:xxx}]
df = pd.DataFrame(lst)
將解析得到的列表轉換成為類似表格的類型,控制台的輸出df
如下,處理后為每個數據加上了序號,第一行相當於表頭,表頭就是前面得到的字典中的key
ip status ... ua referer
0 95.143.192.110 200 ... mac -
1 95.143.192.110 304 ... mac -
2 95.143.192.110 304 ... mac -
3 95.143.192.110 304 ... mac https://www.ssgeek.com/
4 203.208.60.122 200 ... android -
... ... ... ... ... ...
9145 46.4.60.249 404 ... 其他設備 -
9146 46.4.60.249 404 ... 其他設備 -
9147 46.4.60.249 404 ... 其他設備 -
9148 46.4.60.249 404 ... 其他設備 -
9149 154.223.188.124 404 ... windows -
pd.value_counts(df['ip'])
取出ip
並統計數ip
的次數;得到的結果第一列是ip
,第二列是次數,pandas
默認將第一列認為是行索引,因此需要將數據整體右移,通過reset_index()
重新定義一個索引即可,效果形如:
index ip
0 89.163.242.228 316
1 207.180.220.114 312
2 78.46.90.53 302
3 144.76.38.10 301
4 78.46.61.245 301
... ... ...
1080 203.208.60.85 1
1081 66.249.72.8 1
1082 141.8.132.13 1
1083 207.46.13.119 1
1084 203.208.60.7 1
這個時候發現索引有了,但是表頭也跟着右移了,不對應了,需要重新設置一個表頭reset_index().rename(columns={"index": "ip", "ip": "count"})
,效果形如
ip count
0 89.163.242.228 316
1 207.180.220.114 312
2 78.46.90.53 302
3 78.46.61.245 301
4 144.76.38.10 301
... ... ...
1080 47.103.17.71 1
1081 42.156.254.92 1
1082 220.243.136.156 1
1083 180.163.220.61 1
1084 106.14.215.243 1
往往分析日志只需要得到訪問次數的前幾名,例如前20
名,pandas
同樣給出了很方便的iloc
通過切片實現這個需求,iloc[:20, :]
:取出前20
行,取出所有列,最終的處理代碼為
ip_count = pd.value_counts(df['ip']).reset_index().rename(columns={"index": "ip", "ip": "count"}).iloc[:20, :]
print(ip_count)
得到的數據結果為
ip count
0 89.163.242.228 316
1 207.180.220.114 312
2 78.46.90.53 302
3 144.76.38.10 301
4 78.46.61.245 301
5 144.76.29.148 301
6 204.12.208.154 301
7 148.251.92.39 301
8 5.9.70.72 286
9 223.71.139.28 218
10 95.216.19.59 209
11 221.13.12.147 131
12 117.15.90.21 130
13 175.184.166.181 129
14 148.251.49.107 128
15 171.37.204.72 127
16 124.95.168.140 118
17 171.34.178.76 98
18 60.216.138.190 97
19 141.8.142.158 87
同樣,可以把request
、ua
等進行相同的操作
2.4、第四步生成報告
利用xlwt
模塊將pandas分析得到的數據寫入到excel
表格中,寫入前需要將pandas處理后的數據轉化成普通的數據
ip_count_values = ip_count.values
request_count_values = request_count.values
ua_count_values = ua_count.values
這個數據類型是:數組對象numpy.ndarray
,形如:
[['89.163.242.228 ' 316]
['207.180.220.114 ' 312]
['78.46.90.53 ' 302]
['204.12.208.154 ' 301]
['144.76.29.148 ' 301]
['144.76.38.10 ' 301]
['78.46.61.245 ' 301]
['148.251.92.39 ' 301]
['5.9.70.72 ' 286]
['223.71.139.28 ' 218]
['95.216.19.59 ' 209]
['221.13.12.147 ' 131]
['117.15.90.21 ' 130]
['175.184.166.181 ' 129]
['148.251.49.107 ' 128]
['171.37.204.72 ' 127]
['124.95.168.140 ' 118]
['171.34.178.76 ' 98]
['60.216.138.190 ' 97]
['141.8.142.158 ' 87]]
通過xlwt
模塊寫入sheet
頁,每個sheet
頁中寫入對應處理的數據
# 寫入excel
wb = xlwt.Workbook() # 打開一個excel文檔
sheet = wb.add_sheet("ip訪問top20") # 新建一個sheet頁
# 寫入頭信息
row = 0
sheet.write(row, 0, "ip") # 寫入行,列,內容
sheet.write(row, 1, "count") # 寫入行,列,內容
row += 1 # 行號加一
for item in ip_count_values:
sheet.write(row, 0, item[0])
sheet.write(row, 1, item[1])
row += 1
2.5、第五步日志采集
日志分析完了,回過頭來需要的是采集到日志文件,並且定時的去進行分析,可以利用time
模塊得到時間並且判斷,實現定時的分析,例如,每月3號的凌晨1點進行日志分析
import time
if __name__ == '__main__':
while 1:
stime = datetime.datetime.now().strftime("%d:%H:%M:%S")
if stime == "03:01:00:00":
lst, error_lst = load_log("nginx_access.log")
analyse(lst)
time.sleep(1)
當然也可以通過服務器級別的定時任務功能定時的調用腳本分析
2.6、結果展示
按照前面的演進過程,最終的代碼如下:
import re
import datetime
import pandas as pd
import xlwt
obj = re.compile(
r'(?P<ip>.*?)- - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)"')
def load_log(path):
lst = []
error_lst = []
i = 0
with open(path, mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip()
dic = parse(line)
if dic: # 正確的數據添加到lst列表中
lst.append(dic)
else:
error_lst.append(line) # 臟數據添加到error_lst列表中
i += 1
return lst, error_lst
def parse(line):
# 解析單行nginx日志
dic = {}
try:
result = obj.match(line)
# ip處理
ip = result.group("ip")
if ip.strip() == '-' or ip.strip() == "": # 如果是匹配到沒有ip就把這條數據丟棄
return False
dic['ip'] = ip.split(",")[0] # 如果有兩個ip,取第一個ip
# 狀態碼處理
status = result.group("status") # 狀態碼
dic['status'] = status
# 時間處理
time = result.group("time") # 21/Dec/2019:21:45:31 +0800
time = time.replace(" +0800", "") # 替換+0800為空
t = datetime.datetime.strptime(time, "%d/%b/%Y:%H:%M:%S") # 將時間格式化成友好的格式
dic['time'] = t
# request處理
request = result.group(
"request") # GET /post/pou-xi-he-jie-jue-python-zhong-wang-luo-nian-bao-de-zheng-que-zi-shi/ HTTP/1.1
a = request.split()[1].split("?")[0] # 往往url后面會有一些參數,url和參數之間用?分隔,取出不帶參數的url
dic['request'] = a
# user_agent處理
ua = result.group("ua")
if "Windows NT" in ua:
u = "windows"
elif "iPad" in ua:
u = "ipad"
elif "Android" in ua:
u = "android"
elif "Macintosh" in ua:
u = "mac"
elif "iPhone" in ua:
u = "iphone"
else:
u = "其他設備"
dic['ua'] = u
# refer處理
referer = result.group("referer")
dic['referer'] = referer
return dic
except:
return False
def analyse(lst): # [{ip:xxx, api:xxx, status:xxxx, ua:xxx}]
df = pd.DataFrame(lst) # 轉換成表格
# print(df)
# print(df['ip']) # 只取出ip這一列
ip_count = pd.value_counts(df['ip']).reset_index().rename(columns={"index": "ip", "ip": "count"}).iloc[:20, :]
request_count = pd.value_counts(df['request']).reset_index().rename(columns={"index": "request", "request": "count"}).iloc[:20, :]
ua_count = pd.value_counts(df['ua']).reset_index().rename(columns={"index": "ua", "ua": "count"}).iloc[:, :]
# 從pandas轉化成我們普通的數據
ip_count_values = ip_count.values
request_count_values = request_count.values
ua_count_values = ua_count.values
# print(type(ip_count_values))
# 寫入excel
wb = xlwt.Workbook() # 打開一個excel文檔
sheet = wb.add_sheet("ip訪問top20") # 新建一個sheet頁
# 寫入頭信息
row = 0
sheet.write(row, 0, "ip") # 寫入行,列,內容
sheet.write(row, 1, "count") # 寫入行,列,內容
row += 1 # 行號加一
for item in ip_count_values:
sheet.write(row, 0, item[0])
sheet.write(row, 1, item[1])
row += 1
sheet = wb.add_sheet("request訪問top20") # 新建一個sheet頁
# 寫入頭信息
row = 0
sheet.write(row, 0, "request") # 寫入行,列,內容
sheet.write(row, 1, "count") # 寫入行,列,內容
row += 1 # 行號加一
for item in request_count_values:
sheet.write(row, 0, item[0])
sheet.write(row, 1, item[1])
row += 1
sheet = wb.add_sheet("ua訪問top") # 新建一個sheet頁
# 寫入頭信息
row = 0
sheet.write(row, 0, "ua") # 寫入行,列,內容
sheet.write(row, 1, "count") # 寫入行,列,內容
row += 1 # 行號加一
for item in ua_count_values:
sheet.write(row, 0, item[0])
sheet.write(row, 1, item[1])
row += 1
wb.save("abc.xls")
if __name__ == '__main__':
lst, error_lst = load_log("nginx_access.log")
analyse(lst)
生成的excel
報表內容如下
-
ip
排名
-
訪問地址排名
-
客戶端
ua
排名
2.7、可擴展方向
本文進行日志的分析算是入門之作,可以進一步擴展的方向比如:分析報表的定時消息郵件等推送,分析報表的圖形化展示等等