示例場景
日志說明
有兩台Web服務器,日志文件存放在/usr/local/nginx/logs/目錄,日志默認為nginx定義格式。如:
123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg HTTP/1.1" 206 51934 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_1.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)" 120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 16631 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)" 123.13.17.13 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg HTTP/1.1" 206 53119 "http://img.xxx.com:8080/AppFiles/apk/studynet/icon_v120/apk_80111_0.jpg" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S100 Build/KOT49H)" 219.137.119.16 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/gamenet/icon/icon_0_506_0.jpg HTTP/1.1" 404 1035 "-" "Dalvik/v3.3.110_update3 (Linux; U; Android 2.2.1-R-20151127.1131; ET_35 Build/KTU84Q)" 120.210.166.150 - - [25/Aug/2016:00:00:01 +0800] "GET /AppFiles/apk/studynet/products/product_lc01.zip HTTP/1.1" 206 40719 "http://img.xxx.com:8080/AppFiles/apk/studynet/products/product_lc01.zip" "Dalvik/1.6.0 (Linux; U; Android 4.4.2; S908 Build/KVT49L)"
以空格分隔,共有12列數據:
1、客戶端IP 2、空白(遠程登錄名稱) 3、空白(認證的遠程用戶) 4、請求時間 5、時區(UTC) 6、請求方法 7、請求資源 8、http協議 9、狀態碼 10、發送字節數 11、訪問來源 12、客戶瀏覽信息(不具體拆分)
場景部署
在兩台Web服務器上部署HDFS客戶端,以便定期上傳Web日志到HDFS存儲平台,最終實現分布式計算。
上傳日志到HDFS存儲的腳本
【/root/hadooptest/hdfsput.py】
#!/usr/bin/env python # -*- encoding: utf-8 -*- import subprocess import sys import datetime webid = 'test1' #HDFS存儲日志標志,另一台Web服務器為:test2 currdate = datetime.datetime.now().strftime('%Y%m%d') logspath = '/usr/local/nginx/logs/access.log' #日志路徑 logname = 'access.log.'+webid try: #創建HDFS目錄,目錄格式:nginx/20160825,加wait()是為了讓父進程等待子進程完成后再繼續往下執行(subporcess默認啟動子進程后不等待其執行結果就繼續往下執行) subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop','fs','-mkdir','-p','hdfs:///user/root/nginx'+currdate],stdout=subprocess.PIPE).wait() except Exception as e: pass putinfo = subprocess.Popen(['/usr/local/hadoop-2.6.4/bin/hadoop','fs','-put',logspath,'hdfs:///user/root/nginx/' +currdate +'/'+logname],stdout=subprocess.PIPE) #上傳本地日志到HDFS for line in putinfo.stdout: print line
添加定時功能到crontab
0 0 * * * /usr/bin/python /root/hadooptest/hdfsput.py >> /dev/null 2>&1
兩台Web服務器都上傳日志后,HDFS上信息如下:
[root@wx ~]# hadoop fs -ls /user/root/nginx/20160825 Found 2 items -rw-r--r-- 1 root supergroup 15 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test1 -rw-r--r-- 1 root supergroup 28 2016-08-25 15:58 /user/root/nginx/20160825/access.log.test2
網站訪問流量統計
網站訪問流量作為衡量一個站點的價值、熱度的重要指標,另外CDN服務中流量會涉及計費,如何快速准確分析當前站點的流量數據至關重要。下面實現精確到分鍾統計網站訪問流量,原理是在mapper操作時將Web日志中小時的每分鍾作為key,將對應的行發送字節數作為value,在reducer操作時對相同key做累加(sum統計)。
【/root/hadooptest/httpflow.py】
#/usr/bin/env python # -*- coding:utf-8 -*- from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i = 0 for flow in line.split(): #獲取時間段,為域日志的第4列,內容如:“[24/Aug/2016:00:00:02”
if i==3: timerow = flow.split(':')
hm = timerow[1] + ':' + timerow[2] #獲取'小時:分鍾',作為key if i==9 and re.match(r'\d{1,}',flow): #獲取日志第10列:發送的字節數,作為value yield hm,int(flow) #初始化key:value i+=1 def reducer(self, key, occurences): yield key,sum(occurences) #相同key“小時:分鍾”的value做累加操作 if __name__ == '__main__': MRCounter.run()
生成Hadoop任務,運行:
python /root/hadoop/httpflow.py -r hadoop -o hdfs://output/httpflow hdfs:///user/root/nginx
建議將分析的數據定期入庫MySQL,利用MySQL靈活豐富的SQL支持,可以很方便的對數據進行加工,輕松輸出比較美觀的數據報表。
網站HTTP狀態碼統計
統計一個網站的HTTP狀態碼比例數據,可以幫助我們了解網站的可用度及健康狀態,比如我們關注的200、404/5xx狀態等。在此示例中我們利用Mrjob的多步調用的形式來實現,除了基本的mapper、reducer方法外,還可以自定義處理方法,在steps中添加調用即可。
【/root/hadooptest/httpstatus.py】
#!/usr/bin/env python # -*- encoding: utf-8 -*- from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i = 0 for httpcode in line.split(): if i == 8 and re.match(r'\d{1,3}',httpcode): #獲取日志中HTTP狀態碼段,作為key yield httpcode,1 #初始化key:value,value計數為1,方便reducer做累加 i+=1 def reducer(self, httpcode,occurrences): yield httpcode,sum(occurrences) #對排序后的key對應的value作sum累加 def steps(self): return [self.mr(mapper=self.mapper),self.mr(reducer=self.reducer)] #在steps方法中添加調用隊列 if __name__ == '__main__': MRCounter.run()
生成Hadoop任務,運行:
python httpstatus.py -r hadoop -o hdfs:///output/httpstatus hdfs:///user/nginx
分析結果:
[root@wx hadooptest]# hadoop fs -cat /output/httpstatus/part-00000 "200" 608997 "206" 2802574 "302" 1 "304" 34600 "400" 30 "401" 1 "404" 1653791 "416" 180358 "499" 2689
網站分鍾級請求數統計
一個網站請求量大小,直接關系到網站的訪問質量,非常有必要對改數據進行分析且關注。本示例以分鍾為單位對網站的訪問數進行統計。
【/root/hadooptest/http_minute_conn.py】
#!/usr/bin/env python # -*- encoding: utf-8 -*- from mrjob.job import MRJob import re class MRCounter(MRJob): def mapper(self, key, line): i = 0 for dt in line.split(): if i == 3: #獲取時間段,位於日志的第4列,內容如“[24/Aug/2016:00:00:02” timerow = dt.split(':') hm = timerow[1] + ':' + timerow[2] #獲取'小時:分鍾',作為key yield hm,1 #初始化key:value i+=1 def reducer(self, key,occurrences): yield key,sum(occurrences) #對排序后的key對應的value作sum累加 if __name__ == '__main__': MRCounter.run()
生成Hadoop任務,運行:
python http_minute_conn.py -r hadoop -o hdfs:///output/http_minute_conn hdfs:///user/nginx
網站訪問來源IP統計
統計用戶的訪問來源IP可以更好地了解網站的用戶分布,同時也可以幫助安全人員捕捉攻擊來源。實現原理是定義匹配IP正則字符串作為key,將value初始化為1,執行reducer操作時做累加(sum)統計
【/root/hadooptest/ipstat.py】
#!/usr/bin/env python # -*- encoding: utf-8 -*- from mrjob.job import MRJob import re IP_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') #定義IP正則匹配 class MRCounter(MRJob): def mapper(self, key, line): for ip in IP_RE.findall(line): #匹配IP正則后生成key:value,其中key為IP地址,value初始值為1 yield ip,1 def reducer(self, ip,occurrences): yield ip,sum(occurrences) #對排序后的key對應的value作sum累加 if __name__ == '__main__': MRCounter.run()
生成Hadoop任務,運行:
python ipstat.py -r hadoop -o hdfs:///output/ipstat hdfs:///user/nginx
網站文件訪問統計
通過統計網站文件的訪問次數可以幫助運維人員了解訪問最集中的文件,以便進行有針對性的優化,比如調整靜態文件過期策略、優化動態cgi的執行速度、拆分業務邏輯等。實現原理是講訪問文件作為key,初始化value為1,執行reducer是做累加(sum)統計。
【/root/hadooptest/httpfile.py】
#!/usr/bin/env python # -*- encoding: utf-8 -*- from mrjob.job import MRJob import re IP_RE = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}') #定義IP正則匹配 class MRCounter(MRJob): def mapper(self, key, line): i = 0 for url in line.split(): if i==6: #獲取日志中URL文件資源字段,作為key yield url,1 i+=1 def reducer(self, url,occurrences): yield url,sum(occurrences) #對排序后的key對應的value作sum累加 if __name__ == '__main__': MRCounter.run()
生成Hadoop任務,運行:
python httpfile.py -r hadoop -o hdfs:///output/httpfile hdfs:///user/nginx
同理,我們可以使用以上方法對User-Agent域進行分析,包括瀏覽器類型及版本、操作系統及版本、瀏覽器內核等信息,為更好地提升用戶體驗提供數據支持。
參考資料:
根據劉天斯《Python自動化運維技術與最佳實踐》整理