一、研究背景
互聯網行業越來越重視自家客戶的一些行為偏好了,無論是電商行業還是金融行業,基於用戶行為可以做出很多東西,電商行業可以歸納出用戶偏好為用戶推薦商品,金融行業可以把用戶行為作為反欺詐的一個點,本文主要介紹其中一個重要的功能點,基於行為日志統計用戶行為路徑,為運營人員提供更好的運營決策。可以實現和成熟產品如adobe analysis類似的用戶行為路徑分析。最終效果如圖。使用的是開源大數據可視化工具。如圖所示,用戶行為路徑的數據非常巨大,uv指標又不能提前計算好(時間段未定),如果展示5級,一個頁面的數據量就是10的5次方,如果有幾千個頁面,數據量是無法估量的,所以只能進行實時計算,而Spark非常適合迭代計算,基於這樣的考慮,Spark是不錯的選擇。
二、解決方案
1.流程描述
客戶搜索某一起始頁面的行為路徑明細數據時,RPC請求到后台,調用spark-submit腳本啟動spark程序,Spark程序實時計算並返回數據,前端Java解析數據並展現。
2.准備工作
1.首先要有行為數據啦,用戶行為日志數據必須包含必須包含以下四個字段,訪問時間、設備指紋、會話id、頁面名稱,其中頁面名稱可以自行定義,用來標示一種或者一類頁面,每次用戶請求的時候上報此字段,服務器端收集並存儲,此頁面名稱最好不要有重復,為后續分析打下基礎。
2.然后對行為日志進行一級清洗(基於Hive),將數據統一清洗成如下格式。設備指紋是我另一個研究的項目,還沒時間貼出來。會話id就是可以定義一個會話超時時間,即20分鍾用戶如果沒有任何動作,等20分鍾過后再點擊頁面就認為這是下個一會話id,可通過cookie來控制此會話id。
設備指紋 | 會話id | 頁面路徑(按時間升序 | 時間 |
fpid1 | sessionid1 | A_B_C_D_E_F_G | 2017-01-13 |
A、B、C代表頁面名稱,清洗過程采用row_number函數,concat_ws函數,具體用法可以百度。清洗完之后落地到hive表,后續會用到。T+1清洗此數據。
3.弄清楚遞歸的定義
遞歸算法是一種直接或者間接調用自身函數或者方法的算法。Java遞歸算法是基於Java語言實現的遞歸算法。遞歸算法的實質是把問題分解成規模縮小的同類問題的子問題,然后遞歸調用方法來表示問題的解。遞歸算法對解決一大類問題很有效,它可以使算法簡潔和易於理解。遞歸算法,其實說白了,就是程序的自身調用。它表現在一段程序中往往會遇到調用自身的那樣一種coding策略,這樣我們就可以利用大道至簡的思想,把一個大的復雜的問題層層轉換為一個小的和原問題相似的問題來求解的這樣一種策略。遞歸往往能給我們帶來非常簡潔非常直觀的代碼形勢,從而使我們的編碼大大簡化,然而遞歸的思維確實很我們的常規思維相逆的,我們通常都是從上而下的思維問題, 而遞歸趨勢從下往上的進行思維。這樣我們就能看到我們會用很少的語句解決了非常大的問題,所以遞歸策略的最主要體現就是小的代碼量解決了非常復雜的問題。
遞歸算法解決問題的特點:
1)遞歸就是方法里調用自身。
2)在使用遞增歸策略時,必須有一個明確的遞歸結束條件,稱為遞歸出口。
3)遞歸算法解題通常顯得很簡潔,但遞歸算法解題的運行效率較低。所以一般不提倡用遞歸算法設計程序。
4)在遞歸調用的過程當中系統為每一層的返回點、局部量等開辟了棧來存儲。遞歸次數過多容易造成棧溢出等,所以一般不提倡用遞歸算法設計程序。
在做遞歸算法的時候,一定要把握住出口,也就是做遞歸算法必須要有一個明確的遞歸結束條件。這一點是非常重要的。其實這個出口是非常好理解的,就是一個條件,當滿足了這個條件的時候我們就不再遞歸了。
4.多叉樹的基本知識
三、Spark處理
流程概述:
1.構建一個多叉樹的類,類主要屬性描述,name全路徑如A_B_C,childList兒子鏈表,多叉樹的構建和遞歸參考了這里
2.按時間范圍讀取上一步預處理的數據,遞歸計算每一級頁面的屬性指標,並根據頁面路徑插入到初始化的Node類根節點中。
3.遞歸遍歷上一步初始化的根節點對象,並替換其中的name的id為名稱,其中借助Spark DataFrame查詢數據。
4.將root對象轉化成json格式,返回前端。
附上代碼如下。
import java.util import com.google.gson.Gson import org.apache.spark.SparkContext import org.apache.log4j.{Level, Logger => LG} import org.apache.spark.sql.DataFrame import org.apache.spark.sql.hive.HiveContext /** * 用戶行為路徑實時計算實現 * Created by chouyarn on 2016/12/12. */ /** * 樹結構類 * * @param name 頁面路徑 * @param visit 訪次 * @param pv pv * @param uv uv * @param childList 兒子鏈表 */ class Node( var name: String, var path:Any, var visit: Any, var pv: Any, var uv: Any, var childList: util.ArrayList[Node]) extends Serializable { /** * 添加子節點 * * @param node 子節點對象 * @return */ def addNode(node: Node) = { childList.add(node) } /** * 遍歷節點,深度優先 */ def traverse(): Unit = { if (childList.isEmpty) return // node. val childNum = childList.size for (i <- 0 to childNum - 1) { val child: Node = childList.get(i) child.name = child.name.split("_").last//去除前邊絕對路徑 child.traverse() } } /** * 遍歷節點,深度優先 */ def traverse(pages:DataFrame): Unit = { if (childList.isEmpty||childList.size()==0) return // node. val childNum = childList.size for (i <- 0 to childNum - 1) { val child: Node = childList.get(i) child.name = child.name.split("_").last val id =pages.filter("page_id='"+child.name+"'").select("page_name").first().getString(0)//替換id為name child.name = id child.traverse(pages) } } /** * 動態插入節點 * * @param node 節點對象 * @return */ def insertNode(node: Node): Boolean = { val insertName = node.name if (insertName.stripSuffix("_" + insertName.split("_").last).equals(name)) { // node.name=node.name.split("_").last addNode(node) true } else { val childList1 = childList val childNum = childList1.size var insetFlag = false for (i <- 0 to childNum - 1) { val childNode = childList1.get(i) insetFlag = childNode.insertNode(node) if (insetFlag == true) true } false } } } /** * 處理類 */ class Path extends CleanDataWithRDD { LG.getRootLogger.setLevel(Level.ERROR)//控制spark日志輸出級別 val sc: SparkContext = SparkUtil.createSparkContextYarn("path") val hiveContext = new HiveContext(sc) override def handleData(conf: Map[String, String]): Unit = { val num = conf.getOrElse("depth", 5)//路徑深度 val pageName = conf.getOrElse("pageName", "")//頁面名稱 // val pageName = "A_C" val src = conf.getOrElse("src", "")//標示來源pc or wap val pageType = conf.getOrElse("pageType", "")//向前或者向后路徑 val startDate = conf.getOrElse("startDate", "")//開始日期 val endDate = conf.getOrElse("endDate", "")//結束日期 // 保存log緩存以保證后續使用 val log = hiveContext.sql(s"select fpid,sessionid,path " + s"from specter.t_pagename_path_sparksource " + s"where day between '$startDate' and '$endDate' and path_type=$pageType and src='$src' ") .map(s => { (s.apply(0) + "_" + s.apply(1) + "_" + s.apply(2)) }).repartition(10).persist() val pages=hiveContext.sql("select page_id,page_name from specter.code_pagename").persist()//緩存頁面字典表 // 本地測試數據 // val log = sc.parallelize(Seq("fpid1_sessionid1_A_B", // "fpid2_sessionid2_A_C_D_D_B_A_D_A_F_B", // "fpid1_sessionid1_A_F_A_C_D_A_B_A_V_A_N")) var root: Node = null /** * 遞歸將計算的節點放入樹結構 * * @param pageName 頁面名稱 */ def compute(pageName: String): Unit = { val currenRegex = pageName.r //頁面的正則表達式 val containsRdd = log.filter(_.contains(pageName)).persist() //包含頁面名稱的RDD,后續步驟用到 val currentpv = containsRdd.map(s => {//計算pv currenRegex findAllIn (s) }).map(_.mkString(",")) .flatMap(_.toString.split(",")) .filter(_.size > 0) .count() val tempRdd = containsRdd.map(_.split("_")).persist() //分解后的RDD val currentuv = tempRdd.map(_.apply(0)).distinct().count() //頁面uv val currentvisit = tempRdd.map(_.apply(1)).distinct().count() //頁面訪次 // 初始化根節點或添加節點 if (root == null) { root = new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]()) } else { root.insertNode(new Node(pageName,pageName.hashCode, currentvisit, currentpv, currentuv, new util.ArrayList[Node]())) } if (pageName.split("_").size == 5||tempRdd.isEmpty()) {//遞歸出口 return } else { // 確定下個頁面名稱正則表達式 val nextRegex = s"""${pageName}_[0-9a-z]{8}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{4}-[0-9a-z]{12}""".r // 本地測試 // val nextRegex =s"""${pageName}_[A-Z]""".r val nextpvMap = containsRdd.map(s => {//下一級路徑的pv數top9 nextRegex findAllIn (s) }).map(_.mkString(",")) .flatMap(_.toString.split(",")) .filter(_.size > 0) .map(s => (s.split("_").last, 1)) .filter(!_._1.contains(pageName.split("_")(0))) .reduceByKey(_ + _).sortBy(_._2, false).take(9).toMap nextpvMap.keySet.foreach(key => {//遞歸計算 compute(pageName + "_" + key) }) } } //觸發計算 compute(pageName) val gson: Gson = new Gson() root.traverse(pages) root.name=pages.filter("page_id='"+pageName+"'").select("page_name").first().getString(0) println(gson.toJson(root))//轉化成JSON並打印,Alibaba fsatjson不可用,還是google得厲害。 } override def stop(): Unit = { sc.stop() } } object Path { def main(args: Array[String]): Unit = { // println("ss".hashCode) var num=5 try { num=args(5).toInt }catch { case e:Exception => } val map = Map("pageName" -> args(0), "pageType" -> args(1), "startDate" -> args(2), "endDate" -> args(3), "src" -> args(4), "depth" -> num.toString) val path = new Path() path.handleData(map) } }
四、總結
Spark基本是解決了實時計算行為路徑的問題,缺點就是延遲稍微有點高,因為提交Job之后要向集群申請資源,申請資源和啟動就耗費將近30秒,后續這塊可以優化。據說spark-jobserver提供一個restful接口,為Job預啟動容器,博主沒時間研究有興趣的可以研究下啦。
fastjson在對復雜對象的轉換中不如Google 的Gson。
使用遞歸要慎重,要特別注意出口條件,若出口不明確,很有可能導致死循環。