尚硅谷大數據技術之電商用戶行為分析
第1章 項目整體介紹
1.1 電商的用戶行為
電商平台中的用戶行為頻繁且較復雜,系統上線運行一段時間后,可以收集到大量的用戶行為數據,進而利用大數據技術進行深入挖掘和分析,得到感興趣的商業指標並增強對風險的控制。
電商用戶行為數據多樣,整體可以分為用戶行為習慣數據和業務行為數據兩大類。用戶的行為習慣數據包括了用戶的登錄方式、上線的時間點及時長、點擊和瀏覽頁面、頁面停留時間以及頁面跳轉等等,我們可以從中進行流量統計和熱門商品的統計,也可以深入挖掘用戶的特征;這些數據往往可以從web服務器日志中直接讀取到。而業務行為數據就是用戶在電商平台中針對每個業務(通常是某個具體商品)所作的操作,我們一般會在業務系統中相應的位置埋點,然后收集日志進行分析。業務行為數據又可以簡單分為兩類:一類是能夠明顯地表現出用戶興趣的行為,比如對商品的收藏、喜歡、評分和評價,我們可以從中對數據進行深入分析,得到用戶畫像,進而對用戶給出個性化的推薦商品列表,這個過程往往會用到機器學習相關的算法;另一類則是常規的業務操作,但需要着重關注一些異常狀況以做好風控,比如登錄和訂單支付。
1.2 項目主要模塊
基於對電商用戶行為數據的基本分類,我們可以發現主要有以下三個分析方向:
- 熱門統計
利用用戶的點擊瀏覽行為,進行流量統計、近期熱門商品統計等。
- 偏好統計
利用用戶的偏好行為,比如收藏、喜歡、評分等,進行用戶畫像分析,給出個性化的商品推薦列表。
- 風險控制
利用用戶的常規業務行為,比如登錄、下單、支付等,分析數據,對異常情況進行報警提示。
本項目限於數據,我們只實現熱門統計和風險控制中的部分內容,將包括以下五大模塊:實時熱門商品統計、實時流量統計、市場營銷商業指標統計、惡意登錄監控和訂單支付失效監控,其中細分為以下9個具體指標:
由於對實時性要求較高,我們會用flink作為數據處理的框架。在項目中,我們將綜合運用flink的各種API,基於EventTime去處理基本的業務需求,並且靈活地使用底層的processFunction,基於狀態編程和CEP去處理更加復雜的情形。
1.3 數據源解析
我們准備了一份淘寶用戶行為數據集,保存為csv文件。本數據集包含了淘寶上某一天隨機一百萬用戶的所有行為(包括點擊、購買、收藏、喜歡)。數據集的每一行表示一條用戶行為,由用戶ID、商品ID、商品類目ID、行為類型和時間戳組成,並以逗號分隔。關於數據集中每一列的詳細描述如下:
字段名 |
數據類型 |
說明 |
userId |
Long |
加密后的用戶ID |
itemId |
Long |
加密后的商品ID |
categoryId |
Int |
加密后的商品所屬類別ID |
behavior |
String |
用戶行為類型,包括(‘pv’, ‘’buy, ‘cart’, ‘fav’) |
timestamp |
Long |
行為發生的時間戳,單位秒 |
另外,我們還可以拿到web服務器的日志數據,這里以apache服務器的一份log為例,每一行日志記錄了訪問者的IP、userId、訪問時間、訪問方法以及訪問的url,具體描述如下:
字段名 |
數據類型 |
說明 |
ip |
String |
訪問的 IP |
userId |
Long |
訪問的 user ID |
eventTime |
Long |
訪問時間 |
method |
String |
訪問方法 GET/POST/PUT/DELETE |
url |
String |
訪問的 url |
由於行為數據有限,在實時熱門商品統計模塊中可以使用UserBehavior數據集,而對於惡意登錄監控和訂單支付失效監控,我們只以示例數據來做演示。
第2章 實時熱門商品統計
首先要實現的是實時熱門商品統計,我們將會基於UserBehavior數據集來進行分析。
項目主體用Scala編寫,采用IDEA作為開發環境進行項目編寫,采用maven作為項目構建和管理工具。首先我們需要搭建項目框架。
2.1 創建Maven項目
2.1.1 項目框架搭建
打開IDEA,創建一個maven項目,命名為UserBehaviorAnalysis。由於包含了多個模塊,我們可以以UserBehaviorAnalysis作為父項目,並在其下建一個名為HotItemsAnalysis的子項目,用於實時統計熱門top N商品。
在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為HotItemsAnalysis。
父項目只是為了規范化項目結構,方便依賴管理,本身是不需要代碼實現的,所以UserBehaviorAnalysis下的src文件夾可以刪掉。
2.1.2 聲明項目中工具的版本信息
我們整個項目需要的工具的不同版本可能會對程序運行造成影響,所以應該在最外層的UserBehaviorAnalysis中聲明所有子模塊共用的版本信息。
在pom.xml中加入以下配置:
UserBehaviorAnalysis/pom.xml
<properties> <scala.binary.version>2.11</scala.binary.version> |
2.1.3 添加項目依賴
對於整個項目而言,所有模塊都會用到flink相關的組件,所以我們在UserBehaviorAnalysis中引入公有依賴:
UserBehaviorAnalysis/pom.xml
<dependencies> <dependency> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependencies> |
同樣,對於maven項目的構建,可以引入公有的插件:
<build> jar-with-dependencies </descriptorRef> |
在HotItemsAnalysis子模塊中,我們並沒有引入更多的依賴,所以不需要改動pom文件。
2.1.4 數據准備
在src/main/目錄下,可以看到已有的默認源文件目錄是java,我們可以將其改名為scala。將數據文件UserBehavior.csv復制到資源文件目錄src/main/resources下,我們將從這里讀取數據。
至此,我們的准備工作都已完成,接下來可以寫代碼了。
2.2 模塊代碼實現
我們將實現一個“實時熱門商品”的需求,可以將“實時熱門商品”翻譯成程序員更好理解的需求:每隔5分鍾輸出最近一小時內點擊量最多的前N個商品。將這個需求進行分解我們大概要做這么幾件事情:
- 抽取出業務時間戳,告訴Flink框架基於業務時間做窗口
- 過濾出點擊行為數據
- 按一小時的窗口大小,每5分鍾統計一次,做滑動窗口聚合(Sliding Window)
- 按每個窗口聚合,輸出每個窗口中點擊量前N名的商品
2.2.1 程序主體
在src/main/scala下創建HotItems.scala文件,新建一個單例對象。定義樣例類UserBehavior和ItemViewCount,在main函數中創建StreamExecutionEnvironment 並做配置,然后從UserBehavior.csv文件中讀取數據,並包裝成UserBehavior類型。代碼如下:
HotItemsAnalysis/src/main/scala/HotItems.scala
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
def main(args: Array[String]): Unit = {
// 創建一個 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 設定Time類型為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 為了打印到控制台的結果不亂序,我們配置全局的並發為1,這里改變並發對結果正確性沒有影響
env.setParallelism(1)
val stream = env
// 以window下為例,需替換成自己的路徑
.readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
.map(line => {
val linearray = line.split(",")
UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
})
// 指定時間戳和watermark
.assignAscendingTimestamps(_.timestamp * 1000)
env.execute("Hot Items Job")
}
這里注意,我們需要統計業務時間上的每小時的點擊量,所以要基於EventTime來處理。那么如果讓Flink按照我們想要的業務時間來處理呢?這里主要有兩件事情要做。
第一件是告訴Flink我們現在按照EventTime模式進行處理,Flink默認使用ProcessingTime處理,所以我們要顯式設置如下:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
第二件事情是指定如何獲得業務時間,以及生成Watermark。Watermark是用來追蹤業務事件的概念,可以理解成EventTime世界中的時鍾,用來指示當前處理到什么時刻的數據了。由於我們的數據源的數據已經經過整理,沒有亂序,即事件的時間戳是單調遞增的,所以可以將每條數據的業務時間就當做Watermark。這里我們用 assignAscendingTimestamps來實現時間戳的抽取和Watermark的生成。
注:真實業務場景一般都是亂序的,所以一般不用assignAscendingTimestamps,而是使用BoundedOutOfOrdernessTimestampExtractor。
.assignAscendingTimestamps(_.timestamp * 1000)
這樣我們就得到了一個帶有時間標記的數據流了,后面就能做一些窗口的操作。
2.2.2 過濾出點擊事件
在開始窗口操作之前,先回顧下需求“每隔5分鍾輸出過去一小時內點擊量最多的前N個商品”。由於原始數據中存在點擊、購買、收藏、喜歡各種行為的數據,但是我們只需要統計點擊量,所以先使用filter將點擊行為數據過濾出來。
.filter(_.behavior == "pv")
2.2.3 設置滑動窗口,統計點擊量
由於要每隔5分鍾統計一次最近一小時每個商品的點擊量,所以窗口大小是一小時,每隔5分鍾滑動一次。即分別要統計[09:00, 10:00), [09:05, 10:05), [09:10, 10:10)…等窗口的商品點擊量。是一個常見的滑動窗口需求(Sliding Window)。
.keyBy("itemId")
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());
我們使用.keyBy("itemId")對商品進行分組,使用.timeWindow(Time size, Time slide)對每個商品做滑動窗口(1小時窗口,5分鍾滑動一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數據,減少state的存儲壓力。較之 .apply(WindowFunction wf) 會將窗口中的數據都存儲下來,最后一起計算要高效地多。這里的CountAgg實現了AggregateFunction接口,功能是統計窗口中的條數,即遇到一條數據就加一。
// COUNT統計的聚合函數實現,每出現一條記錄就加一
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
聚合操作.aggregate(AggregateFunction af, WindowFunction wf)的第二個參數WindowFunction將每個key每個窗口聚合后的結果帶上其他信息進行輸出。我們這里實現的WindowResultFunction將<主鍵商品ID,窗口,點擊量>封裝成了ItemViewCount進行輸出。
// 商品點擊量(窗口操作的輸出類型)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
代碼如下:
// 用於輸出窗口的結果
class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
collector: Collector[ItemViewCount]) : Unit = {
val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
val count = aggregateResult.iterator.next
collector.collect(ItemViewCount(itemId, window.getEnd, count))
}
}
現在我們就得到了每個商品在每個窗口的點擊量的數據流。
2.2.4 計算最熱門Top N商品
為了統計每個窗口下最熱門的商品,我們需要再次按窗口進行分組,這里根據ItemViewCount中的windowEnd進行keyBy()操作。然后使用ProcessFunction實現一個自定義的TopN函數TopNHotItems來計算點擊量排名前3名的商品,並將排名結果格式化成字符串,便於后續輸出。
.keyBy("windowEnd")
.process(new TopNHotItems(3)); // 求點擊量前3名的商品
ProcessFunction是Flink提供的一個low-level API,用於實現更高級的功能。它主要提供了定時器timer的功能(支持EventTime或ProcessingTime)。本案例中我們將利用timer來判斷何時收齊了某個window下所有商品的點擊量數據。由於Watermark的進度是全局的,在processElement方法中,每當收到一條數據ItemViewCount,我們就注冊一個windowEnd+1的定時器(Flink框架會自動忽略同一時間的重復注冊)。windowEnd+1的定時器被觸發時,意味着收到了windowEnd+1的Watermark,即收齊了該windowEnd下的所有商品窗口統計值。我們在onTimer()中處理將收集的所有商品及點擊量進行排序,選出TopN,並將排名信息格式化成字符串后進行輸出。
這里我們還使用了ListState<ItemViewCount>來存儲收到的每條ItemViewCount消息,保證在發生故障時,狀態數據的不丟失和一致性。ListState是Flink提供的類似Java List接口的State API,它集成了框架的checkpoint機制,自動做到了exactly-once的語義保證。
// 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
private var itemState : ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 命名狀態變量的名字和狀態變量的類型
val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
// 定義狀態變量
itemState = getRuntimeContext.getListState(itemsStateDesc)
}
override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
// 每條數據都保存到狀態中
itemState.add(input)
// 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
// 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
context.timerService.registerEventTimeTimer(input.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 獲取收到的所有商品點擊量
val allItems: ListBuffer[ItemViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (item <- itemState.get) {
allItems += item
}
// 提前清除狀態中的數據,釋放空間
itemState.clear()
// 按照點擊量從大到小排序
val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
// 將排名信息格式化成 String, 便於打印
val result: StringBuilder = new StringBuilder
result.append("====================================\n")
result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")
for(i <- sortedItems.indices){
val currentItem: ItemViewCount = sortedItems(i)
// e.g. No1: 商品ID=12224 瀏覽量=2413
result.append("No").append(i+1).append(":")
.append(" 商品ID=").append(currentItem.itemId)
.append(" 瀏覽量=").append(currentItem.count).append("\n")
}
result.append("====================================\n\n")
// 控制輸出頻率,模擬實時滾動結果
Thread.sleep(1000)
out.collect(result.toString)
}
}
最后我們可以在main函數中將結果打印輸出到控制台,方便實時觀測:
.print();
至此整個程序代碼全部完成,我們直接運行main函數,就可以在控制台看到不斷輸出的各個時間點統計出的熱門商品。
2.2.5 完整代碼
最終完整代碼如下:
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
.map(line => {
val linearray = line.split(",")
UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000)
.filter(_.behavior=="pv")
.keyBy("itemId")
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction())
.keyBy(1)
.process(new TopNHotItems(3))
.print()
env.execute("Hot Items Job")
}
// COUNT 統計的聚合函數實現,每出現一條記錄加一
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
// 用於輸出窗口的結果
class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
collector: Collector[ItemViewCount]) : Unit = {
val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
val count = aggregateResult.iterator.next
collector.collect(ItemViewCount(itemId, window.getEnd, count))
}
}
// 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
private var itemState : ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 命名狀態變量的名字和狀態變量的類型
val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
// 從運行時上下文中獲取狀態並賦值
itemState = getRuntimeContext.getListState(itemsStateDesc)
}
override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
// 每條數據都保存到狀態中
itemState.add(input)
// 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
// 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
context.timerService.registerEventTimeTimer(input.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 獲取收到的所有商品點擊量
val allItems: ListBuffer[ItemViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (item <- itemState.get) {
allItems += item
}
// 提前清除狀態中的數據,釋放空間
itemState.clear()
// 按照點擊量從大到小排序
val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
// 將排名信息格式化成 String, 便於打印
val result: StringBuilder = new StringBuilder
result.append("====================================\n")
result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")
for(i <- sortedItems.indices){
val currentItem: ItemViewCount = sortedItems(i)
// e.g. No1: 商品ID=12224 瀏覽量=2413
result.append("No").append(i+1).append(":")
.append(" 商品ID=").append(currentItem.itemId)
.append(" 瀏覽量=").append(currentItem.count).append("\n")
}
result.append("====================================\n\n")
// 控制輸出頻率,模擬實時滾動結果
Thread.sleep(1000)
out.collect(result.toString)
}
}
}
2.2.6 更換Kafka 作為數據源
實際生產環境中,我們的數據流往往是從Kafka獲取到的。如果要讓代碼更貼近生產實際,我們只需將source更換為Kafka即可:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
當然,根據實際的需要,我們還可以將Sink指定為Kafka、ES、Redis或其它存儲,這里就不一一展開實現了。
第3章 實時流量統計
3.1 模塊創建和數據准備
在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為NetworkFlowAnalysis。在這個子模塊中,我們同樣並沒有引入更多的依賴,所以也不需要改動pom文件。
在src/main/目錄下,將默認源文件目錄java改名為scala。將apache服務器的日志文件apache.log復制到資源文件目錄src/main/resources下,我們將從這里讀取數據。
當然,我們也可以仍然用UserBehavior.csv作為數據源,這時我們分析的就不是每一次對服務器的訪問請求了,而是具體的頁面瀏覽(“pv”)操作。
3.2 基於服務器log的熱門頁面瀏覽量統計
我們現在要實現的模塊是 “實時流量統計”。對於一個電商平台而言,用戶登錄的入口流量、不同頁面的訪問流量都是值得分析的重要數據,而這些數據,可以簡單地從web服務器的日志中提取出來。
我們在這里先實現“熱門頁面瀏覽數”的統計,也就是讀取服務器日志中的每一行log,統計在一段時間內用戶訪問每一個url的次數,然后排序輸出顯示。
具體做法為:每隔5秒,輸出最近10分鍾內訪問量最多的前N個URL。可以看出,這個需求與之前“實時熱門商品統計”非常類似,所以我們完全可以借鑒此前的代碼。
在src/main/scala下創建NetworkFlow.scala文件,新建一個單例對象。定義樣例類ApacheLogEvent,這是輸入的日志數據流;另外還有UrlViewCount,這是窗口操作統計的輸出數據類型。在main函數中創建StreamExecutionEnvironment 並做配置,然后從apache.log文件中讀取數據,並包裝成ApacheLogEvent類型。
需要注意的是,原始日志中的時間是“dd/MM/yyyy:HH:mm:ss”的形式,需要定義一個DateTimeFormat將其轉換為我們需要的時間戳格式:
.map(line => {
val linearray = line.split(" ")
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = sdf.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0), linearray(2), timestamp,
linearray(5), linearray(6))
})
完整代碼如下:
NetworkFlowAnalysis/src/main/scala/NetworkFlow.scala
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class UrlViewCount(url: String, windowEnd: Long, count: Long)
object NetworkFlow{
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
// 以window下為例,需替換成自己的路徑
.readTextFile("YOUR_PATH\\resources\\apache.log")
.map(line => {
val linearray = line.split(" ")
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
})
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]
(Time.milliseconds(1000)) {
override def extractTimestamp(t: ApacheLogEvent): Long = {
t.eventTime
}
})
.filter( data => {
val pattern = "^((?!\\.(css|js)$).)*$".r
(pattern findFirstIn data.url).nonEmpty
} )
.keyBy("url")
.timeWindow(Time.minutes(10), Time.seconds(5))
.aggregate(new CountAgg(), new WindowResultFunction())
.keyBy(1)
.process(new TopNHotUrls(5))
.print()
env.execute("Network Flow Job")
}
class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[UrlViewCount]) : Unit = {
val url: String = key.asInstanceOf[Tuple1[String]].f0
val count = aggregateResult.iterator.next
collector.collect(UrlViewCount(url, window.getEnd, count))
}
}
class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {
private var urlState : ListState[UrlViewCount] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
urlState = getRuntimeContext.getListState(urlStateDesc)
}
override def processElement(input: UrlViewCount, context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
// 每條數據都保存到狀態中
urlState.add(input)
context.timerService.registerEventTimeTimer(input.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 獲取收到的所有URL訪問量
val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (urlView <- urlState.get) {
allUrlViews += urlView
}
// 提前清除狀態中的數據,釋放空間
urlState.clear()
// 按照訪問量從大到小排序
val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse)
.take(topSize)
// 將排名信息格式化成 String, 便於打印
var result: StringBuilder = new StringBuilder
result.append("====================================\n")
result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")
for (i <- sortedUrlViews.indices) {
val currentUrlView: UrlViewCount = sortedUrlViews(i)
// e.g. No1: URL=/blog/tags/firefox?flav=rss20 流量=55
result.append("No").append(i+1).append(":")
.append(" URL=").append(currentUrlView.url)
.append(" 流量=").append(currentUrlView.count).append("\n")
}
result.append("====================================\n\n")
// 控制輸出頻率,模擬實時滾動結果
Thread.sleep(1000)
out.collect(result.toString)
}
}
}
3.3 基於埋點日志數據的網絡流量統計
我們發現,從web服務器log中得到的url,往往更多的是請求某個資源地址(/*.js、/*.css),如果要針對頁面進行統計往往還需要進行過濾。而在實際電商應用中,相比每個單獨頁面的訪問量,我們可能更加關心整個電商網站的網絡流量。這個指標,除了合並之前每個頁面的統計結果之外,還可以通過統計埋點日志數據中的“pv”行為來得到。
3.3.1 網站總瀏覽量(PV)的統計
衡量網站流量一個最簡單的指標,就是網站的頁面瀏覽量(Page View,PV)。用戶每次打開一個頁面便記錄1次PV,多次打開同一頁面則瀏覽量累計。一般來說,PV與來訪者的數量成正比,但是PV並不直接決定頁面的真實來訪者數量,如同一個來訪者通過不斷的刷新頁面,也可以制造出非常高的PV。
我們知道,用戶瀏覽頁面時,會從瀏覽器向網絡服務器發出一個請求(Request),網絡服務器接到這個請求后,會將該請求對應的一個網頁(Page)發送給瀏覽器,從而產生了一個PV。所以我們的統計方法,可以是從web服務器的日志中去提取對應的頁面訪問然后統計,就向上一節中的做法一樣;也可以直接從埋點日志中提取用戶發來的頁面請求,從而統計出總瀏覽量。
所以,接下來我們用UserBehavior.csv作為數據源,實現一個網站總瀏覽量的統計。我們可以設置滾動時間窗口,實時統計每小時內的網站PV。
在src/main/scala下創建PageView.scala文件,具體代碼如下:
NetworkFlowAnalysis/src/main/scala/PageView.scala
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
object PageView {
def main(args: Array[String]): Unit = {
val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env.readTextFile(resourcesPath.getPath)
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000)
.filter(_.behavior == "pv")
.map(x => ("pv", 1))
.keyBy(_._1)
.timeWindow(Time.seconds(60 * 60))
.sum(1)
.print()
env.execute("Page View Job")
}
}
3.3.2 網站獨立訪客數(UV)的統計
在上節的例子中,我們統計的是所有用戶對頁面的所有瀏覽行為,也就是說,同一用戶的瀏覽行為會被重復統計。而在實際應用中,我們往往還會關注,在一段時間內到底有多少不同的用戶訪問了網站。
另外一個統計流量的重要指標是網站的獨立訪客數(Unique Visitor,UV)。UV指的是一段時間(比如一小時)內訪問網站的總人數,1天內同一訪客的多次訪問只記錄為一個訪客。通過IP和cookie一般是判斷UV值的兩種方式。當客戶端第一次訪問某個網站服務器的時候,網站服務器會給這個客戶端的電腦發出一個Cookie,通常放在這個客戶端電腦的C盤當中。在這個Cookie中會分配一個獨一無二的編號,這其中會記錄一些訪問服務器的信息,如訪問時間,訪問了哪些頁面等等。當你下次再訪問這個服務器的時候,服務器就可以直接從你的電腦中找到上一次放進去的Cookie文件,並且對其進行一些更新,但那個獨一無二的編號是不會變的。
當然,對於UserBehavior數據源來說,我們直接可以根據userId來區分不同的用戶。
在src/main/scala下創建UniqueVisitor.scala文件,具體代碼如下:
NetworkFlowAnalysis/src/main/scala/UniqueVisitor.scala
case class UvCount(windowEnd: Long, count: Long)
object UniqueVisitor {
def main(args: Array[String]): Unit = {
val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.readTextFile(resourcesPath.getPath)
.map(line => {
val linearray = line.split(",")
UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000)
.filter(_.behavior == "pv")
.timeWindowAll(Time.seconds(60 * 60))
.apply(new UvCountByWindow())
.print()
env.execute("Unique Visitor Job")
}
}
class UvCountByWindow extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
override def apply(window: TimeWindow,
input: Iterable[UserBehavior],
out: Collector[UvCount]): Unit = {
val s: collection.mutable.Set[Long] = collection.mutable.Set()
var idSet = Set[Long]()
for ( userBehavior <- input) {
idSet += userBehavior.userId
}
out.collect(UvCount(window.getEnd, idSet.size))
}
}
3.3.3 使用布隆過濾器的UV統計
在上節的例子中,我們把所有數據的userId都存在了窗口計算的狀態里,在窗口收集數據的過程中,狀態會不斷增大。一般情況下,只要不超出內存的承受范圍,這種做法也沒什么問題;但如果我們遇到的數據量很大呢?
把所有數據暫存放到內存里,顯然不是一個好注意。我們會想到,可以利用redis這種內存級k-v數據庫,為我們做一個緩存。但如果我們遇到的情況非常極端,數據大到驚人呢?比如上億級的用戶,要去重計算UV。
如果放到redis中,億級的用戶id(每個20字節左右的話)可能需要幾G甚至幾十G的空間來存儲。當然放到redis中,用集群進行擴展也不是不可以,但明顯代價太大了。
一個更好的想法是,其實我們不需要完整地存儲用戶ID的信息,只要知道他在不在就行了。所以其實我們可以進行壓縮處理,用一位(bit)就可以表示一個用戶的狀態。這個思想的具體實現就是布隆過濾器(Bloom Filter)。
本質上布隆過濾器是一種數據結構,比較巧妙的概率型數據結構(probabilistic data structure),特點是高效地插入和查詢,可以用來告訴你 “某樣東西一定不存在或者可能存在”。
它本身是一個很長的二進制向量,既然是二進制的向量,那么顯而易見的,存放的不是0,就是1。相比於傳統的 List、Set、Map 等數據結構,它更高效、占用空間更少,但是缺點是其返回的結果是概率性的,而不是確切的。
我們的目標就是,利用某種方法(一般是Hash函數)把每個數據,對應到一個位圖的某一位上去;如果數據存在,那一位就是1,不存在則為0。
接下來我們就來具體實現一下。
注意這里我們用到了redis連接存取數據,所以需要加入redis客戶端的依賴:
<dependencies>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
</dependencies>
在src/main/scala下創建UniqueVisitor.scala文件,具體代碼如下:
NetworkFlowAnalysis/src/main/scala/UvWithBloom.scala
object UvWithBloomFilter {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")
val stream = env
.readTextFile(resourcesPath.getPath)
.map(data => {
val dataArray = data.split(",")
UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000)
.filter(_.behavior == "pv")
.map(data => ("dummyKey", data.userId))
.keyBy(_._1)
.timeWindow(Time.seconds(60 * 60))
.trigger(new MyTrigger()) // 自定義窗口觸發規則
.process(new UvCountWithBloom()) // 自定義窗口處理規則
stream.print()
env.execute("Unique Visitor with bloom Job")
}
}
// 自定義觸發器
class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
}
override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
// 每來一條數據,就觸發窗口操作並清空
TriggerResult.FIRE_AND_PURGE
}
}
// 自定義窗口處理函數
class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
// 創建redis連接
lazy val jedis = new Jedis("localhost", 6379)
lazy val bloom = new Bloom(1 << 29)
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {
val storeKey = context.window.getEnd.toString
var count = 0L
if (jedis.hget("count", storeKey) != null) {
count = jedis.hget("count", storeKey).toLong
}
val userId = elements.last._2.toString
val offset = bloom.hash(userId, 61)
val isExist = jedis.getbit(storeKey, offset)
if (!isExist) {
jedis.setbit(storeKey, offset, true)
jedis.hset("count", storeKey, (count + 1).toString)
out.collect(UvCount(storeKey.toLong, count + 1))
} else {
out.collect(UvCount(storeKey.toLong, count))
}
}
}
// 定義一個布隆過濾器
class Bloom(size: Long) extends Serializable {
private val cap = size
def hash(value: String, seed: Int): Long = {
var result = 0
for (i <- 0 until value.length) {
// 最簡單的hash算法,每一位字符的ascii碼值,乘以seed之后,做疊加
result = result * seed + value.charAt(i)
}
(cap - 1) & result
}
}
第4章 市場營銷商業指標統計分析
4.1 模塊創建和數據准備
繼續在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為MarketAnalysis。
這個模塊中我們沒有現成的數據,所以會用自定義的測試源來產生測試數據流,或者直接用生成測試數據文件。
4.2 APP市場推廣統計
隨着智能手機的普及,在如今的電商網站中已經有越來越多的用戶來自移動端,相比起傳統瀏覽器的登錄方式,手機APP成為了更多用戶訪問電商網站的首選。對於電商企業來說,一般會通過各種不同的渠道對自己的APP進行市場推廣,而這些渠道的統計數據(比如,不同網站上廣告鏈接的點擊量、APP下載量)就成了市場營銷的重要商業指標。
首先我們考察分渠道的市場推廣統計。在src/main/scala下創建AppMarketingByChannel.scala文件。由於沒有現成的數據,所以我們需要自定義一個測試源來生成用戶行為的事件流。
4.2.1 自定義測試數據源
定義一個源數據的樣例類MarketingUserBehavior,再定義一個SourceFunction,用於產生用戶行為源數據,命名為SimulatedEventSource:
case class MarketingUserBehavior(userId: Long, behavior: String, channel: String, timestamp: Long)
class SimulatedEventSource extends RichParallelSourceFunction[MarketingUserBehavior] {
var running = true
val channelSet: Seq[String] = Seq("AppStore", "XiaomiStore", "HuaweiStore", "weibo", "wechat", "tieba")
val behaviorTypes: Seq[String] = Seq("BROWSE", "CLICK", "PURCHASE", "UNINSTALL")
val rand: Random = Random
override def run(ctx: SourceContext[MarketingUserBehavior]): Unit = {
val maxElements = Long.MaxValue
var count = 0L
while (running && count < maxElements) {
val id = UUID.randomUUID().toString.toLong
val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
val channel = channelSet(rand.nextInt(channelSet.size))
val ts = System.currentTimeMillis()
ctx.collectWithTimestamp(MarketingUserBehavior(id, behaviorType, channel, ts), ts)
count += 1
TimeUnit.MILLISECONDS.sleep(5L)
}
}
override def cancel(): Unit = running = false
}
4.2.2 分渠道統計
另外定義一個窗口處理的輸出結果樣例類MarketingViewCount,並自定義ProcessWindowFunction進行處理,代碼如下:
case class MarketingCountView(windowStart: Long, windowEnd: Long, channel: String, behavior: String, count: Long)
object AppMarketingByChannel {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream: DataStream[MarketingUserBehavior] = env.addSource(new SimulatedEventSource)
.assignAscendingTimestamps(_.timestamp)
stream
.filter(_.behavior != "UNINSTALL")
.map(data => {
((data.channel, data.behavior), 1L)
})
.keyBy(_._1)
.timeWindow(Time.hours(1), Time.seconds(1))
.process(new MarketingCountByChannel())
.print()
env.execute(getClass.getSimpleName)
}
}
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow] {
override def process(key: (String, String),
context: Context,
elements: Iterable[((String, String), Long)],
out: Collector[MarketingViewCount]): Unit = {
val startTs = context.window.getStart
val endTs = context.window.getEnd
val channel = key._1
val behaviorType = key._2
val count = elements.size
out.collect( MarketingViewCount(formatTs(startTs), formatTs(endTs), channel, behaviorType, count) )
}
private def formatTs (ts: Long) = {
val df = new SimpleDateFormat ("yyyy/MM/dd-HH:mm:ss")
df.format (new Date (ts) )
}
}
4.2.3 不分渠道(總量)統計
同樣我們還可以考察不分渠道的市場推廣統計,這樣得到的就是所有渠道推廣的總量。在src/main/scala下創建AppMarketingStatistics.scala文件,代碼如下:
object AppMarketingStatistics {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream: DataStream[MarketingUserBehavior] = env.addSource(new SimulatedEventSource)
.assignAscendingTimestamps(_.timestamp)
stream
.filter(_.behavior != "UNINSTALL")
.map(data => {
("dummyKey", 1L)
})
.keyBy(_._1)
.timeWindow(Time.hours(1), Time.seconds(1))
.process(new MarketingCountTotal())
.print()
env.execute(getClass.getSimpleName)
}
}
class MarketingCountTotal() extends ProcessWindowFunction[(String, Long), MarketingViewCount, String, TimeWindow]{
override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[MarketingViewCount]): Unit = {
val startTs = context.window.getStart
val endTs = context.window.getEnd
val count = elements.size
out.collect( MarketingViewCount(formatTs(startTs), formatTs(endTs), "total", "total", count) )
}
private def formatTs (ts: Long) = {
val df = new SimpleDateFormat ("yyyy/MM/dd-HH:mm:ss")
df.format (new Date (ts) )
}
}
4.3 頁面廣告分析
電商網站的市場營銷商業指標中,除了自身的APP推廣,還會考慮到頁面上的廣告投放(包括自己經營的產品和其它網站的廣告)。所以廣告相關的統計分析,也是市場營銷的重要指標。
對於廣告的統計,最簡單也最重要的就是頁面廣告的點擊量,網站往往需要根據廣告點擊量來制定定價策略和調整推廣方式,而且也可以借此收集用戶的偏好信息。更加具體的應用是,我們可以根據用戶的地理位置進行划分,從而總結出不同省份用戶對不同廣告的偏好,這樣更有助於廣告的精准投放。
4.3.1 頁面廣告點擊量統計
接下來我們就進行頁面廣告按照省份划分的點擊量的統計。在src/main/scala下創建AdStatisticsByGeo.scala文件。同樣由於沒有現成的數據,我們定義一些測試數據,放在AdClickLog.csv中,用來生成用戶點擊廣告行為的事件流。
在代碼中我們首先定義源數據的樣例類AdClickLog,以及輸出統計數據的樣例類CountByProvince。主函數中先以province進行keyBy,然后開一小時的時間窗口,滑動距離為5秒,統計窗口內的點擊事件數量。具體代碼實現如下:
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class CountByProvince(windowEnd: String, province: String, count: Long)
object AdStatisticsByGeo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val adLogStream: DataStream[AdClickLog] = env.readTextFile("YOURPATH\\resources\\AdClickLog.csv")
.map(data => {
val dataArray = data.split(",")
AdClickLog(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val adCountStream = adLogStream
.keyBy(_.province)
.timeWindow(Time.minutes(60), Time.seconds(5))
.aggregate(new CountAgg(), new CountResult())
.print()
env.execute("ad statistics job")
}
}
class CountAgg() extends AggregateFunction[AdClickLog, Long, Long]{
override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1L
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class CountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
out.collect(CountByProvince(formatTs(window.getEnd), key, input.iterator.next()))
}
private def formatTs (ts: Long) = {
val df = new SimpleDateFormat ("yyyy/MM/dd-HH:mm:ss")
df.format (new Date (ts) )
}
}
4.3.2 黑名單過濾
上節我們進行的點擊量統計,同一用戶的重復點擊是會疊加計算的。在實際場景中,同一用戶確實可能反復點開同一個廣告,這也說明了用戶對廣告更大的興趣;但是如果用戶在一段時間非常頻繁地點擊廣告,這顯然不是一個正常行為,有刷點擊量的嫌疑。所以我們可以對一段時間內(比如一天內)的用戶點擊行為進行約束,如果對同一個廣告點擊超過一定限額(比如100次),應該把該用戶加入黑名單並報警,此后其點擊行為不應該再統計。
具體代碼實現如下:
case class AdClickLog(userId: Long, adId: Long, province: String, city: String, timestamp: Long)
case class CountByProvince(windowEnd: String, province: String, count: Long)
case class BlackListWarning(userId: Long, adId: Long, msg: String)
object AdStatisticsByGeo {
val blackListOutputTag = new OutputTag[BlackListWarning]("blacklist")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val adLogStream: DataStream[AdClickLog] = env.readTextFile("D:\\Projects\\BigData\\UserBehaviorAnalysis\\MarketAnalysis\\src\\main\\resources\\AdClickLog.csv")
.map(data => {
val dataArray = data.split(",")
AdClickLog(dataArray(0).toLong, dataArray(1).toLong, dataArray(2), dataArray(3), dataArray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000L)
val filterBlackListStream = adLogStream
.keyBy(logData => (logData.userId, logData.adId))
.process(new FilterBlackListUser(100))
val adCountStream = filterBlackListStream
.keyBy(_.province)
.timeWindow(Time.minutes(60), Time.seconds(5))
.aggregate(new countAgg(), new countResult())
.print()
filterBlackListStream.getSideOutput(blackListOutputTag)
.print("black list")
env.execute("ad statistics job")
}
class FilterBlackListUser(maxCount: Long) extends KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog] {
// 保存當前用戶對當前廣告的點擊量
lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count-state", classOf[Long]))
// 標記當前(用戶,廣告)作為key是否第一次發送到黑名單
lazy val firstSent: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("firstsent-state", classOf[Boolean]))
// 保存定時器觸發的時間戳,屆時清空重置狀態
lazy val resetTime: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("resettime-state", classOf[Long]))
override def processElement(value: AdClickLog, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#Context, out: Collector[AdClickLog]): Unit = {
val curCount = countState.value()
// 如果是第一次處理,注冊一個定時器,每天 00:00 觸發清除
if( curCount == 0 ){
val ts = (ctx.timerService().currentProcessingTime() / (24*60*60*1000) + 1) * (24*60*60*1000)
resetTime.update(ts)
ctx.timerService().registerProcessingTimeTimer(ts)
}
// 如果計數已經超過上限,則加入黑名單,用側輸出流輸出報警信息
if( curCount > maxCount ){
if( !firstSent.value() ){
firstSent.update(true)
ctx.output( blackListOutputTag, BlackListWarning(value.userId, value.adId, "Click over " + maxCount + " times today.") )
}
return
}
// 點擊計數加1
countState.update(curCount + 1)
out.collect( value )
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickLog, AdClickLog]#OnTimerContext, out: Collector[AdClickLog]): Unit = {
if( timestamp == resetTime.value() ){
firstSent.clear()
countState.clear()
}
}
}
}
class countAgg() extends AggregateFunction[AdClickLog, Long, Long] {
override def add(value: AdClickLog, accumulator: Long): Long = accumulator + 1L
override def createAccumulator(): Long = 0L
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
class countResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
out.collect(CountByProvince(formatTs(window.getEnd), key, input.iterator.next()))
}
private def formatTs(ts: Long) = {
val df = new SimpleDateFormat("yyyy/MM/dd-HH:mm:ss")
df.format(new Date(ts))
}
}
第5章 惡意登錄監控
5.1 模塊創建和數據准備
繼續在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為LoginFailDetect。在這個子模塊中,我們將會用到flink的CEP庫來實現事件流的模式匹配,所以需要在pom文件中引入CEP的相關依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
同樣,在src/main/目錄下,將默認源文件目錄java改名為scala。
5.2 代碼實現
對於網站而言,用戶登錄並不是頻繁的業務操作。如果一個用戶短時間內頻繁登錄失敗,就有可能是出現了程序的惡意攻擊,比如密碼暴力破解。因此我們考慮,應該對用戶的登錄失敗動作進行統計,具體來說,如果同一用戶(可以是不同IP)在2秒之內連續兩次登錄失敗,就認為存在惡意登錄的風險,輸出相關的信息進行報警提示。這是電商網站、也是幾乎所有網站風控的基本一環。
5.2.1 狀態編程
由於同樣引入了時間,我們可以想到,最簡單的方法其實與之前的熱門統計類似,只需要按照用戶ID分流,然后遇到登錄失敗的事件時將其保存在ListState中,然后設置一個定時器,2秒后觸發。定時器觸發時檢查狀態中的登錄失敗事件個數,如果大於等於2,那么就輸出報警信息。
在src/main/scala下創建LoginFail.scala文件,新建一個單例對象。定義樣例類LoginEvent,這是輸入的登錄事件流。登錄數據本應該從UserBehavior日志里提取,由於UserBehavior.csv中沒有做相關埋點,我們從另一個文件LoginLog.csv中讀取登錄數據。
代碼如下:
LoginFailDetect/src/main/scala/LoginFail.scala
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
object LoginFail {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val loginEventStream = env.readTextFile("YOUR_PATH\\resources\\LoginLog.csv")
.map( data => {
val dataArray = data.split(",")
LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
})
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]
(Time.milliseconds(3000)) {
override def extractTimestamp(element: ApacheLogEvent): Long ={
element.eventTime * 1000L
}
})
.keyBy(_.userId)
.process(new MatchFunction())
.print()
env.execute("Login Fail Detect Job")
}
class MatchFunction extends KeyedProcessFunction[Long, LoginEvent, LoginEvent] {
// 定義狀態變量
lazy val loginState: ListState[LoginEvent] = getRuntimeContext.getListState(
new ListStateDescriptor[LoginEvent]("saved login", classOf[LoginEvent]))
override def processElement(login: LoginEvent,
context: KeyedProcessFunction[Long, LoginEvent,
LoginEvent]#Context, out: Collector[LoginEvent]): Unit = {
if (login.eventType == "fail") {
loginState.add(login)
}
// 注冊定時器,觸發事件設定為2秒后
context.timerService.registerEventTimeTimer(login.eventTime * 1000 + 2 * 1000)
}
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[Long, LoginEvent,
LoginEvent]#OnTimerContext, out: Collector[LoginEvent]): Unit = {
val allLogins: ListBuffer[LoginEvent] = ListBuffer()
import scala.collection.JavaConversions._
for (login <- loginState.get) {
allLogins += login
}
loginState.clear()
if (allLogins.length > 1) {
out.collect(allLogins.head)
}
}
}
}
5.2.2 狀態編程的改進
上一節的代碼實現中我們可以看到,直接把每次登錄失敗的數據存起來、設置定時器一段時間后再讀取,這種做法盡管簡單,但和我們開始的需求還是略有差異的。這種做法只能隔2秒之后去判斷一下這期間是否有多次失敗登錄,而不是在一次登錄失敗之后、再一次登錄失敗時就立刻報警。這個需求如果嚴格實現起來,相當於要判斷任意緊鄰的事件,是否符合某種模式。
於是我們可以想到,這個需求其實可以不用定時器觸發,直接在狀態中存取上一次登錄失敗的事件,每次都做判斷和比對,就可以實現最初的需求。
上節的代碼MatchFunction中刪掉onTimer,processElement改為:
override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#Context, out: Collector[Warning]): Unit = {
// 首先按照type做篩選,如果success直接清空,如果fail再做處理
if ( value.eventType == "fail" ){
// 如果已經有登錄失敗的數據,那么就判斷是否在兩秒內
val iter = loginState.get().iterator()
if ( iter.hasNext ){
val firstFail = iter.next()
// 如果兩次登錄失敗時間間隔小於2秒,輸出報警
if ( value.eventTime < firstFail.eventTime + 2 ){
out.collect( Warning( value.userId, firstFail.eventTime, value.eventTime, "login fail in 2 seconds." ) )
}
// 把最近一次的登錄失敗數據,更新寫入state中
val failList = new util.ArrayList[LoginEvent]()
failList.add(value)
loginState.update( failList )
} else {
// 如果state中沒有登錄失敗的數據,那就直接添加進去
loginState.add(value)
}
} else
loginState.clear()
}
5.2.3 CEP編程
上一節我們通過對狀態編程的改進,去掉了定時器,在process function中做了更多的邏輯處理,實現了最初的需求。不過這種方法里有很多的條件判斷,而我們目前僅僅實現的是檢測“連續2次登錄失敗”,這是最簡單的情形。如果需要檢測更多次,內部邏輯顯然會變得非常復雜。那有什么方式可以方便地實現呢?
很幸運,flink為我們提供了CEP(Complex Event Processing,復雜事件處理)庫,用於在流中篩選符合某種復雜模式的事件。接下來我們就基於CEP來完成這個模塊的實現。
在src/main/scala下繼續創建LoginFailWithCep.scala文件,新建一個單例對象。樣例類LoginEvent由於在LoginFail.scala已經定義,我們在同一個模塊中就不需要再定義了。
代碼如下:
LoginFailDetect/src/main/scala/LoginFailWithCep.scala
object LoginFailWithCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val loginEventStream = env.readTextFile("YOUR_PATH\\resources\\LoginLog.csv")
.map( data => {
val dataArray = data.split(",")
LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
})
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]
(Time.milliseconds(3000)) {
override def extractTimestamp(element: ApacheLogEvent): Long ={
element.eventTime * 1000L
}
})
// 定義匹配模式
val loginFailPattern = Pattern.begin[LoginEvent]("begin")
.where(_.eventType == "fail")
.next("next")
.where(_.eventType == "fail")
.within(Time.seconds(2))
// 在數據流中匹配出定義好的模式
val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
// .select方法傳入一個 pattern select function,當檢測到定義好的模式序列時就會調用
val loginFailDataStream = patternStream
.select((pattern: Map[String, Iterable[LoginEvent]]) => {
val first = pattern.getOrElse("begin", null).iterator.next()
val second = pattern.getOrElse("next", null).iterator.next()
(second.userId, second.ip, second.eventType)
})
// 將匹配到的符合條件的事件打印出來
loginFailDataStream.print()
env.execute("Login Fail Detect Job")
}
}
第6章 訂單支付實時監控
在電商網站中,訂單的支付作為直接與營銷收入掛鈎的一環,在業務流程中非常重要。對於訂單而言,為了正確控制業務流程,也為了增加用戶的支付意願,網站一般會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對於訂單的支付,我們還應保證用戶支付的正確性,這可以通過第三方支付平台的交易數據來做一個實時對賬。在接下來的內容中,我們將實現這兩個需求。
6.1 模塊創建和數據准備
同樣地,在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為OrderTimeoutDetect。在這個子模塊中,我們同樣將會用到flink的CEP庫來實現事件流的模式匹配,所以需要在pom文件中引入CEP的相關依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
同樣,在src/main/目錄下,將默認源文件目錄java改名為scala。
6.2 代碼實現
在電商平台中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動作的時候。用戶下單的行為可以表明用戶對商品的需求,但在現實中,並不是每次下單都會被用戶立刻支付。當拖延一段時間后,用戶支付的意願會降低。所以為了讓用戶更有緊迫感從而提高支付轉化率,同時也為了防范訂單支付環節的安全風險,電商網站往往會對訂單狀態進行監控,設置一個失效時間(比如15分鍾),如果下單后一段時間仍未支付,訂單就會被取消。
6.2.1 使用CEP實現
我們首先還是利用CEP庫來實現這個功能。我們先將事件流按照訂單號orderId分流,然后定義這樣的一個事件模式:在15分鍾內,事件“create”與“pay”非嚴格緊鄰:
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
.where(_.eventType == "create")
.followedBy("follow")
.where(_.eventType == "pay")
.within(Time.seconds(5))
這樣調用.select方法時,就可以同時獲取到匹配出的事件和超時未匹配的事件了。
在src/main/scala下繼續創建OrderTimeout.scala文件,新建一個單例對象。定義樣例類OrderEvent,這是輸入的訂單事件流;另外還有OrderResult,這是輸出顯示的訂單狀態結果。訂單數據也本應該從UserBehavior日志里提取,由於UserBehavior.csv中沒有做相關埋點,我們從另一個文件OrderLog.csv中讀取登錄數據。
完整代碼如下:
OrderTimeoutDetect/src/main/scala/OrderTimeout.scala
case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
case class OrderResult(orderId: Long, eventType: String)
object OrderTimeout {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")
.map( data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000)
// 定義一個帶匹配時間窗口的模式
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
.where(_.eventType == "create")
.followedBy("follow")
.where(_.eventType == "pay")
.within(Time.minutes(15))
// 定義一個輸出標簽
val orderTimeoutOutput = OutputTag[OrderResult]("orderTimeout")
// 訂單事件流根據 orderId 分流,然后在每一條流中匹配出定義好的模式
val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)
val completedResult = patternStream.select(orderTimeoutOutput) {
// 對於已超時的部分模式匹配的事件序列,會調用這個函數
(pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
val createOrder = pattern.get("begin")
OrderResult(createOrder.get.iterator.next().orderId, "timeout")
}
} {
// 檢測到定義好的模式序列時,就會調用這個函數
pattern: Map[String, Iterable[OrderEvent]] => {
val payOrder = pattern.get("follow")
OrderResult(payOrder.get.iterator.next().orderId, "success")
}
}
// 拿到同一輸出標簽中的 timeout 匹配結果(流)
val timeoutResult = completedResult.getSideOutput(orderTimeoutOutput)
completedResult.print()
timeoutResult.print()
env.execute("Order Timeout Detect Job")
}
}
6.2.2 使用Process Function實現
我們同樣可以利用Process Function,自定義實現檢測訂單超時的功能。為了簡化問題,我們只考慮超時報警的情形,在pay事件超時未發生的情況下,輸出超時報警信息。
一個簡單的思路是,可以在訂單的create事件到來后注冊定時器,15分鍾后觸發;然后再用一個布爾類型的Value狀態來作為標識位,表明pay事件是否發生過。如果pay事件已經發生,狀態被置為true,那么就不再需要做什么操作;而如果pay事件一直沒來,狀態一直為false,到定時器觸發時,就應該輸出超時報警信息。
具體代碼實現如下:
OrderTimeoutDetect/src/main/scala/OrderTimeoutWithoutCep.scala
object OrderTimeoutWithoutCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")
.map( data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(3).toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000)
.keyBy(_.orderId)
// 自定義一個 process function,進行order的超時檢測,輸出超時報警信息
val timeoutWarningStream = orderEventStream
.process(new OrderTimeoutAlert)
timeoutWarningStream.print()
env.execute()
}
class OrderTimeoutAlert extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))
override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
val isPayed = isPayedState.value()
if (value.eventType == "create" && !isPayed) {
ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L + 15 * 60 * 1000L)
} else if (value.eventType == "pay") {
isPayedState.update(true)
}
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
val isPayed = isPayedState.value()
if (!isPayed) {
out.collect(OrderResult(ctx.getCurrentKey, "order timeout"))
}
isPayedState.clear()
}
}
}
6.3 來自兩條流的訂單交易匹配
對於訂單支付事件,用戶支付完成其實並不算完,我們還得確認平台賬戶上是否到賬了。而往往這會來自不同的日志信息,所以我們要同時讀入兩條流的數據來做合並處理。這里我們利用connect將兩條流進行連接,然后用自定義的CoProcessFunction進行處理。
具體代碼如下:
TxMatchDetect/src/main/scala/TxMatch
case class OrderEvent( orderId: Long, eventType: String, txId: String, eventTime: Long )
case class ReceiptEvent( txId: String, payChannel: String, eventTime: Long )
object TxMatch {
val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")
val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val orderEventStream = env.readTextFile("YOUR_PATH\\resources\\OrderLog.csv")
.map( data => {
val dataArray = data.split(",")
OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
})
.filter(_.txId != "")
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.txId)
val receiptEventStream = env.readTextFile("YOUR_PATH\\resources\\ReceiptLog.csv")
.map( data => {
val dataArray = data.split(",")
ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.txId)
val processedStream = orderEventStream
.connect(receiptEventStream)
.process(new TxMatchDetection)
processedStream.getSideOutput(unmatchedPays).print("unmatched pays")
processedStream.getSideOutput(unmatchedReceipts).print("unmatched receipts")
processedStream.print("processed")
env.execute()
}
class TxMatchDetection extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{
lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay-state",classOf[OrderEvent]) )
lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent]) )
override def processElement1(pay: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
val receipt = receiptState.value()
if( receipt != null ){
receiptState.clear()
out.collect((pay, receipt))
} else{
payState.update(pay)
ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L)
}
}
override def processElement2(receipt: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
val payment = payState.value()
if( payment != null ){
payState.clear()
out.collect((payment, receipt))
} else{
receiptState.update(receipt)
ctx.timerService().registerEventTimeTimer(receipt.eventTime * 1000L)
}
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
if ( payState.value() != null ){
ctx.output(unmatchedPays, payState.value())
}
if ( receiptState.value() != null ){
ctx.output(unmatchedReceipts, receiptState.value())
}
payState.clear()
receiptState.clear()
}
}
}
附錄 電商常見指標匯總
1. 電商指標整理
現在的電子商務:
1、大多買家通過搜索找到所買物品,而非電商網站的內部導航,搜索關鍵字更為重要;
2、電商商家通過推薦引擎來預測買家可能需要的商品。推薦引擎以歷史上具有類似購買記錄的買家數據以及用戶自身的購買記錄為基礎,向用戶提供推薦信息;
3、電商商家時刻優化網站性能,如A/B Test划分來訪流量,並區別對待來源不同的訪客,進而找到最優的產品、內容和價格;
4、購買流程早在買家訪問網站前,即在社交網絡、郵件以及在線社區中便已開始,即長漏斗流程(以一條推文、一段視頻或一個鏈接開始,以購買交易結束)。
相關數據指標:關鍵詞和搜索詞、推薦接受率、郵件列表/短信鏈接點入率
2. 電商8類基本指標
1)總體運營指標:從流量、訂單、總體銷售業績、整體指標進行把控,起碼對運營的電商平台有個大致了解,到底運營的怎么樣,是虧是賺。
2)站流量指標:即對訪問你網站的訪客進行分析,基於這些數據可以對網頁進行改進,以及對訪客的行為進行分析等等。
3)銷售轉化指標:分析從下單到支付整個過程的數據,幫助你提升商品轉化率。也可以對一些頻繁異常的數據展開分析。
4)客戶價值指標:這里主要就是分析客戶的價值,可以建立RFM價值模型,找出那些有價值的客戶,精准營銷等等。
5)商品類指標:主要分析商品的種類,那些商品賣得好,庫存情況,以及可以建立關聯模型,分析那些商品同時銷售的幾率比較高,而進行捆綁銷售,有點像啤酒和尿布的故事。
6)市場營銷活動指標,主要監控某次活動給電商網站帶來的效果,以及監控廣告的投放指標。
7)風控類指標:分析賣家評論,以及投訴情況,發現問題,改正問題
8)市場競爭指標:主要分析市場份額以及網站排名,進一步進行調整