大數據技術之電商用戶行為分析
第1章 項目整體介紹
1.1 電商的用戶行為
電商平台中的用戶行為頻繁且較復雜,系統上線運行一段時間后,可以收集到大量的用戶行為數據,進而利用大數據技術進行深入挖掘和分析,得到感興趣的商業指標並增強對風險的控制。
電商用戶行為數據多樣,整體可以分為用戶行為習慣數據和業務行為數據兩大類。用戶的行為習慣數據包括了用戶的登錄方式、上線的時間點及時長、點擊和瀏覽頁面、頁面停留時間以及頁面跳轉等等,我們可以從中進行流量統計和熱門商品的統計,也可以深入挖掘用戶的特征;這些數據往往可以從web服務器日志中直接讀取到。而業務行為數據就是用戶在電商平台中針對每個業務(通常是某個具體商品)所作的操作,我們一般會在業務系統中相應的位置埋點,然后收集日志進行分析。業務行為數據又可以簡單分為兩類:一類是能夠明顯地表現出用戶興趣的行為,比如對商品的收藏、喜歡、評分和評價,我們可以從中對數據進行深入分析,得到用戶畫像,進而對用戶給出個性化的推薦商品列表,這個過程往往會用到機器學習相關的算法;另一類則是常規的業務操作,但需要着重關注一些異常狀況以做好風控,比如登錄和訂單支付。
1.2 項目主要模塊
基於對電商用戶行為數據的基本分類,我們可以發現主要有以下三個分析方向:
1. 熱門統計
利用用戶的點擊瀏覽行為,進行流量統計、近期熱門商品統計等。
2. 偏好統計
利用用戶的偏好行為,比如收藏、喜歡、評分等,進行用戶畫像分析,給出個性化的商品推薦列表。
3. 風險控制
利用用戶的常規業務行為,比如登錄、下單、支付等,分析數據,對異常情況進行報警提示。
本項目限於數據,我們只實現熱門統計和風險控制中的部分內容,將包括以下四大模塊:實時熱門商品統計、實時流量統計、惡意登錄監控和訂單支付失效監控。
由於對實時性要求較高,我們會用flink作為數據處理的框架。在項目中,我們將綜合運用flink的各種API,基於EventTime去處理基本的業務需求,並且靈活地使用底層的processFunction,基於狀態編程和CEP去處理更加復雜的情形。
1.3 數據源解析
我們准備了一份淘寶用戶行為數據集,保存為csv文件。本數據集包含了淘寶上某一天隨機一百萬用戶的所有行為(包括點擊、購買、收藏、喜歡)。數據集的每一行表示一條用戶行為,由用戶ID、商品ID、商品類目ID、行為類型和時間戳組成,並以逗號分隔。關於數據集中每一列的詳細描述如下:
字段名 |
數據類型 |
說明 |
userId |
Long |
加密后的用戶ID |
itemId |
Long |
加密后的商品ID |
categoryId |
Int |
加密后的商品所屬類別ID |
behavior |
String |
用戶行為類型,包括(‘pv’, ‘’buy, ‘cart’, ‘fav’) |
timestamp |
Long |
行為發生的時間戳,單位秒 |
另外,我們還可以拿到web服務器的日志數據,這里以apache服務器的一份log為例,每一行日志記錄了訪問者的IP、userId、訪問時間、訪問方法以及訪問的url,具體描述如下:
字段名 |
數據類型 |
說明 |
ip |
String |
訪問的 IP |
userId |
Long |
訪問的 user ID |
eventTime |
Long |
訪問時間 |
method |
String |
訪問方法 GET/POST/PUT/DELETE |
url |
String |
訪問的 url |
由於行為數據有限,在實時熱門商品統計模塊中可以使用UserBehavior數據集,而對於惡意登錄監控和訂單支付失效監控,我們只以示例數據來做演示。
第2章 實時熱門商品統計
首先要實現的是實時熱門商品統計,我們將會基於UserBehavior數據集來進行分析。
項目主體用Scala編寫,采用IDEA作為開發環境進行項目編寫,采用maven作為項目構建和管理工具。首先我們需要搭建項目框架。
2.1 創建Maven項目
2.1.1 項目框架搭建
打開IDEA,創建一個maven項目,命名為UserBehaviorAnalysis。由於包含了多個模塊,我們可以以UserBehaviorAnalysis作為父項目,並在其下建一個名為HotItemsAnalysis的子項目,用於實時統計熱門top N商品。
在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為HotItemsAnalysis。
父項目只是為了規范化項目結構,方便依賴管理,本身是不需要代碼實現的,所以UserBehaviorAnalysis下的src文件夾可以刪掉。
2.1.2 聲明項目中工具的版本信息
我們整個項目需要的工具的不同版本可能會對程序運行造成影響,所以應該在最外層的UserBehaviorAnalysis中聲明所有子模塊共用的版本信息。
在pom.xml中加入以下配置:
UserBehaviorAnalysis/pom.xml
<properties> <scala.binary.version>2.11</scala.binary.version> |
2.1.3 添加項目依賴
對於整個項目而言,所有模塊都會用到flink相關的組件,所以我們在UserBehaviorAnalysis中引入公有依賴:
UserBehaviorAnalysis/pom.xml
<dependencies> <dependency> <artifactId>kafka_${scala.binary.version}</artifactId> <version>${kafka.version}</version> </dependencies> |
同樣,對於maven項目的構建,可以引入公有的插件:
<build> jar-with-dependencies </descriptorRef> |
在HotItemsAnalysis子模塊中,我們並沒有引入更多的依賴,所以不需要改動pom文件。
2.1.4 數據准備
在src/main/目錄下,可以看到已有的默認源文件目錄是java,我們可以將其改名為scala。將數據文件UserBehavior.csv復制到資源文件目錄src/main/resources下,我們將從這里讀取數據。
至此,我們的准備工作都已完成,接下來可以寫代碼了。
2.2 模塊代碼實現
我們將實現一個“實時熱門商品”的需求,可以將“實時熱門商品”翻譯成程序員更好理解的需求:每隔5分鍾輸出最近一小時內點擊量最多的前N個商品。將這個需求進行分解我們大概要做這么幾件事情:
• 抽取出業務時間戳,告訴Flink框架基於業務時間做窗口
• 過濾出點擊行為數據
• 按一小時的窗口大小,每5分鍾統計一次,做滑動窗口聚合(Sliding Window)
• 按每個窗口聚合,輸出每個窗口中點擊量前N名的商品
2.2.1 程序主體
在src/main/scala下創建HotItems.scala文件,新建一個單例對象。定義樣例類UserBehavior和ItemViewCount,在main函數中創建StreamExecutionEnvironment 並做配置,然后從UserBehavior.csv文件中讀取數據,並包裝成UserBehavior類型。代碼如下:
HotItemsAnalysis/src/main/scala/HotItems.scala
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
def main(args: Array[String]): Unit = {
// 創建一個 StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 設定Time類型為EventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 為了打印到控制台的結果不亂序,我們配置全局的並發為1,這里改變並發對結果正確性沒有影響
env.setParallelism(1)
val stream = env
// 以window下為例,需替換成自己的路徑
.readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
.map(line => {
val linearray = line.split(",")
UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
})
// 指定時間戳和watermark
.assignAscendingTimestamps(_.timestamp * 1000)
env.execute("Hot Items Job")
}
這里注意,我們需要統計業務時間上的每小時的點擊量,所以要基於EventTime來處理。那么如果讓Flink按照我們想要的業務時間來處理呢?這里主要有兩件事情要做。
第一件是告訴Flink我們現在按照EventTime模式進行處理,Flink默認使用ProcessingTime處理,所以我們要顯式設置如下:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
第二件事情是指定如何獲得業務時間,以及生成Watermark。Watermark是用來追蹤業務事件的概念,可以理解成EventTime世界中的時鍾,用來指示當前處理到什么時刻的數據了。由於我們的數據源的數據已經經過整理,沒有亂序,即事件的時間戳是單調遞增的,所以可以將每條數據的業務時間就當做Watermark。這里我們用 assignAscendingTimestamps來實現時間戳的抽取和Watermark的生成。
注:真實業務場景一般都是亂序的,所以一般不用assignAscendingTimestamps,而是使用BoundedOutOfOrdernessTimestampExtractor。
.assignAscendingTimestamps(_.timestamp * 1000)
這樣我們就得到了一個帶有時間標記的數據流了,后面就能做一些窗口的操作。
2.2.2 過濾出點擊事件
在開始窗口操作之前,先回顧下需求“每隔5分鍾輸出過去一小時內點擊量最多的前N個商品”。由於原始數據中存在點擊、購買、收藏、喜歡各種行為的數據,但是我們只需要統計點擊量,所以先使用filter將點擊行為數據過濾出來。
.filter(_.behavior == "pv")
2.2.3 設置滑動窗口,統計點擊量
由於要每隔5分鍾統計一次最近一小時每個商品的點擊量,所以窗口大小是一小時,每隔5分鍾滑動一次。即分別要統計[09:00, 10:00), [09:05, 10:05), [09:10, 10:10)…等窗口的商品點擊量。是一個常見的滑動窗口需求(Sliding Window)。
.keyBy("itemId")
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction());
我們使用.keyBy("itemId")對商品進行分組,使用.timeWindow(Time size, Time slide)對每個商品做滑動窗口(1小時窗口,5分鍾滑動一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數據,減少state的存儲壓力。較之 .apply(WindowFunction wf) 會將窗口中的數據都存儲下來,最后一起計算要高效地多。這里的CountAgg實現了AggregateFunction接口,功能是統計窗口中的條數,即遇到一條數據就加一。
// COUNT統計的聚合函數實現,每出現一條記錄就加一
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
聚合操作.aggregate(AggregateFunction af, WindowFunction wf)的第二個參數WindowFunction將每個key每個窗口聚合后的結果帶上其他信息進行輸出。我們這里實現的WindowResultFunction將<主鍵商品ID,窗口,點擊量>封裝成了ItemViewCount進行輸出。
// 商品點擊量(窗口操作的輸出類型)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
代碼如下:
// 用於輸出窗口的結果
class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
collector: Collector[ItemViewCount]) : Unit = {
val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
val count = aggregateResult.iterator.next
collector.collect(ItemViewCount(itemId, window.getEnd, count))
}
}
現在我們就得到了每個商品在每個窗口的點擊量的數據流。
2.2.4 計算最熱門Top N商品
為了統計每個窗口下最熱門的商品,我們需要再次按窗口進行分組,這里根據ItemViewCount中的windowEnd進行keyBy()操作。然后使用ProcessFunction實現一個自定義的TopN函數TopNHotItems來計算點擊量排名前3名的商品,並將排名結果格式化成字符串,便於后續輸出。
.keyBy("windowEnd")
.process(new TopNHotItems(3)); // 求點擊量前3名的商品
ProcessFunction是Flink提供的一個low-level API,用於實現更高級的功能。它主要提供了定時器timer的功能(支持EventTime或ProcessingTime)。本案例中我們將利用timer來判斷何時收齊了某個window下所有商品的點擊量數據。由於Watermark的進度是全局的,在processElement方法中,每當收到一條數據ItemViewCount,我們就注冊一個windowEnd+1的定時器(Flink框架會自動忽略同一時間的重復注冊)。windowEnd+1的定時器被觸發時,意味着收到了windowEnd+1的Watermark,即收齊了該windowEnd下的所有商品窗口統計值。我們在onTimer()中處理將收集的所有商品及點擊量進行排序,選出TopN,並將排名信息格式化成字符串后進行輸出。
這里我們還使用了ListState<ItemViewCount>來存儲收到的每條ItemViewCount消息,保證在發生故障時,狀態數據的不丟失和一致性。ListState是Flink提供的類似Java List接口的State API,它集成了框架的checkpoint機制,自動做到了exactly-once的語義保證。
// 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
private var itemState : ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 命名狀態變量的名字和狀態變量的類型
val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
// 定義狀態變量
itemState = getRuntimeContext.getListState(itemsStateDesc)
}
override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
// 每條數據都保存到狀態中
itemState.add(input)
// 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
// 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
context.timerService.registerEventTimeTimer(input.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 獲取收到的所有商品點擊量
val allItems: ListBuffer[ItemViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (item <- itemState.get) {
allItems += item
}
// 提前清除狀態中的數據,釋放空間
itemState.clear()
// 按照點擊量從大到小排序
val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
// 將排名信息格式化成 String, 便於打印
val result: StringBuilder = new StringBuilder
result.append("====================================\n")
result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")
for(i <- sortedItems.indices){
val currentItem: ItemViewCount = sortedItems(i)
// e.g. No1: 商品ID=12224 瀏覽量=2413
result.append("No").append(i+1).append(":")
.append(" 商品ID=").append(currentItem.itemId)
.append(" 瀏覽量=").append(currentItem.count).append("\n")
}
result.append("====================================\n\n")
// 控制輸出頻率,模擬實時滾動結果
Thread.sleep(1000)
out.collect(result.toString)
}
}
最后我們可以在main函數中將結果打印輸出到控制台,方便實時觀測:
.print();
至此整個程序代碼全部完成,我們直接運行main函數,就可以在控制台看到不斷輸出的各個時間點統計出的熱門商品。
2.2.5 完整代碼
最終完整代碼如下:
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
object HotItems {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
.map(line => {
val linearray = line.split(",")
UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
})
.assignAscendingTimestamps(_.timestamp * 1000)
.filter(_.behavior=="pv")
.keyBy("itemId")
.timeWindow(Time.minutes(60), Time.minutes(5))
.aggregate(new CountAgg(), new WindowResultFunction())
.keyBy(1)
.process(new TopNHotItems(3))
.print()
env.execute("Hot Items Job")
}
// COUNT 統計的聚合函數實現,每出現一條記錄加一
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
// 用於輸出窗口的結果
class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
collector: Collector[ItemViewCount]) : Unit = {
val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
val count = aggregateResult.iterator.next
collector.collect(ItemViewCount(itemId, window.getEnd, count))
}
}
// 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
private var itemState : ListState[ItemViewCount] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 命名狀態變量的名字和狀態變量的類型
val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
// 從運行時上下文中獲取狀態並賦值
itemState = getRuntimeContext.getListState(itemsStateDesc)
}
override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
// 每條數據都保存到狀態中
itemState.add(input)
// 注冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
// 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
context.timerService.registerEventTimeTimer(input.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 獲取收到的所有商品點擊量
val allItems: ListBuffer[ItemViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (item <- itemState.get) {
allItems += item
}
// 提前清除狀態中的數據,釋放空間
itemState.clear()
// 按照點擊量從大到小排序
val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
// 將排名信息格式化成 String, 便於打印
val result: StringBuilder = new StringBuilder
result.append("====================================\n")
result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")
for(i <- sortedItems.indices){
val currentItem: ItemViewCount = sortedItems(i)
// e.g. No1: 商品ID=12224 瀏覽量=2413
result.append("No").append(i+1).append(":")
.append(" 商品ID=").append(currentItem.itemId)
.append(" 瀏覽量=").append(currentItem.count).append("\n")
}
result.append("====================================\n\n")
// 控制輸出頻率,模擬實時滾動結果
Thread.sleep(1000)
out.collect(result.toString)
}
}
}
2.2.6 更換Kafka 作為數據源
實際生產環境中,我們的數據流往往是從Kafka獲取到的。如果要讓代碼更貼近生產實際,我們只需將source更換為Kafka即可:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(), properties))
當然,根據實際的需要,我們還可以將Sink指定為Kafka、ES、Redis或其它存儲,這里就不一一展開實現了。
第3章 實時流量統計
3.1 模塊創建和數據准備
在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為NetworkTrafficAnalysis。在這個子模塊中,我們同樣並沒有引入更多的依賴,所以也不需要改動pom文件。
在src/main/目錄下,將默認源文件目錄java改名為scala。將apache服務器的日志文件apache.log復制到資源文件目錄src/main/resources下,我們將從這里讀取數據。
3.2 代碼實現
我們現在要實現的模塊是 “實時流量統計”。對於一個電商平台而言,用戶登錄的入口流量、不同頁面的訪問流量都是值得分析的重要數據,而這些數據,可以簡單地從web服務器的日志中提取出來。我們在這里實現最基本的“頁面瀏覽數”的統計,也就是讀取服務器日志中的每一行log,統計在一段時間內用戶訪問url的次數。
具體做法為:每隔5秒,輸出最近10分鍾內訪問量最多的前N個URL。可以看出,這個需求與之前“實時熱門商品統計”非常類似,所以我們完全可以借鑒此前的代碼。
在src/main/scala下創建TrafficAnalysis.scala文件,新建一個單例對象。定義樣例類ApacheLogEvent,這是輸入的日志數據流;另外還有UrlViewCount,這是窗口操作統計的輸出數據類型。在main函數中創建StreamExecutionEnvironment 並做配置,然后從apache.log文件中讀取數據,並包裝成ApacheLogEvent類型。
需要注意的是,原始日志中的時間是“dd/MM/yyyy:HH:mm:ss”的形式,需要定義一個DateTimeFormat將其轉換為我們需要的時間戳格式:
.map(line => {
val linearray = line.split(" ")
val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = sdf.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0), linearray(2), timestamp,
linearray(5), linearray(6))
})
完整代碼如下:
NetworkTrafficAnalysis/src/main/scala/TrafficAnalysis.scala
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class UrlViewCount(url: String, windowEnd: Long, count: Long)
object TrafficAnalysis {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val stream = env
// 以window下為例,需替換成自己的路徑
.readTextFile("YOUR_PATH\\resources\\apache.log")
.map(line => {
val linearray = line.split(" ")
val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
val timestamp = simpleDateFormat.parse(linearray(3)).getTime
ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
})
.assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]
(Time.milliseconds(1000)) {
override def extractTimestamp(t: ApacheLogEvent): Long = {
t.eventTime
}
})
.keyBy("url")
.timeWindow(Time.minutes(10), Time.seconds(5))
.aggregate(new CountAgg(), new WindowResultFunction())
.keyBy(1)
.process(new TopNHotUrls(5))
.print()
env.execute("Traffic Analysis Job")
}
class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
override def createAccumulator(): Long = 0L
override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
override def getResult(acc: Long): Long = acc
override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}
class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[UrlViewCount]) : Unit = {
val url: String = key.asInstanceOf[Tuple1[String]].f0
val count = aggregateResult.iterator.next
collector.collect(UrlViewCount(url, window.getEnd, count))
}
}
class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {
private var urlState : ListState[UrlViewCount] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
urlState = getRuntimeContext.getListState(urlStateDesc)
}
override def processElement(input: UrlViewCount, context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
// 每條數據都保存到狀態中
urlState.add(input)
context.timerService.registerEventTimeTimer(input.windowEnd + 1)
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
// 獲取收到的所有URL訪問量
val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
import scala.collection.JavaConversions._
for (urlView <- urlState.get) {
allUrlViews += urlView
}
// 提前清除狀態中的數據,釋放空間
urlState.clear()
// 按照訪問量從大到小排序
val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse)
.take(topSize)
// 將排名信息格式化成 String, 便於打印
var result: StringBuilder = new StringBuilder
result.append("====================================\n")
result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")
for (i <- sortedUrlViews.indices) {
val currentUrlView: UrlViewCount = sortedUrlViews(i)
// e.g. No1: URL=/blog/tags/firefox?flav=rss20 流量=55
result.append("No").append(i+1).append(":")
.append(" URL=").append(currentUrlView.url)
.append(" 流量=").append(currentUrlView.count).append("\n")
}
result.append("====================================\n\n")
// 控制輸出頻率,模擬實時滾動結果
Thread.sleep(1000)
out.collect(result.toString)
}
}
}
第4章 惡意登錄監控
4.1 模塊創建和數據准備
繼續在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為LoginFailDetect。在這個子模塊中,我們將會用到flink的CEP庫來實現事件流的模式匹配,所以需要在pom文件中引入CEP的相關依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
同樣,在src/main/目錄下,將默認源文件目錄java改名為scala。
4.2 代碼實現
對於網站而言,用戶登錄並不是頻繁的業務操作。如果一個用戶短時間內頻繁登錄失敗,就有可能是出現了程序的惡意攻擊,比如密碼暴力破解。因此我們考慮,應該對用戶的登錄失敗動作進行統計,具體來說,如果同一用戶(可以是不同IP)在2秒之內連續兩次登錄失敗,就認為存在惡意登錄的風險,輸出相關的信息進行報警提示。這是電商網站、也是幾乎所有網站風控的基本一環。
4.2.1 狀態編程
由於同樣引入了時間,我們可以想到,最簡單的方法其實與之前的熱門統計類似,只需要按照用戶ID分流,然后遇到登錄失敗的事件時將其保存在ListState中,然后設置一個定時器,2秒后觸發。定時器觸發時檢查狀態中的登錄失敗事件個數,如果大於等於2,那么就輸出報警信息。
在src/main/scala下創建LoginFail.scala文件,新建一個單例對象。定義樣例類LoginEvent,這是輸入的登錄事件流。由於沒有現成的登錄數據,我們用幾條自定義的示例數據來做演示。
代碼如下:
LoginFailDetect/src/main/scala/LoginFail.scala
case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)
object LoginFail {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val loginEventStream = env.fromCollection(List(
LoginEvent(1, "192.168.0.1", "fail", 1558430842),
LoginEvent(1, "192.168.0.2", "fail", 1558430843),
LoginEvent(1, "192.168.0.3", "fail", 1558430844),
LoginEvent(2, "192.168.10.10", "success", 1558430845)
))
.assignAscendingTimestamps(_.eventTime * 1000)
.keyBy(_.userId)
.process(new MatchFunction())
.print()
env.execute("Login Fail Detect Job")
}
class MatchFunction extends KeyedProcessFunction[Long, LoginEvent, LoginEvent] {
// 定義狀態變量
lazy val loginState: ListState[LoginEvent] = getRuntimeContext.getListState(
new ListStateDescriptor[LoginEvent]("saved login", classOf[LoginEvent]))
override def processElement(login: LoginEvent,
context: KeyedProcessFunction[Long, LoginEvent,
LoginEvent]#Context, out: Collector[LoginEvent]): Unit = {
if (login.eventType == "fail") {
loginState.add(login)
}
// 注冊定時器,觸發事件設定為2秒后
context.timerService.registerEventTimeTimer(login.eventTime + 2 * 1000)
}
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[Long, LoginEvent,
LoginEvent]#OnTimerContext, out: Collector[LoginEvent]): Unit = {
val allLogins: ListBuffer[LoginEvent] = ListBuffer()
import scala.collection.JavaConversions._
for (login <- loginState.get) {
allLogins += login
}
loginState.clear()
if (allLogins.length > 1) {
out.collect(allLogins.head)
}
}
}
}
4.2.2 CEP編程
上一節的代碼實現中我們可以看到,直接把每次登錄失敗的數據存起來、設置定時器一段時間后再讀取,這種做法盡管簡單,但和我們開始的需求還是略有差異的。這種做法只能隔2秒之后去判斷一下這期間是否有多次失敗登錄,而不是在一次登錄失敗之后、再一次登錄失敗時就立刻報警。這個需求如果嚴格實現起來,相當於要判斷任意緊鄰的事件,是否符合某種模式。這聽起來就很復雜了,那有什么方式可以方便地實現呢?
很幸運,flink為我們提供了CEP(Complex Event Processing,復雜事件處理)庫,用於在流中篩選符合某種復雜模式的事件。接下來我們就基於CEP來完成這個模塊的實現。
在src/main/scala下繼續創建LoginFailWithCep.scala文件,新建一個單例對象。樣例類LoginEvent由於在LoginFail.scala已經定義,我們在同一個模塊中就不需要再定義了。
代碼如下:
LoginFailDetect/src/main/scala/LoginFailWithCep.scala
object LoginFailWithCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val loginEventStream = env.fromCollection(List(
LoginEvent(1, "192.168.0.1", "fail", 1558430842),
LoginEvent(1, "192.168.0.2", "fail", 1558430843),
LoginEvent(1, "192.168.0.3", "fail", 1558430844),
LoginEvent(2, "192.168.10.10", "success", 1558430845)
)).assignAscendingTimestamps(_.eventTime * 1000)
// 定義匹配模式
val loginFailPattern = Pattern.begin[LoginEvent]("begin")
.where(_.eventType == "fail")
.next("next")
.where(_.eventType == "fail")
.within(Time.seconds(2))
// 在數據流中匹配出定義好的模式
val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
// .select方法傳入一個 pattern select function,當檢測到定義好的模式序列時就會調用
val loginFailDataStream = patternStream
.select((pattern: Map[String, Iterable[LoginEvent]]) => {
val first = pattern.getOrElse("begin", null).iterator.next()
val second = pattern.getOrElse("next", null).iterator.next()
(second.userId, second.ip, second.eventType)
})
// 將匹配到的符合條件的事件打印出來
loginFailDataStream.print()
env.execute("Login Fail Detect Job")
}
}
第5章 訂單支付實時監控
5.1 模塊創建和數據准備
同樣地,在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為OrderTimeoutDetect。在這個子模塊中,我們同樣將會用到flink的CEP庫來實現事件流的模式匹配,所以需要在pom文件中引入CEP的相關依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
同樣,在src/main/目錄下,將默認源文件目錄java改名為scala。
5.2 代碼實現
在電商平台中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動作的時候。用戶下單的行為可以表明用戶對商品的需求,但在現實中,並不是每次下單都會被用戶立刻支付。當拖延一段時間后,用戶支付的意願會降低。所以為了讓用戶更有緊迫感從而提高支付轉化率,同時也為了防范訂單支付環節的安全風險,電商網站往往會對訂單狀態進行監控,設置一個失效時間(比如15分鍾),如果下單后一段時間仍未支付,訂單就會被取消。
我們將會利用CEP庫來實現這個功能。我們先將事件流按照訂單號orderId分流,然后定義這樣的一個事件模式:在15分鍾內,事件“create”與“pay”嚴格緊鄰:
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
.where(_.eventType == "create")
.next("next")
.where(_.eventType == "pay")
.within(Time.seconds(5))
這樣調用.select方法時,就可以同時獲取到匹配出的事件和超時未匹配的事件了。
在src/main/scala下繼續創建OrderTimeout.scala文件,新建一個單例對象。定義樣例類OrderEvent,這是輸入的訂單事件流;另外還有OrderResult,這是輸出顯示的訂單狀態結果。由於沒有現成的數據,我們還是用幾條自定義的示例數據來做演示。
完整代碼如下:
OrderTimeoutDetect/src/main/scala/OrderTimeout.scala
case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
case class OrderResult(orderId: Long, eventType: String)
object OrderTimeout {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val orderEventStream = env.fromCollection(List(
OrderEvent(1, "create", 1558430842),
OrderEvent(2, "create", 1558430843),
OrderEvent(2, "pay", 1558430844)
)).assignAscendingTimestamps(_.eventTime * 1000)
// 定義一個帶匹配時間窗口的模式
val orderPayPattern = Pattern.begin[OrderEvent]("begin")
.where(_.eventType == "create")
.next("next")
.where(_.eventType == "pay")
.within(Time.minutes(15))
// 定義一個輸出標簽
val orderTimeoutOutput = OutputTag[OrderResult]("orderTimeout")
// 訂單事件流根據 orderId 分流,然后在每一條流中匹配出定義好的模式
val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)
val complexResult = patternStream.select(orderTimeoutOutput) {
// 對於已超時的部分模式匹配的事件序列,會調用這個函數
(pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
val createOrder = pattern.get("begin")
OrderResult(createOrder.get.iterator.next().orderId, "timeout")
}
} {
// 檢測到定義好的模式序列時,就會調用這個函數
pattern: Map[String, Iterable[OrderEvent]] => {
val payOrder = pattern.get("next")
OrderResult(payOrder.get.iterator.next().orderId, "success")
}
}
// 拿到同一輸出標簽中的 timeout 匹配結果(流)
val timeoutResult = complexResult.getSideOutput(orderTimeoutOutput)
complexResult.print()
timeoutResult.print()
env.execute("Order Timeout Detect Job")
}
}