百度統計(tongji.baidu.com)是百度推出的一款免費的專業網站流量分析工具,能夠告訴用戶訪客是如何找到並瀏覽用戶的網站的,以及在網站上瀏覽了哪些頁面。這些信息可以幫助用戶改善訪客在其網站上的使用體驗,不斷提升網站的投資回報率。
百度統計提供了幾十種圖形化報告,包括:趨勢分析、來源分析、頁面分析、訪客分析、定制分析等多種統計分析服務。
這里我們參考百度統計的功能,基於 Spark Streaming 簡單實現一個分析系統,使之包括以下分析功能。
- 流量分析。一段時間內用戶網站的流量變化趨勢,針對不同的 IP 對用戶網站的流量進行細分。常見指標是總 PV 和各 IP 的PV。
- 來源分析。各種搜索引擎來源給用戶網站帶來的流量情況,需要精確到具體搜索引擎、具體關鍵詞。通過來源分析,用戶可以及時了解哪種類型的來源為其帶來了更多訪客。常見指標是搜索引擎、關鍵詞和終端類型的 PV 。
- 網站分析。各個頁面的訪問情況,包括及時了解哪些頁面最吸引訪客以及哪些頁面最容易導致訪客流失,從而幫助用戶更有針對性地改善網站質量。常見指標是各頁面的 PV 。
2.1 日志實時采集
Web log 一般在 HTTP 服務器收集,比如 Nginx access 日志文件。一個典型的方案是 Nginx 日志文件 + Flume + Kafka + Spark Streaming,如下所述:
- 接收服務器用 Nginx ,根據負載可以部署多台,數據落地至本地日志文件;
- 每個 Nginx 節點上部署 Flume ,使用 tail -f 實時讀取 Nginx 日志,發送至 KafKa 集群;
- 專用的 Kafka 集群用戶連接實時日志與 Spark 集群,詳細配置可以參考 http://spark.apache.org/docs/2.1.1/streaming-kafka-integration.html ;
- Spark Streaming 程序實時消費 Kafka 集群上的數據,實時分析,輸出;
- 結果寫入 MySQL 數據庫。
當然,還可以進一步優化,比如 CGI 程序直接發日志消息到 Kafka ,節省了寫訪問日志的磁盤開銷。這里主要專注 Spark Streaming 的應用,所以我們不做詳細論述
touch sample_web_log.py vim sample_web_log.py
當前編輯完成的代碼還不能直接通過 ./sample_web_log.py 的方式執行,我們需要為其增加可執行權限。
chmod +x sample_web_log.py
采樣生成web日志
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random import time class WebLogGeneration(object): # 類屬性,由所有類的對象共享 site_url_base = "http://www.xxx.com/" # 基本構造函數 def __init__(self): # 前面7條是IE,所以大概瀏覽器類型70%為IE ,接入類型上,20%為移動設備,分別是7和8條,5% 為空 # https://github.com/mssola/user_agent/blob/master/all_test.go self.user_agent_dist = { 0.0: "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)", 0.1: "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)", 0.2: "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)", 0.3: "Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)", 0.4: "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko", 0.5: "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0", 0.6: "Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)", 0.7: "Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53", 0.8: "Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19", 0.9: "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36", 1: " ", } self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98, 132, 156, 124, 167, 143, 187, 168, 190, 201, 202, 214, 215, 222] self.url_path_list = ["login.php", "view.php", "list.php", "upload.php", "admin/login.php", "edit.php", "index.html"] self.http_refer = ["http://www.baidu.com/s?wd={query}", "http://www.google.cn/search?q={query}", "http://www.sogou.com/web?query={query}", "http://one.cn.yahoo.com/s?p={query}", "http://cn.bing.com/search?q={query}"] self.search_keyword = ["spark", "hadoop", "hive", "spark mlib", "spark sql"] def sample_ip(self): slice = random.sample(self.ip_slice_list, 4) # 從ip_slice_list中隨機獲取4個元素,作為一個片斷返回 return ".".join([str(item) for item in slice]) # todo def sample_url(self): return random.sample(self.url_path_list, 1)[0] def sample_user_agent(self): dist_uppon = random.uniform(0, 1) return self.user_agent_dist[float('%0.1f' % dist_uppon)] # 主要搜索引擎referrer參數 def sample_refer(self): if random.uniform(0, 1) > 0.2: # 只有20% 流量有refer return "-" refer_str = random.sample(self.http_refer, 1) query_str = random.sample(self.search_keyword, 1) return refer_str[0].format(query=query_str[0]) def sample_one_log(self, count=3): time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) while count > 1: query_log = "{ip} - - [{local_time}] 'GET /{url} HTTP/1.1' 200 0 '{refer}' '{user_agent}' '-' ".format( ip=self.sample_ip(), local_time=time_str, url=self.sample_url(), refer=self.sample_refer(), user_agent=self.sample_user_agent()) print(query_log) count = count - 1 if __name__ == "__main__": web_log_gene = WebLogGeneration() web_log_gene.sample_one_log(random.uniform(10, 100))
生成樣例
72.46.143.190 - - [2019-03-14 00:36:13] "GET /index.html HTTP/1.1" 200 0 "-" "Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)" "-"
124.10.72.87 - - [2019-03-14 00:36:13] "GET /upload.php HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0" "-"
98.202.29.190 - - [2019-03-14 00:36:13] "GET /admin/login.php HTTP/1.1" 200 0 "-" "Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)" "-"
190.132.63.215 - - [2019-03-14 00:36:13] "GET /login.php HTTP/1.1" 200 0 "-" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-"
156.10.222.190 - - [2019-03-14 00:36:13] "GET /login.php HTTP/1.1" 200 0 "-" "Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko" "-"
98.46.168.187 - - [2019-03-14 00:36:13] "GET /list.php HTTP/1.1" 200 0 "-" "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)" "-"
腳本來調用上面的腳本以隨機生成日志,上傳至 HDFS ,然后移動到目標目錄:
#!/bin/bash # HDFS命令 HDFS="/usr/local/myhadoop/hadoop-2.7.3/bin/hadoop fs" # Streaming程序監聽的目錄,注意跟后面Streaming程序的配置要保持一致 streaming_dir=”/spark/streaming” # 清空舊數據 $HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1 $HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1 # 一直運行 while [ 1 ]; do ./sample_web_log.py > test.log # 給日志文件加上時間戳,避免重名 tmplog="access.`date +'%s'`.log" # 先放在臨時目錄,再move至Streaming程序監控的目錄下,確保原子性 # 臨時目錄用的是監控目錄的子目錄,因為子目錄不會被監控 $HDFS -put test.log ${streaming_dir}/tmp/$tmplog $HDFS -mv ${streaming_dir}/tmp/$tmplog ${streaming_dir}/ echo "`date +"%F %T"` put $tmplog to HDFS succeed" sleep 1 done
日志文件生成腳本
touch genLog.sh vim genLog.sh
同時需要修改該腳本文件的執行權限。
chmod +x genLog.sh
稍有不同的是,生成的日志文件只是被保存在本地目錄中,而不是 HDFS
#!/bin/bash while [ 1 ]; do ./sample_web_log.py > test.log tmplog="access.`date +'%s'`.log" cp test.log streaming/tmp/$tmplog mv streaming/tmp/$tmplog streaming/ echo "`date +"%F %T"` generating $tmplog succeed" sleep 1 done
Spark Streaming 程序代碼如下所示,可以在 bin/spark-shell 交互式環境下運行,如果要以 Spark 程序的方式運行,按注釋中的說明調整一下 StreamingContext 的生成方式即可。啟動 bin/spark-shell 時,為了避免因 DEBUG 日志信息太多而影響觀察輸出,可以將 DEBUG 日志重定向至文件,屏幕上只顯示主要輸出,方法是 ./bin/spark-shell 2>spark-shell-debug.log:
// 導入類 import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} // 設計計算的周期,單位秒 val batch = 10 /* * 這是bin/spark-shell交互式模式下創建StreamingContext的方法 * 非交互式請使用下面的方法來創建 */ val ssc = new StreamingContext(sc, Seconds(batch)) /* // 非交互式下創建StreamingContext的方法 val conf = new SparkConf().setAppName("NginxAnay") val ssc = new StreamingContext(conf, Seconds(batch)) */ /* * 創建輸入DStream,是文本文件目錄類型 * 本地模式下也可以使用本地文件系統的目錄,比如 file:///home/spark/streaming */ val lines = ssc.textFileStream("hdfs:///spark/streaming") /* * 下面是統計各項指標,調試時可以只進行部分統計,方便觀察結果 */ // 1. 總PV lines.count().print() // 2. 各IP的PV,按PV倒序 // 空格分隔的第一個字段就是IP lines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => { rdd.map(ip_pv => (ip_pv._2, ip_pv._1)). sortByKey(false). map(ip_pv => (ip_pv._2, ip_pv._1)) }).print() // 3. 搜索引擎PV val refer = lines.map(_.split("\"")(3)) // 先輸出搜索引擎和查詢關鍵詞,避免統計搜索關鍵詞時重復計算 // 輸出(host, query_keys) val searchEnginInfo = refer.map(r => { val f = r.split('/') val searchEngines = Map( "www.google.cn" -> "q", "www.yahoo.com" -> "p", "cn.bing.com" -> "q", "www.baidu.com" -> "wd", "www.sogou.com" -> "query" ) if (f.length > 2) { val host = f(2) if (searchEngines.contains(host)) { val query = r.split('?')(1) if (query.length > 0) { val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0) if (arr_search_q.length > 0) (host, arr_search_q(0).split('=')(1)) else (host, "") } else { (host, "") } } else ("", "") } else ("", "") }) // 輸出搜索引擎PV searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print() // 4. 關鍵詞PV searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print() // 5. 終端類型PV lines.map(_.split("\"")(5)).map(agent => { val types = Seq("iPhone", "Android") var r = "Default" for (t <- types) { if (agent.indexOf(t) != -1) r = t } (r, 1) }).reduceByKey(_ + _).print() // 6. 各頁面PV lines.map(line => {(line.split("\"")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print() // 啟動計算,等待執行結束(出錯或Ctrl-C退出) ssc.start() ssc.awaitTermination()
Python版
from pyspark import SparkContext from pyspark.streaming import StreamingContext # 設計計算的周期,單位秒 batch = 10 # Create a local StreamingContext with two working thread and batch interval of 1 second sc = SparkContext("local[2]", "NetworkWordCount") ssc = StreamingContext(sc, batch) # 創建輸入DStream,是文本文件目錄類型 lines = ssc.textFileStream("file:///home/ztf/Desktop/workSpace/streaming") # 1. 總PV lines.count().pprint() # 2. 各IP的PV,按PV倒序 # 空格分隔的第一個字段就是IP pairs = lines.map(lambda line: (line.split(" ")[0], 1)) ipCounts = pairs.reduceByKey(lambda x, y: x + y) ipCounts.transform( lambda rdd: rdd.map(lambda ip_pv: (ip_pv[1], ip_pv[0])).sortByKey(False).map(lambda ip_pv: (ip_pv[1], ip_pv[0])) ).pprint() # 3. 搜索引擎PV refer = lines.map(lambda line: line.split("\"")[3]) # 先輸出搜索引擎和查詢關鍵詞,避免統計搜索關鍵詞時重復計算 # 輸出(host, query_keys) def engine_keyword(rdd): f = rdd.split('/') search_engines = { "www.google.cn": "q", "www.yahoo.com": "p", "cn.bing.com": "q", "www.baidu.com": "wd", "www.sogou.com": "query" } if len(f) <= 2: return "", "" host = f[2] if host not in search_engines: return "", "" query = rdd.split("?")[1] if len(query) <= 0: return host, "" arr_search_q = [x for x in query.split('&') if (x.index(search_engines[host] + "=") == 0)] if len(arr_search_q) <= 0: return host, "" return host, arr_search_q[0].split('=')[1] searchEnginInfo = refer.map(engine_keyword) # 輸出搜索引擎PV searchEnginInfo.filter(lambda _: len(_[0]) > 0).map(lambda _: (_[0], 1)).reduceByKey(lambda x, y: x + y).pprint() # 4. 關鍵詞PV searchEnginInfo.filter(lambda _: len(_[1]) > 0).map(lambda _: (_[1], 1)).reduceByKey(lambda x, y: x + y).pprint() # 5. 終端類型PV def agent_function(agent): types = ["iPhone", "Android"] r = "Default" for t in types: if t in agent: r = t return r, 1 lines.map(lambda _: _.split("\"")[5]).map(agent_function).reduceByKey(lambda x, y: x + y).pprint() # 6. 各頁面PV lines.map(lambda line: (line.split("\"")[1].split(" ")[1], 1)).reduceByKey(lambda x, y: x + y).pprint() ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate
打開兩個終端,一個調用上面的 bash 腳本模擬提交日志,一個在交互式環境下運行上面的 Streaming 程序。你可以看到各項指標的輸出,比如某個批次下的輸出為(依次對應上面的 6 個計算項):
總PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
44374
各IP的PV,按PV倒序
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(72.63.87.30,30)
(63.72.46.55,30)
(98.30.63.10,29)
(72.55.63.46,29)
(63.29.10.30,29)
(29.30.63.46,29)
(55.10.98.87,27)
(46.29.98.30,27)
(72.46.63.30,27)
(87.29.55.10,26)
搜索引擎PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(cn.bing.com,1745)
(www.baidu.com,1773)
(www.google.cn,1793)
(www.sogou.com,1845)
關鍵詞PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(spark,1426)
(hadoop,1455)
(spark sql,1429)
(spark mlib,1426)
(hive,1420)
終端類型PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(Android,4281)
(Default,35745)
(iPhone,4348)
各頁面PV
-------------------------------------------
Time: 1448533850000 ms
-------------------------------------------
(/edit.php,6435)
(/admin/login.php,6271)
(/login.php,6320)
(/upload.php,6278)
(/list.php,6411)
(/index.html,6309)
(/view.php,6350)