1、點擊流數據模型
1.1、點擊流概念
點擊流(Click Stream)是指用戶在網站上持續訪問的軌跡。這個概念更注重用戶瀏覽網站的整個流程。用戶對網站的每次訪問包含了一系列的點擊動作行為,這些點擊行為數據就構成了點擊流數據(Click Stream Data),它代表了用戶瀏覽網站的整個流程。
點擊流和網站日志是兩個不同的概念。
點擊流是從用戶的角度出發,注重用戶瀏覽網站的整個流程;
網站日志是面向整個站點,它包含了用戶行為數據、服務器響應數據等眾多日志信息,我們通過對網站日志的分析可以獲得用戶的點擊流數據。
網站是由多個網頁(Page)構成,當用戶在訪問多個網頁時,網頁與網頁之間是靠Referrers參數來標識上級網頁來源。由此,可以確定網頁被依次訪問的順序,當然也可以通過時間來標識訪問的次序。其次,用戶對網站的每次訪問,可視作是一次會話(Session),在網站日志中將會用不同的Sessionid來唯一標識每次會話。如果把 Page 視為“點”的話,那么我們可以很容易的把 Session 描繪成一條“線”,也就是用戶的點擊流數據軌跡曲線。
圖:點擊流概念模型
.2、點擊流模型生成
點擊流數據在具體操作上是由散點狀的點擊日志數據梳理所得。點擊數據在數據建模時存在兩張模型表Pageviews和visits,例如:
頁面點擊流模型 Pageviews 表
Session |
時間 |
|
訪問頁面 URL |
停留時長 |
第幾步 |
S001 |
2012-01-01 12: |
31:12 |
/a/.... |
30 |
1 |
S002 |
2012-01-01 12: |
31:16 |
/a/.... |
10 |
1 |
S002 |
2012-01-01 12: |
31:26 |
/b/.... |
10 |
2 |
S002 |
2012-01-01 12: |
31:36 |
/e/.... |
30 |
3 |
S003 |
2012-01-01 15: |
35:06 |
/a/.... |
30 |
1 |
點擊流模型 Visits 表(按 session 聚集的頁面訪問信息)
Session |
起始時間 |
結束時間 |
進 入頁面 |
離 開頁面 |
訪問頁面數 |
IP |
referal |
S001 |
2012-01-01 12:1:12 |
2012-01-01 12:1:12 |
/a/... |
/a/... |
1 |
101.0.0.1 |
somesite.com |
S002 |
2012-01-01 12:31:16 |
2012-01-01 12:35:06 |
/a/... |
/e/... |
3 |
201.0.0.2 |
- |
S003 |
2012-01-01 12:35:42 |
2012-01-01 12:35:42 |
/c/... |
/c/... |
1 |
234.0.0.3 |
baidu.com |
S004 |
2012-01-01 15:16:39 |
2012-01-01 15:19:23 |
/c/... |
/e/... |
3 |
101.0.0.1 |
google.com |
…… |
…… |
…… |
…… |
…… |
…… |
…… |
…… |
2、如何進行網站流量分析
流量分析整體來說是一個內涵非常豐富的體系,整體過程是一個金字塔結構:
金字塔的頂部是網站的目標:投資回報率(ROI)。
2.1、網站流量分析模型舉例
2.1.1、網站流量質量分析(流量分析)
流量對於每個網站來說都是很重要,但流量並不是越多越好,應該更加看重流量的質量,換句話來說就是流量可以為我們帶來多少收入。
X 軸代表量,指網站獲得的訪問量。Y 軸代表質,指可以促進網站目標的事件次數(比如商品瀏覽、注冊、購買等行為)。圓圈大小表示獲得流量的成本。
BD 流量是指商務拓展流量。一般指的是互聯網經過運營或者競價排名等方式,從外部拉來的流量。比如電商網站在百度上花錢來競價排名,產生的流量就是 BD 流量的一部分。
2.1.2、網站流量多維度細分(流量分析)
細分是指通過不同維度對指標進行分割,查看同一個指標在不同維度下的表現,進而找出有問題的那部分指標,對這部分指標進行優化。
2.1.3、網站內容及導航分析(內容分析)
對於所有網站來說,頁面都可以被划分為三個類別:導航頁、功能頁、內容頁
導航頁的目的是引導訪問者找到信息,功能頁的目的是幫助訪問者完成特定任務,內容頁的目的是向訪問者展示信息並幫助訪問者進行決策。
首頁和列表頁都是典型的導航頁,站內搜索頁面、注冊表單頁面和購物車頁面都是典型的功能頁,而產品詳情頁、新聞和文章頁都是典型的內容頁。
比如從內容導航分析中,以下兩類行為就是網站運營者不希望看到的行為:
第一個問題:訪問者從導航頁(首頁)還沒有看到內容頁面之前就從導航頁離開網站,需要分析導航頁造成訪問者中途離開的原因。
第二個問題:訪問者從導航頁進入內容頁后,又返回到導航頁,說明需要分
析內容頁的最初設計,並考慮中內容頁提供交叉的信息推薦。
2.1.4、網站轉化以及漏斗分析(轉化分析)
所謂轉化,即網站業務流程中的一個封閉渠道,引導用戶按照流程最終實現業務目標(比如商品成交);而漏斗模型則是指進入渠道的用戶在各環節遞進過程中逐漸流失的形象描述;
對於轉化渠道,主要進行兩部分的分析:
訪問者的流失和迷失
阻力的流失
造成流失的原因很多,如:不恰當的商品或活動推薦對支付環節中專業名詞的解釋、幫助信息等內容不當
迷失
造成迷失的主要原因是轉化流量設計不合理,訪問者在特定階段得不到需要的信息,並且不能根據現有的信息作出決策,比如在線購買演唱會門票,直到支付也沒看到在線選座的提示,這時候就很可能會產生迷失,返回查看。
總之,網站數據分析是一門內容非常豐富的學科,本課程中主要關注網站流量分析過程中的技術運用,更多關於網站數據分析的業務知識可學習文檔首頁推薦的資料。
2.2、流量分析常見分類
指標是網站分析的基礎,用來記錄和衡量訪問者在網站自的各種行為。比如我們經常說的流量就是一個網站指標,它是用來衡量網站獲得的訪問量。在進行流量分析之前,我們先來了解一些常見的指標。
2.2.1、骨灰級指標
IP:1 天之內,訪問網站的不重復 IP 數。一天內相同 IP 地址多次訪問網站只被計算 1 次。曾經 IP 指標可以用來表示用戶訪問身份,目前則更多的用來獲取訪問者的地理位置信息。
PageView 瀏覽量: 即通常說的 PV 值,用戶每打開 1 個網站頁面,記錄 1 個
PV。用戶多次打開同一頁面 PV 累計多次。通俗解釋就是頁面被加載的總次數。
Unique PageView: 1 天之內,訪問網站的不重復用戶數(以瀏覽器 cookie 為依據),一天內同一訪客多次訪問網站只被計算 1 次。
2.2.2、基礎級指標
訪問次數:訪客從進入網站到離開網站的一系列活動記為一次訪問,也稱會話(session),1 次訪問(會話)可能包含多個 PV。
網站停留時間:訪問者在網站上花費的時間。
頁面停留時間:訪問者在某個特定頁面或某組網頁上所花費的時間。
2.2.3、復合級指標
人均瀏覽頁數:平均每個獨立訪客產生的 PV。人均瀏覽頁數=瀏覽次數/獨立訪客。體現網站對訪客的吸引程度。
跳出率:指某一范圍內單頁訪問次數或訪問者與總訪問次數的百分比。其中跳出指單頁訪問或訪問者的次數,即在一次訪問中訪問者進入網站后只訪問了一個頁面就離開的數量。
退出率:指某一范圍內退出的訪問者與綜合訪問量的百分比。其中退出指訪問者離開網站的次數,通常是基於某個范圍的。
有了上述這些指標之后,就能結合業務進行各種不同角度的分類分析,主要是以下幾大方面:
2.2.4、基礎分析(PV,IP,UV)
趨勢分析:根據選定的時段,提供網站流量數據,通過流量趨勢變化形態,分析網站訪客的訪問規律、網站發展狀況提供參考。
對比分析:根據選定的兩個對比時段,提供網站流量在時間上的縱向對比報表,幫您發現網站發展狀況、發展規律、流量變化率等。
當前在線:提供當前時刻站點上的訪客量,以及最近 15 分鍾流量、來源、受訪、訪客變化情況等,方便用戶及時了解當前網站流量狀況。
訪問明細:提供最近 7 日的訪客訪問記錄,可按每個 PV 或每次訪問行為(訪客的每次會話)顯示,並可按照來源、搜索詞等條件進行篩選。 通過訪問明細,用戶可以詳細了解網站流量的累計過程,從而為用戶快速找出流量變動原因提供最原始、最准確的依據。
2.2.5、來源分析
來源分類:提供不同來源形式(直接輸入、搜索引擎、其他外部鏈接、站內來源)、不同來源項引入流量的比例情況。通過精確的量化數據,幫助用戶分析什么類型的來路產生的流量多、效果好,進而合理優化推廣方案。
搜索引擎:提供各搜索引擎以及搜索引擎子產品引入流量的比例情況。
搜索詞:提供訪客通過搜索引擎進入網站所使用的搜索詞,以及各搜索詞引入流量的特征和分布。幫助用戶了解各搜索詞引入流量的質量,進而了解訪客的興趣關注點、網站與訪客興趣點的匹配度,為優化 SEO(搜索引擎優化)方案及 SEM(搜索引擎營銷)提詞方案提供詳細依據。
最近 7 日的訪客搜索記錄:可按每個 PV 或每次訪問行為(訪客的每次會話)顯示,並可按照訪客類型、地區等條件進行篩選。為您搜索引擎優化提供最詳細的原始數據。
來路域名:提供具體來路域名引入流量的分布情況,並可按“社會化媒體”、“搜索引擎”、“郵箱”等網站類型對來源域名進行分類。 幫助用戶了解哪類推廣渠道產生的流量多、效果好,進而合理優化網站推廣方案。
來路頁面:提供具體來路頁面引入流量的分布情況。 尤其對於通過流量置換、包廣告位等方式從其他網站引入流量的用戶,該功能可以方便、清晰地展現廣告引入的流量及效果,為優化推廣方案提供依據。
來源升降榜:提供開通統計后任意兩日的 TOP10000 搜索詞、來路域名引入流量的對比情況,並按照變化的劇烈程度提供排行榜。 用戶可通過此功能快速找到哪些來路對網站流量的影響比較大,從而及時排查相應來路問題。
2.2.6、受訪分析
受訪域名:提供訪客對網站中各個域名的訪問情況。 一般情況下,網站不同域名提供的產品、內容各有差異,通過此功能用戶可以了解不同內容的受歡迎程度以及網站運營成效。
受訪頁面:提供訪客對網站中各個頁面的訪問情況。 站內入口頁面為訪客進入網站時瀏覽的第一個頁面,如果入口頁面的跳出率較高則需要關注並優化;站內出口頁面為訪客訪問網站的最后一個頁面,對於離開率較高的頁面需要關注並優化。
受訪升降榜:提供開通統計后任意兩日的 TOP10000 受訪頁面的瀏覽情況對比,並按照變化的劇烈程度提供排行榜。 可通過此功能驗證經過改版的頁面是否有流量提升或哪些頁面有巨大流量波動,從而及時排查相應問題。
熱點圖:記錄訪客在頁面上的鼠標點擊行為,通過顏色區分不同區域的點擊熱度;支持將一組頁面設置為"關注范圍",並可按來路細分點擊熱度。 通過訪客在頁面上的點擊量統計,可以了解頁面設計是否合理、廣告位的安排能否獲取更多佣金等。
用戶視點:提供受訪頁面對頁面上鏈接的其他站內頁面的輸出流量,並通過輸出流量的高低繪制熱度圖,與熱點圖不同的是,所有記錄都是實際打開了下一頁面產生了瀏覽次數(PV)的數據,而不僅僅是擁有鼠標點擊行為。
訪問軌跡:提供觀察焦點頁面的上下游頁面,了解訪客從哪些途徑進入頁面,又流向了哪里。 通過上游頁面列表比較出不同流量引入渠道的效果;通過下游頁面列表了解用戶的瀏覽習慣,哪些頁面元素、內容更吸引訪客點擊。
2.2.7、訪客分析
地區運營商:提供各地區訪客、各網絡運營商訪客的訪問情況分布。 地方網站、下載站等與地域性、網絡鏈路等結合較為緊密的網站,可以參考此功能數據,合理優化推廣運營方案。
終端詳情:提供網站訪客所使用的瀏覽終端的配置情況。 參考此數據進行網頁設計、開發,可更好地提高網站兼容性,以達到良好的用戶交互體驗。
新老訪客:當日訪客中,歷史上第一次訪問該網站的訪客記為當日新訪客;歷史上已經訪問過該網站的訪客記為老訪客。 新訪客與老訪客進入網站的途徑和瀏覽行為往往存在差異。該功能可以輔助分析不同訪客的行為習慣,針對不同訪客優化網站,例如為制作新手導航提供數據支持等。
忠誠度:從訪客一天內回訪網站的次數(日訪問頻度)與訪客上次訪問網站的時間兩個角度,分析訪客對網站的訪問粘性、忠誠度、吸引程度。 由於提升網站內容的更新頻率、增強用戶體驗與用戶價值可以有更高的忠誠度,因此該功能在網站內容更新及用戶體驗方面提供了重要參考。
活躍度:從訪客單次訪問瀏覽網站的時間與網頁數兩個角度,分析訪客在網站上的活躍程度。 由於提升網站內容的質量與數量可以獲得更高的活躍度,因此該功能是網站內容分析的關鍵指標之一。
2.2.8、轉化路徑分析
轉化定義:
訪客在您的網站完成了某項您期望的活動,記為一次轉化,如注冊、下載、購買。
目標示例:
·獲得用戶目標:在線注冊、創建賬號等。
·咨詢目標:咨詢、留言、電話等。
·互動目標:視頻播放、加入購物車、分享等。
·收入目標:在線訂單、付款等。
路徑分析:
根據設置的特定路線,監測某一流程的完成轉化情況,算出每步的轉換率和流失率數據,
如注冊流程,購買流程等。
轉化類型:
l 頁面
l 事件
三、 整體技術流程及架構
1、數據處理流程
網站流量日志數據分析是一個純粹的數據分析項目,其整體流程基本上就是
依據數據的處理流程進行。有以下幾個大的步驟:
數據采集
數據采集概念,目前行業會有兩種解釋:一是數據從無到有的過程(web服務器打印的日志、自定義采集的日志等)叫做數據采集;另一方面也有把通過使用Flume等工具把數據采集到指定位置的這個過程叫做數據采集。
關於具體含義要結合語境具體分析,明白語境中具體含義即可。
數據預處理
通過mapreduce程序對采集到的原始日志數據進行預處理,比如清洗,格式
整理,濾除臟數據等,並且梳理成點擊流模型數據。
數據入庫
將預處理之后的數據導入到HIVE倉庫中相應的庫和表中。
數據分析
項目的核心內容,即根據需求開發ETL分析語句,得出各種統計結果。
數據展現
將分析所得數據進行數據可視化,一般通過圖表進行展示。
2、系統的架構
相對於傳統的BI數據處理,流程幾乎差不多,但是因為是處理大數據,所以流程中各環節所使用的技術則跟傳統BI完全不同:
數據采集:定制開發采集程序,或使用開源框架Flume
數據預處理:定制開發mapreduce程序運行於hadoop集群數據倉庫技術:基於hadoop之上的Hive
數據導出:基於hadoop的sqoop數據導入導出工具數據可視化:定制開發web程序(echarts)
整個過程的流程調度:hadoop生態圈中的azkaban工具
其中,需要強調的是:系統的數據分析不是一次性的,而是按照一定的時間頻率反復計算,因而整個處理鏈條中的各個環節需要按照一定的先后依賴關系緊密銜接,即涉及到大量任務單元的管理調度,所以,項目中需要添加一個任務調度模塊。
3、數據展現
數據展現的目的是將分析所得的數據進行可視化,以便運營決策人員能更方便地獲取數據,更快更簡單地理解數據。
市面上有許多開源的數據可視化軟件、工具。比如Echarts.
四、 模塊開發----數據采集
1、需求
在網站web流量日志分析這種場景中,對數據采集部分的可靠性、容錯能力要求通常不會非常嚴苛,因此使用通用的 flume 日志采集框架完全可以滿足需
求。
2、Flume 日志采集系統
2.1、Flume 采集
Flume 采集系統的搭建相對簡單:
1、在服務器上部署 agent 節點,修改配置文件
2、啟動 agent 節點,將采集到的數據匯聚到指定的 HDFS 目錄中
3、針對nginx日志生成場景,如果通過flume(1.6)收集,無論是Spooling Directory Source和Exec Source均不能滿足動態實時收集的需求,在當前flume1.7穩定版本中,提供了一個非常好用的TaildirSource,使用這個source,可以監控一個目錄,並且使用正則表達式匹配該目錄中的文件名進行實時收集。
核心配置如下:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources = r1 a1.sources.r1.type = TAILDIR a1.sources.r1.channels = c1 a1.sources.r1.positionFile = /root/logs/taildir_position.json a1.sources.r1.filegroups = f1 f2 a1.sources.r1.filegroups.f1 = /root/logs/example.log a1.sources.r1.filegroups.f2 = /root/logs/toupload/.*log.* # Describe the sink a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ a1.sinks.k1.hdfs.filePrefix = events- a1.sinks.k1.hdfs.round = true a1.sinks.k1.hdfs.roundValue = 10 a1.sinks.k1.hdfs.roundUnit = minute a1.sinks.k1.hdfs.rollInterval = 3 a1.sinks.k1.hdfs.rollSize = 20 a1.sinks.k1.hdfs.rollCount = 5 a1.sinks.k1.hdfs.batchSize = 1 a1.sinks.k1.hdfs.useLocalTimeStamp = true #生成的文件類型,默認是 Sequencefile,可用 DataStream,則為普通文本 a1.sinks.k1.hdfs.fileType = DataStream # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1 制作log命令: ## while true; do echo example... >> /root/logs/example.log; echo access... >> /root/logs/toupload/access.log.1;sleep 0.3;done 啟動命令: bin/flume-ng agent -c conf/ -f conf/kkkk.conf -n a1 -Dflume.root.logger=INFO,console
filegroups:指定filegroups,可以有多個,以空格分隔;(TailSource可以同時監控 tail多個目錄中的文件)
positionFile:配置檢查點文件的路徑,檢查點文件會以json格式保存已經tail文件的位置,解決了斷點不能續傳的缺陷。
filegroups.<filegroupName>:配置每個filegroup的文件絕對路徑,文件名可以用正則表達式匹配。
通過以上配置,就可以監控文件內容的增加和文件的增加。產生和所配置的文件名正則表達式不匹配的文件,則不會被tail。
2.2、數據內容樣例
58.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
字段解析:
訪客ip地址: 58.215.204.118
訪客用戶信息: - -
請求時間:[18/Sep/2013:06:51:35 +0000]
請求方式:GET
請求的url:/wp-includes/js/jquery/jquery.js?ver=1.10.2
請求所用協議:HTTP/1.1
響應碼:304
返回的數據流量:0
訪客的來源url:http://blog.fens.me/nodejs-socketio-chat/
訪客所用瀏覽器:Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101
Firefox/23.0
五、 模塊開發----數據預處理
1、主要目的
過濾“不合規”數據,清洗無意義的數據格式轉換和規整根據后續的統計需求,過濾分離出各種不同主題(不同欄目 path)的基礎數據。
2、實現方式
import java.io.IOException; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 處理原始日志,過濾出真實pv請求 轉換時間格式 對缺失字段填充默認值 對記錄標記valid和invalid * */ public class WeblogPreProcess { static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> { // 用來存儲網站url分類數據 Set<String> pages = new HashSet<String>(); Text k = new Text(); NullWritable v = NullWritable.get(); /** * 從外部配置文件中加載網站的有用url分類數據 存儲到maptask的內存中,用來對日志數據進行過濾 */ @Override protected void setup(Context context) throws IOException, InterruptedException { pages.add("/about"); pages.add("/black-ip-list/"); pages.add("/cassandra-clustor/"); pages.add("/finance-rhive-repurchase/"); pages.add("/hadoop-family-roadmap/"); pages.add("/hadoop-hive-intro/"); pages.add("/hadoop-zookeeper-intro/"); pages.add("/hadoop-mahout-roadmap/"); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); WebLogBean webLogBean = WebLogParser.parser(line); if (webLogBean != null) { // 過濾 WebLogParser.filtStaticResource(webLogBean, pages); /* if (!webLogBean.isValid()) return; */ k.set(webLogBean.toString()); context.write(k, v); } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WeblogPreProcess.class); job.setMapperClass(WeblogPreProcessMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path("d:/weblog/input")); FileOutputFormat.setOutputPath(job, new Path("d:/weblog/output")); job.setNumReduceTasks(0); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Locale; import java.util.Set; public class WebLogParser { //194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)" public static WebLogBean parser(String line) { WebLogBean webLogBean = new WebLogBean(); String[] arr = line.split(" "); if (arr.length > 11) { webLogBean.setRemote_addr(arr[0]); webLogBean.setRemote_user(arr[1]); String time_local = formatDate(arr[3].substring(1)); if(null==time_local || "".equals(time_local)) time_local="-invalid_time-"; webLogBean.setTime_local(time_local); webLogBean.setRequest(arr[6]); webLogBean.setStatus(arr[8]); webLogBean.setBody_bytes_sent(arr[9]); webLogBean.setHttp_referer(arr[10]); //如果useragent元素較多,拼接useragent if (arr.length > 12) { StringBuilder sb = new StringBuilder(); for(int i=11;i<arr.length;i++){ sb.append(arr[i]); } webLogBean.setHttp_user_agent(sb.toString()); } else { webLogBean.setHttp_user_agent(arr[11]); } if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大於400,HTTP錯誤 webLogBean.setValid(false); } if("-invalid_time-".equals(webLogBean.getTime_local())){ webLogBean.setValid(false); } } else { webLogBean=null; } return webLogBean; } public static void filtStaticResource(WebLogBean bean, Set<String> pages) { if (!pages.contains(bean.getRequest())) { bean.setValid(false); } } //格式化時間方法 public static String formatDate(String time_local) { // 18/Sep/2013:06:49:18 SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US); SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); try { return df2.format(df1.parse(time_local));//dfs2=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); } catch (ParseException e) { return null; } } }
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class WebLogBean implements Writable { private boolean valid = true;// 判斷數據是否合法 private String remote_addr;// 記錄客戶端的ip地址 private String remote_user;// 記錄客戶端用戶名稱,忽略屬性"-" private String time_local;// 記錄訪問時間與時區 private String request;// 記錄請求的url與http協議 private String status;// 記錄請求狀態;成功是200 private String body_bytes_sent;// 記錄發送給客戶端文件主體內容大小 private String http_referer;// 用來記錄從那個頁面鏈接訪問過來的 private String http_user_agent;// 記錄客戶瀏覽器的相關信息 public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) { this.valid = valid; this.remote_addr = remote_addr; this.remote_user = remote_user; this.time_local = time_local; this.request = request; this.status = status; this.body_bytes_sent = body_bytes_sent; this.http_referer = http_referer; this.http_user_agent = http_user_agent; } public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getRemote_user() { return remote_user; } public void setRemote_user(String remote_user) { this.remote_user = remote_user; } public String getTime_local() { return this.time_local; } public void setTime_local(String time_local) { this.time_local = time_local; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getBody_bytes_sent() { return body_bytes_sent; } public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; } public String getHttp_referer() { return http_referer; } public void setHttp_referer(String http_referer) { this.http_referer = http_referer; } public String getHttp_user_agent() { return http_user_agent; } public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append(this.valid); sb.append("\001").append(this.getRemote_addr()); sb.append("\001").append(this.getRemote_user()); sb.append("\001").append(this.getTime_local()); sb.append("\001").append(this.getRequest()); sb.append("\001").append(this.getStatus()); sb.append("\001").append(this.getBody_bytes_sent()); sb.append("\001").append(this.getHttp_referer()); sb.append("\001").append(this.getHttp_user_agent()); return sb.toString(); } @Override public void readFields(DataInput in) throws IOException { this.valid = in.readBoolean(); this.remote_addr = in.readUTF(); this.remote_user = in.readUTF(); this.time_local = in.readUTF(); this.request = in.readUTF(); this.status = in.readUTF(); this.body_bytes_sent = in.readUTF(); this.http_referer = in.readUTF(); this.http_user_agent = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { out.writeBoolean(this.valid); out.writeUTF(null==remote_addr?"":remote_addr); out.writeUTF(null==remote_user?"":remote_user); out.writeUTF(null==time_local?"":time_local); out.writeUTF(null==request?"":request); out.writeUTF(null==status?"":status); out.writeUTF(null==body_bytes_sent?"":body_bytes_sent); out.writeUTF(null==http_referer?"":http_referer); out.writeUTF(null==http_user_agent?"":http_user_agent); } }
3、點擊流模型數據梳理
由於大量的指標統計從點擊流模型中更容易得出,所以在預處理階段,可以使用mr程序來生成點擊流模型的數據。
3.1、點擊流模型 pageviews 表
Pageviews 表模型數據生成, 詳細見:ClickStreamPageView.java
/** * * 將清洗之后的日志梳理出點擊流pageviews模型數據 * * 輸入數據是清洗過后的結果數據 * * 區分出每一次會話,給每一次visit(session)增加了session-id(隨機uuid) * 梳理出每一次會話中所訪問的每個頁面(請求時間,url,停留時長,以及該頁面在這次session中的序號) * 保留referral_url,body_bytes_send,useragent * * * @author * */ public class ClickStreamPageView { static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> { Text k = new Text(); WebLogBean v = new WebLogBean(); protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\001"); if (fields.length < 9) return; //將切分出來的各字段set到weblogbean中 v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]); //只有有效記錄才進入后續處理 if (v.isValid()) { //此處用ip地址來標識用戶 k.set(v.getRemote_addr()); context.write(k, v); } } } static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> { Text v = new Text(); /* 輸入:<ip,[weblogbean,weblogbean] 同一個ip的所有請求,按照時間先后順序排序了 */ protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException { ArrayList<WebLogBean> requestList = new ArrayList<WebLogBean>(); // 先將一個用戶的所有訪問記錄中的時間拿出來排序 try { for (WebLogBean bean : values) { WebLogBean webLogBean = new WebLogBean(); try { BeanUtils.copyProperties(webLogBean, bean); } catch(Exception e) { e.printStackTrace(); } requestList.add(webLogBean); } //將bean按時間先后順序排序 Arrays.sort() Collections.sort(requestList, new Comparator<WebLogBean>() { //[b,a,] , c @Override public int compare(WebLogBean o1, WebLogBean o2) { try { Date d1 = toDate(o1.getTime_local()); Date d2 = toDate(o2.getTime_local()); if (d1 == null || d2 == null) return 0; return d1.compareTo(d2); } catch (Exception e) { e.printStackTrace(); return 0; } } }); /** * 以下邏輯為:從有序bean中分辨出各次visit,並對一次visit中所訪問的page按順序標號step * 核心思想: * 就是比較相鄰兩條記錄中的時間差,如果時間差<30分鍾,則該兩條記錄屬於同一個session * 否則,就屬於不同的session * */ int step = 1; String session = UUID.randomUUID().toString(); // 如果僅有1條數據,則直接輸出 if (1 == requestList.size()) { WebLogBean bean = requestList.get(0); // 設置默認停留時長為60s v.set(session+"\001"+key.toString()+"\001"+bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus()); context.write(NullWritable.get(), v); return; } for (int i = 0; i < requestList.size(); i++) { // 如果不止1條數據,則將第一條跳過不輸出,遍歷第二條時再輸出 if (i == 0) { continue; } /* beans集合 s1 false58.215.204.118-2013-09-18 06:51:35 0 ip1 s1 false58.215.204.118-2013-09-18 06:51:36 1 s2 false58.215.204.118-2013-09-18 07:51:36 2 */ WebLogBean bean1 = requestList.get(i - 1); WebLogBean bean2 = requestList.get(i); // 求近兩次時間差 long timeDiff = timeDiff(toDate(bean2.getTime_local()), toDate(bean1.getTime_local())); // 如果本次-上次時間差<30分鍾,則輸出前一次的頁面訪問信息 if (timeDiff < 30 * 60 * 1000) { v.set(session+"\001"+key.toString()+"\001"+bean1.getRemote_user() + "\001" + bean1.getTime_local() + "\001" + bean1.getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + bean1.getHttp_referer() + "\001" + bean1.getHttp_user_agent() + "\001" + bean1.getBody_bytes_sent() + "\001" + bean1.getStatus()); context.write(NullWritable.get(), v); step++; } else { // 如果本次-上次時間差>30分鍾,則輸出前一次的頁面訪問信息且將step重置,以分隔為新的visit v.set(session+"\001"+key.toString()+"\001"+bean1.getRemote_user() + "\001" + bean1.getTime_local() + "\001" + bean1.getRequest() + "\001" + (step) + "\001" + (60) + "\001" + bean1.getHttp_referer() + "\001" + bean1.getHttp_user_agent() + "\001" + bean1.getBody_bytes_sent() + "\001" + bean1.getStatus()); context.write(NullWritable.get(), v); // 輸出完上一條之后,重置step編號 step = 1; session = UUID.randomUUID().toString(); } // 如果此次遍歷的是最后一條,則將本條直接輸出 if (i == requestList.size() - 1) { // 設置默認停留市場為60s v.set(session+"\001"+key.toString()+"\001"+bean2.getRemote_user() + "\001" + bean2.getTime_local() + "\001" + bean2.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean2.getHttp_referer() + "\001" + bean2.getHttp_user_agent() + "\001" + bean2.getBody_bytes_sent() + "\001" + bean2.getStatus()); context.write(NullWritable.get(), v); } } } catch (ParseException e) { e.printStackTrace(); } } private String toStr(Date date) { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.format(date); } private Date toDate(String timeStr) throws ParseException { SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US); return df.parse(timeStr); } private long timeDiff(String time1, String time2) throws ParseException { Date d1 = toDate(time1); Date d2 = toDate(time2); return d1.getTime() - d2.getTime(); } private long timeDiff(Date time1, Date time2) throws ParseException { return time1.getTime() - time2.getTime(); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ClickStreamPageView.class); job.setMapperClass(ClickStreamMapper.class); job.setReducerClass(ClickStreamReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(WebLogBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path("d:/weblog/output")); FileOutputFormat.setOutputPath(job, new Path("d:/weblog/pageviews")); job.waitForCompletion(true); } }
public class PageViewsBean implements Writable { private String session; private String remote_addr; private String timestr; private String request; private int step; private String staylong; private String referal; private String useragent; private String bytes_send; private String status; public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) { this.session = session; this.remote_addr = remote_addr; this.useragent = useragent; this.timestr = timestr; this.request = request; this.step = step; this.staylong = staylong; this.referal = referal; this.bytes_send = bytes_send; this.status = status; } public String getSession() { return session; } public void setSession(String session) { this.session = session; } public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getTimestr() { return timestr; } public void setTimestr(String timestr) { this.timestr = timestr; } public String getRequest() { return request; } public void setRequest(String request) { this.request = request; } public int getStep() { return step; } public void setStep(int step) { this.step = step; } public String getStaylong() { return staylong; } public void setStaylong(String staylong) { this.staylong = staylong; } public String getReferal() { return referal; } public void setReferal(String referal) { this.referal = referal; } public String getUseragent() { return useragent; } public void setUseragent(String useragent) { this.useragent = useragent; } public String getBytes_send() { return bytes_send; } public void setBytes_send(String bytes_send) { this.bytes_send = bytes_send; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } @Override public void readFields(DataInput in) throws IOException { this.session = in.readUTF(); this.remote_addr = in.readUTF(); this.timestr = in.readUTF(); this.request = in.readUTF(); this.step = in.readInt(); this.staylong = in.readUTF(); this.referal = in.readUTF(); this.useragent = in.readUTF(); this.bytes_send = in.readUTF(); this.status = in.readUTF(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(session); out.writeUTF(remote_addr); out.writeUTF(timestr); out.writeUTF(request); out.writeInt(step); out.writeUTF(staylong); out.writeUTF(referal); out.writeUTF(useragent); out.writeUTF(bytes_send); out.writeUTF(status); } }
3.2、點擊流模型 visit 信息表
注:“一次訪問”=“N 次連續請求”
直接從原始數據中用hql 語法得出每個人的“次”訪問信息比較困難,可先用mapreduce 程序分析原始數據得出“次”信息數據,然后再用hql 進行更多維度統計用 MR 程序從 pageviews 數據中,梳理出每一次 visit 的起止時間、頁面信息詳細代碼見工程:ClickStreamVisit.java
/** * 輸入數據:pageviews模型結果數據 * 從pageviews模型結果數據中進一步梳理出visit模型 * sessionid start-time out-time start-page out-page pagecounts ...... * * @author * */ public class ClickStreamVisit { // 以session作為key,發送數據到reducer static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> { PageViewsBean pvBean = new PageViewsBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //5abe467e-d500-4889-82c1-36695b7affbf101.226.167.201-2013-09-18 09:30:36/hadoop-mahout-roadmap/160"http://blog.fens.me/hadoop-mahout-roadmap/""Mozilla/4.0(compatible;MSIE8.0;WindowsNT6.1;Trident/4.0;SLCC2;.NETCLR2.0.50727;.NETCLR3.5.30729;.NETCLR3.0.30729;MediaCenterPC6.0;MDDR;.NET4.0C;.NET4.0E;.NETCLR1.1.4322;TabletPC2.0);360Spider"10335200 String line = value.toString(); String[] fields = line.split("\001"); int step = Integer.parseInt(fields[5]); //(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status) //299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200 pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]); k.set(pvBean.getSession()); context.write(k, pvBean); } } static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> { /** 2 1 3 session001,[PageViewsBean1,PageViewsBean2,PageViewsBean3] session001,[PageViewsBean2,PageViewsBean1,PageViewsBean3] */ protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException { // 將pvBeans按照step排序 ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>(); for (PageViewsBean pvBean : pvBeans) { PageViewsBean bean = new PageViewsBean(); try { BeanUtils.copyProperties(bean, pvBean); pvBeansList.add(bean); } catch (Exception e) { e.printStackTrace(); } } Collections.sort(pvBeansList, new Comparator<PageViewsBean>() { @Override public int compare(PageViewsBean o1, PageViewsBean o2) { return o1.getStep() > o2.getStep() ? 1 : -1; } }); // 取這次visit的首尾pageview記錄,將數據放入VisitBean中 VisitBean visitBean = new VisitBean(); // 取visit的首記錄 visitBean.setInPage(pvBeansList.get(0).getRequest()); visitBean.setInTime(pvBeansList.get(0).getTimestr()); // 取visit的尾記錄 visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest()); visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr()); // visit訪問的頁面數 visitBean.setPageVisits(pvBeansList.size()); // 來訪者的ip visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr()); // 本次visit的referal visitBean.setReferal(pvBeansList.get(0).getReferal()); visitBean.setSession(session.toString()); context.write(NullWritable.get(), visitBean); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ClickStreamVisit.class); job.setMapperClass(ClickStreamVisitMapper.class); job.setReducerClass(ClickStreamVisitReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(PageViewsBean.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(VisitBean.class); // FileInputFormat.setInputPaths(job, new Path(args[0])); // FileOutputFormat.setOutputPath(job, new Path(args[1])); FileInputFormat.setInputPaths(job, new Path("d:/weblog/pageviews")); FileOutputFormat.setOutputPath(job, new Path("d:/weblog/visitout")); boolean res = job.waitForCompletion(true); System.exit(res?0:1); } }
public class VisitBean implements Writable { private String session; private String remote_addr; private String inTime; private String outTime; private String inPage; private String outPage; private String referal; private int pageVisits; public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) { this.session = session; this.remote_addr = remote_addr; this.inTime = inTime; this.outTime = outTime; this.inPage = inPage; this.outPage = outPage; this.referal = referal; this.pageVisits = pageVisits; } public String getSession() { return session; } public void setSession(String session) { this.session = session; } public String getRemote_addr() { return remote_addr; } public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; } public String getInTime() { return inTime; } public void setInTime(String inTime) { this.inTime = inTime; } public String getOutTime() { return outTime; } public void setOutTime(String outTime) { this.outTime = outTime; } public String getInPage() { return inPage; } public void setInPage(String inPage) { this.inPage = inPage; } public String getOutPage() { return outPage; } public void setOutPage(String outPage) { this.outPage = outPage; } public String getReferal() { return referal; } public void setReferal(String referal) { this.referal = referal; } public int getPageVisits() { return pageVisits; } public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; } @Override public void readFields(DataInput in) throws IOException { this.session = in.readUTF(); this.remote_addr = in.readUTF(); this.inTime = in.readUTF(); this.outTime = in.readUTF(); this.inPage = in.readUTF(); this.outPage = in.readUTF(); this.referal = in.readUTF(); this.pageVisits = in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(session); out.writeUTF(remote_addr); out.writeUTF(inTime); out.writeUTF(outTime); out.writeUTF(inPage); out.writeUTF(outPage); out.writeUTF(referal); out.writeInt(pageVisits); } @Override public String toString() { return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits; } }
pom.xml
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.4</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <addClasspath>true</addClasspath> <classpathPrefix>lib/</classpathPrefix> <mainClass></mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> </plugins> </build>