用戶行為分析
統計分析
點擊、瀏覽;
熱門商品、近期熱門商品、分類熱門商品,流量統計;
偏好統計
收藏、喜歡、評分、打標簽;
用戶畫像、推薦列表(結合特征工程和機器學習算法)
風險控制
下訂單、支付、登錄
刷單監控,訂單實效監控,惡意登錄(短時間內頻繁登錄失敗)監控
1. 實時流量統計
1.1. 實時熱門商品統計HotItemsAnalysis
每隔 5 分鍾輸出最近一小時內點擊量最多的前 N 個商品。
抽取出業務時間戳,告訴 Flink 框架基於業務時間做窗口
• 過濾出點擊行為數據
• 按一小時的窗口大小,每 5 分鍾統計一次,做滑動窗口聚合( Sliding Window)
• 按每個窗口聚合,輸出每個窗口中點擊量前 N 名的商品
1.2. 實時流量統計 NetworkFlowAnalysis
"實時流量統計" 對於一個電商平台而言,用戶登錄的入口流量、不同頁面的訪問流量 都是值得分析的重要數據,
而這些數據,可以簡單地從 web 服務器的日志中提取出來。
熱門頁面瀏覽數的統計NetworkFlow
讀取服務器日志中的每一行log統計在一段時間內用戶訪問每一個url的次數,然后排序輸出顯示。
具體做法為:
每隔 5 秒,輸出最近 10 分鍾內訪問量最多的前 N 個 URL。可以看出,這個需求與之前“實時熱門商品統計”非常類似,
所以我們完全可以借鑒此前的代碼。
網站總瀏覽量PV的統計-PageView
衡量網站流量一個最簡單的指標,就是網站的頁面瀏覽量(Page View PV );
用戶每次打開一個頁面便記錄 1 次 PV ,多次打開同一頁面則瀏覽量累計。一般來說PV 與來訪者的數量成正比,但是 PV 並不直接決定頁面的真實來訪者數量,
如同一個來訪者通過不斷的刷新頁面,也可以制造出非常高的 PV 。
用戶瀏覽頁面時,會從瀏覽器向網絡服務器 發出一個請求 Request網絡服務器接到這個請求后,會將該請求對應的一個網頁( Page )發送給瀏覽器
從而產生了一個 PV。所以我們的統計方法,可以是從 web 服務器的日志中去提取對應的頁面訪問然后統計,就向上一節中的做法一樣;也可以直接從埋點日志中提
取用戶發來的頁面請求,從而統計出總瀏覽量。
實現一個網站總瀏覽量的統計。可以設置滾動時間窗口,實時統計每小時內的網站 PV
獨立訪客數UV的統計-UniqueVisitor
上例中,我們統計的是所有用戶對頁面的所有瀏覽行為,也就是說,同一用戶的瀏覽行為會被重復統計。而在實際應用中,往往還會關注,在一段時間內到底有多少不同的用戶訪問了網站。
另外一個統計流量的重要指標是網站的獨立訪客數(Unique Visitor UV )。 UV指的是一段時間(比如一小時)內訪問網站的總人數, 1天內同一訪客的多次訪問
只記錄為一個訪客。通過 IP 和 cookie 判斷 UV值的兩種方式。 當客戶端第一次訪問某個網站服務器的時候,網站服務器會給這個客戶端的電腦發出一個 Cookie
通常放在這個客戶端電腦的C 盤當中。在這個 Cookie 中會分配一個獨一無二的編號,這其中會記錄一些訪問服務器的信息,如訪問時間,訪問了哪些頁面等等。
當你下次再訪問這個服務器的時候,服務器就可以直接從你的電腦中找到上一次放進去的
Cookie 文件,並且對其進行一些更新,但那個獨一無二的編號是不會變的。
此例中可以根據 userId 來區分不同的用戶。
使用布隆過濾器查重-過濾的UV統計
上例中,把所有數據的userId 都存在了窗口計算的狀態里,在窗口收集數據的過程中,狀態會不斷增大。一般情況下,只要不超出內存的承受范圍,
這種做法也沒什么問題;但如果我們遇到的數據量很大呢?把所有數據暫存放到內存里,顯然不是一個好注意。我們會想到,可以利用 redis這種內存級 k v 數據庫,為我們做一個緩存。
但如果我們遇到的情況非常極端,數據大到驚人呢?比如上億級的用戶,要去重計算 UV 。
如果放到redis 中,億級的用戶id (每個20字節左右的話)可能需要幾G甚至幾十G的空間來存儲。當然放到 redis 中,用集群進行擴展也不是不可以,但明顯
代價太大了。一個更好的想法是,其實我們不需要完整地存儲用戶ID 的信息,只要知道他在不在就行了。所以其實我們可以進行壓縮處理,用一位(bit)就可以表示一個用戶
的狀態。這個思想的具體實現就是布隆過濾器(Bloom Filter)。
本質上布隆過濾器是一種數據結構,比較巧妙的概率型數據結構(probabilisticdata structure ),
特點是高效地插入和查詢,可以用來告訴你 “某樣東西一定不存在或者可能存在”。
它本身是一個很長的二進制向量,既然是二進制的向量,那么顯而易見的,存放的不是 0 ,就是 1 。 相比於傳統的 List 、 Set 、 Map 等數據結構,它更高效、占用空間更少,
但是缺點是其返回的結果是概率性的,而不是確切的。
我們的目標就是,利用某種方法(一般是Hash 函數)把每個數據,對應到一個位圖的某一位上去;如果數據存在,那一位就是1,不存在則為 0 。
判斷當前最大的時間戳 <= 當前的watermark,就返回一個TriggerResult.FIRE(觸發);否則就注冊一個定時器(關窗的操作)
TriggerResult的類型:CONTINUE-什么都不做繼續處理窗口;FIRE觸發窗口的計算操作但並不會關閉窗口清除它的狀態;PURGE清除窗口的狀態;FIRE_AND_PURGE觸發並清除掉;
redis:
2. 市場營銷商業指標統計分析
2.1. APP市場推廣統計
* 對於電商企業來說,一般會通過各種不同的渠道對自己的APP進行市場推廣,而這些渠道的統計數據(比如,不同網站上廣告鏈接的點擊量、APP下載量)就成了市場營銷的重要商業指標。
* 首先考察分渠道的市場推廣統計。
分渠道統計
AppMarketingByChannel.scala
自定義ProcessWindowFunction 進行處理
不分渠道(總量)統計
同樣我們還可以考察不分渠道的市場推廣統計,這樣得到的就是所有渠道推廣的總量 AppMarketing.scala 。
2.2 頁面廣告分析
* 電商網站的市場營銷商業指標中,除了自身的APP 推廣,還會考慮到頁面上的廣告投放(包括自己經營的產品和其它網站的廣告)。 所以廣告相關的統計分析,也是市場營銷的重要指標。
* 對於廣告的統計,最簡單也最重要的就是頁面廣告的點擊量,網站往往需要根據廣告點擊量來制定定價策略和調整推廣方式,而且也可以借此收集用戶的偏好信息。
* 更加具體的應用是,我們可以根據用戶的地理位置進行划分,從而總結出不同省份用戶對不同廣告的偏好,這樣更有助於廣告的精准投放。
頁面廣告點擊量統計
* 接下來我們就進行頁面廣告按照省份划分的點擊量的統計。AdStatisticsByGeo .scala 文件 。
* 自定義一些測試數據AdClickLog,用來生成用戶點擊廣告行為的事件流。
* 主函數以 province 進行 keyBy ,然后開一小時的時間窗口,滑動距離為5秒,統計窗口內的點擊事件數量。
*
* 廣告點擊量統計,同一用戶的重復點擊是會疊加計算的。在實際場景中,同一用戶確實可能反復點開同一個廣告,這也說明了用戶對廣告更大的興趣;
* 但是如果用戶在一段時間非常頻繁地點擊廣告,這顯然不是一個正常行為,有刷點擊量的嫌疑。所以我們可以對一段時間內(比如一天內)的用戶點擊行為進行約束,
* 如果對同一個廣告點擊超過一定限額(比如 100 次),應該把該用戶加入黑名單並報警,此后其點擊行為不應該再統計。
黑名單過濾
3.惡意登錄監控
狀態編程
==========================惡意登錄監控==================
* 對於網站而言,用戶登錄並不是頻繁的業務操作。如果一個用戶短時間內頻繁登錄失敗,就有可能是出現了程序的惡意攻擊,比如密碼暴力破解。因此我們考慮,
* 應該對用戶的登錄失敗動作進行統計,具體來說,如果同一用戶(可以是不同 IP)在2秒之內連續兩次登錄失敗,就認為存在惡意登錄的風險,輸出相關的信息進行
* 報警提示。這是電商網站、也是幾乎所有 網站風控的基本一環。
* 1. 狀態編程的方式實現:LoginFail
* 由於同樣引入了時間,我們可以想到,最簡單的方法其實與之前的熱門統計類似,只需要按照用戶 ID 分流,然后遇到登錄失敗的事件時將其保存在 ListState 中,
* 然后設置一個定時器,2秒后觸發。定時器觸發時檢查狀態中的登錄失敗事件個數,如果大於等於2,那么就輸出報警信息。
*
* 新建一個單例對象。 定義樣例類LoginEvent ,這是輸入的登錄事件流。登錄數據本應該從UserBehavior日志里提取
* 由於UserBehavior.csv中沒有做相關埋點,從另一個文件 LoginLog.csv 中讀取登錄數據 。
*
*
* 2. 優化操作:
* 第一次的代碼實現中我們可以看到,直接把每次登錄失敗的數據存起來、設置定時器一段時間后再讀取,這種做法盡管簡單,但和我們開始的需求還是略有差異
* 的。這種做法只能隔 2 秒之后去判斷一下這期間是否有多次失敗登錄,而不是在一次登錄失敗之后、再一次登錄失敗時就立刻報警。這個需求如果嚴格實現起來,相
* 當於要判斷任意緊鄰的事件,是否符合某種模式。於是我們可以想到,這個需求其實可以不用定時器觸發,直接在狀態中存取上一次登錄失敗的事件,每次都做判斷和比對,就可以實現最初的需求。
* 在代碼MatchFunction中刪掉onTimer processElement
*
CEP編程
* 我們通過對狀態編程的改進,去掉了定時器,在 process function 中做了
* 更多的邏輯處理,實現了最初的需求。不過這種方法里有很多的條件判斷,目前僅僅實現的是檢測“連續2次登錄失敗”,這是最簡單的情形。
* 如果需要檢測更多次,內部邏輯顯然會變得非常復雜。那有什么方式可以方便地實現呢?
* flink為我們提供了CEP Complex Event Processing ,復雜事件處理庫,用於在流中篩選符合某種復雜模式的事件。
* 基於 CEP 來完成這個模塊的實現。
4. 訂單支付實時監控
使用CEP的實現
========================訂單支付實時監控=========================
在電商網站中,訂單的支付作為直接與營銷收入掛鈎的一環,在業務流程中非常重要。對於訂單而言,為了正確控制業務流程,也為了增加用戶的支付意願,網
站一般會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對於訂單的支付,我們還應保證用戶支付的正確性,這可以通過第三方支付平台的
對於訂單的支付,我們還應保證用戶支付的正確性,這可以通過第三方支付平台的交易數據來做一個實時對賬。
將實現這兩個需求。交易數據來做一個實時對賬。
* 在電商平台中最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動作的時候。用戶下單的行為可以表明用戶對商品的需求,但
* 在現實中,並不是每次下單都會被用戶立刻支付。當拖延一段時間后,用戶支付的意願會降低。所以為了讓用戶更有緊迫感從而提高支付轉化率,同時也為了防范訂
* 單支付環節的安全風險,電商網站往往會對訂單狀態進行監控,設置一個失效時間(比如 15 分鍾),如果下單后一段時間仍未支付,訂單就會被 取消。
* 使用 CEP 實現
* 利用 CEP 庫來實現這個功能。我們先將事件流按照訂單號orderId分流,
* 定義這樣的一個事件模式:在15分鍾內,事件“create”與pay非嚴格緊鄰,這樣調用.select 方法時,就可以同時獲取到匹配出的事件和超時未匹配的事件。
* 1. CEP實現訂單超時報警
狀態編程的實現
* 2. 用狀態編程來實現:
* 我們同樣可以利用Process Function ,自定義實現檢測訂單超時的功能。為了簡化問題,我們只考慮超時報警的情形,在 pay 事件超時未發生的情況下,輸出超時報警信息。
* 一個簡單的思路是,可以在訂單的create 事件到來后注冊定時器,15分鍾后觸發;然后再用一個布爾類型的 Value 狀態來作為標識位,表明 pay 事件是否發生過。
* 如果 pay 事件已經發生,狀態被置為 true ,那么就不再需要 做什么操作;而如果 pay事件一直沒來,狀態一直為 false,到定時器觸發時,就應該輸出超時報警信息。
* 現在只考慮兩種情況:①來一個create,來一個pay create后邊有pay就正常匹配,如果沒來就超時報警
* 亂序的數據,有可能create和pay的先后順序
* 超時報警的情況: 遇到create設一個定時器,遇到pay改一個狀態(或者不刪定時器,直接設定一個狀態看有沒有pay來過,有則定時器觸發時說是正常的,沒有就超時報警
雙流的訂單交易
-----來自兩條流的訂單交易匹配----------
* 對於訂單支付事件,用戶支付完成其實並不算完,我們還得確認平台賬戶上是否到賬了。而往往這會來自不同的日志信息,所以我們要同時讀入兩條流的數據來
* 做合並處理。這里我們利用 connect 將兩條流進行連接,
* 1. 用自定義的CoProcessFunction 進行處理。
* 2. 雙流join
* window join(Tumbling Window Join、 Sliding Window Join)適用於兩條流join,后邊還要開窗口的分析
*Interval join(區間join)適用於傳感器報警(溫度煙霧出現異常,它倆時間得匹配上在同一時間范圍內同時出現,溫度又升高的很快)
* Join中當做狀態保存起來
*此需求是兩條流匹配上就可以了
統計類:讀取數據、做簡單包裝轉換map、filter、按某個字段分組,開窗,做聚合
排序| TopN:再做一個ProcessFunction,把所有數據都收集到排序輸出;
以上是基於DataStreamAPI,也可以用高級API、TableAPI和FlinkSQL
業務流程中的狀態做檢測輸出和警告:自定義編程、狀態
事件邏輯、風控:CEP