Flink_大數據技術之電商用戶行為分析


 

大數據技術之電商用戶行為分析

 

第1章 項目整體介紹

1.1 電商的用戶行為

電商平台中的用戶行為頻繁且較復雜,系統上線運行一段時間后,可以收集到大量的用戶行為數據,進而利用大數據技術進行深入挖掘和分析,得到感興趣的商業指標並增強對風險的控制。

電商用戶行為數據多樣,整體可以分為用戶行為習慣數據和業務行為數據兩大類。用戶的行為習慣數據包括了用戶的登錄方式、上線的時間點及時長、點擊和瀏覽頁面、頁面停留時間以及頁面跳轉等等,我們可以從中進行流量統計和熱門商品的統計,也可以深入挖掘用戶的特征;這些數據往往可以從web服務器日志中直接讀取到。而業務行為數據就是用戶在電商平台中針對每個業務(通常是某個具體商品)所作的操作,我們一般會在業務系統中相應的位置埋點,然后收集日志進行分析。業務行為數據又可以簡單分為兩類:一類是能夠明顯地表現出用戶興趣的行為,比如對商品的收藏、喜歡、評分和評價,我們可以從中對數據進行深入分析,得到用戶畫像,進而對用戶給出個性化的推薦商品列表,這個過程往往會用到機器學習相關的算法;另一類則是常規的業務操作,但需要着重關注一些異常狀況以做好風控,比如登錄和訂單支付。

 

1.2 項目主要模塊

基於對電商用戶行為數據的基本分類,我們可以發現主要有以下三個分析方向:

1. 熱門統計

利用用戶的點擊瀏覽行為,進行流量統計、近期熱門商品統計等。

2. 偏好統計

利用用戶的偏好行為,比如收藏、喜歡、評分等,進行用戶畫像分析,給出個性化的商品推薦列表。

3. 風險控制

利用用戶的常規業務行為,比如登錄、下單、支付等,分析數據,對異常情況進行報警提示。

本項目限於數據,我們只實現熱門統計和風險控制中的部分內容,將包括以下四大模塊:實時熱門商品統計、實時流量統計、惡意登錄監控和訂單支付失效監控。

 

由於對實時性要求較高,我們會用flink作為數據處理的框架。在項目中,我們將綜合運用flink的各種API,基於EventTime去處理基本的業務需求,並且靈活地使用底層的processFunction,基於狀態編程和CEP去處理更加復雜的情形。

1.3 數據源解析

我們准備了一份淘寶用戶行為數據集,保存為csv文件。本數據集包含了淘寶上某一天隨機一百萬用戶的所有行為(包括點擊、購買、收藏、喜歡)。數據集的每一行表示一條用戶行為,由用戶ID、商品ID、商品類目ID、行為類型和時間戳組成,並以逗號分隔。關於數據集中每一列的詳細描述如下:

 

字段名

數據類型

說明

userId

Long

加密后的用戶ID

itemId

Long

加密后的商品ID

categoryId

Int

加密后的商品所屬類別ID

behavior

String

用戶行為類型,包括(‘pv’, ‘’buy, ‘cart’, ‘fav’)

timestamp

Long

行為發生的時間戳,單位秒

另外,我們還可以拿到web服務器的日志數據,這里以apache服務器的一份log為例,每一行日志記錄了訪問者的IP、userId、訪問時間、訪問方法以及訪問的url,具體描述如下:

字段名

數據類型

說明

ip

String

訪問的 IP

userId

Long

訪問的 user ID

eventTime

Long

訪問時間

method

String

訪問方法 GET/POST/PUT/DELETE

url

String

訪問的 url

 

 

由於行為數據有限,在實時熱門商品統計模塊中可以使用UserBehavior數據集,而對於惡意登錄監控和訂單支付失效監控,我們只以示例數據來做演示。

第2章 實時熱門商品統計

首先要實現的是實時熱門商品統計,我們將會基於UserBehavior數據集來進行分析。

項目主體用Scala編寫,采用IDEA作為開發環境進行項目編寫,采用maven作為項目構建和管理工具。首先我們需要搭建項目框架。

2.1 創建Maven項目

2.1.1 項目框架搭建

打開IDEA,創建一個maven項目,命名為UserBehaviorAnalysis。由於包含了多個模塊,我們可以以UserBehaviorAnalysis作為父項目,並在其下建一個名為HotItemsAnalysis的子項目,用於實時統計熱門top N商品。

在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為HotItemsAnalysis。

父項目只是為了規范化項目結構,方便依賴管理,本身是不需要代碼實現的,所以UserBehaviorAnalysis下的src文件夾可以刪掉。

 

2.1.2 聲明項目中工具的版本信息

我們整個項目需要的工具的不同版本可能會對程序運行造成影響,所以應該在最外層的UserBehaviorAnalysis中聲明所有子模塊共用的版本信息。

在pom.xml中加入以下配置:

UserBehaviorAnalysis/pom.xml

<properties>
    <flink.version>1.7.2</flink.version>

<scala.binary.version>2.11</scala.binary.version>
    <kafka.version>2.2.0</kafka.version>
</properties>

 

2.1.3 添加項目依賴

對於整個項目而言,所有模塊都會用到flink相關的組件,所以我們在UserBehaviorAnalysis中引入公有依賴:

UserBehaviorAnalysis/pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

<dependency>
        <groupId>org.apache.kafka</groupId>

<artifactId>kafka_${scala.binary.version}</artifactId>

<version>${kafka.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

</dependencies>

同樣,對於maven項目的構建,可以引入公有的插件:

<build>
    <plugins>
        <!-- 該插件用於將Scala代碼編譯成class文件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.4.6</version>
            <executions>
                <execution>
                    <!-- 聲明綁定到mavencompile階段 -->
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                  <descriptorRef>

jar-with-dependencies

</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

在HotItemsAnalysis子模塊中,我們並沒有引入更多的依賴,所以不需要改動pom文件。

 

2.1.4 數據准備

在src/main/目錄下,可以看到已有的默認源文件目錄是java,我們可以將其改名為scala。將數據文件UserBehavior.csv復制到資源文件目錄src/main/resources下,我們將從這里讀取數據。

至此,我們的准備工作都已完成,接下來可以寫代碼了。

 

2.2 模塊代碼實現

我們將實現一個“實時熱門商品”的需求,可以將“實時熱門商品”翻譯成程序員更好理解的需求:每隔5分鍾輸出最近一小時內點擊量最多的前N個商品。將這個需求進行分解我們大概要做這么幾件事情:

• 抽取出業務時間戳,告訴Flink框架基於業務時間做窗口

• 過濾出點擊行為數據

• 按一小時的窗口大小,每5分鍾統計一次,做滑動窗口聚合(Sliding Window)

• 按每個窗口聚合,輸出每個窗口中點擊量前N名的商品

 

2.2.1 程序主體

在src/main/scala下創建HotItems.scala文件,新建一個單例對象。定義樣例類UserBehavior和ItemViewCount,在main函數中創建StreamExecutionEnvironment 並做配置,然后從UserBehavior.csv文件中讀取數據,並包裝成UserBehavior類型。代碼如下:

HotItemsAnalysis/src/main/scala/HotItems.scala

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

 

object HotItems {
  def main(args: Array[String]): Unit = {

// 創建一個 StreamExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 設定Time類型為EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 為了打印到控制台的結果不亂序,我們配置全局的並發為1,這里改變並發對結果正確性沒有影響

env.setParallelism(1)
    val stream = env

// 以window下為例,需替換成自己的路徑
      .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
      .map(line => {
        val linearray = line.split(",")
        UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
      })

  // 指定時間戳和watermark

.assignAscendingTimestamps(_.timestamp * 1000)


    env.execute("Hot Items Job")
  }

這里注意,我們需要統計業務時間上的每小時的點擊量,所以要基於EventTime來處理。那么如果讓Flink按照我們想要的業務時間來處理呢?這里主要有兩件事情要做。

第一件是告訴Flink我們現在按照EventTime模式進行處理,Flink默認使用ProcessingTime處理,所以我們要顯式設置如下:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何獲得業務時間,以及生成Watermark。Watermark是用來追蹤業務事件的概念,可以理解成EventTime世界中的時鍾,用來指示當前處理到什么時刻的數據了。由於我們的數據源的數據已經經過整理,沒有亂序,即事件的時間戳是單調遞增的,所以可以將每條數據的業務時間就當做Watermark。這里我們用 assignAscendingTimestamps來實現時間戳的抽取和Watermark的生成。

注:真實業務場景一般都是亂序的,所以一般不用assignAscendingTimestamps,而是使用BoundedOutOfOrdernessTimestampExtractor。

.assignAscendingTimestamps(_.timestamp * 1000)

這樣我們就得到了一個帶有時間標記的數據流了,后面就能做一些窗口的操作。

 

2.2.2 過濾出點擊事件

在開始窗口操作之前,先回顧下需求“每隔5分鍾輸出過去一小時內點擊量最多的前N個商品”。由於原始數據中存在點擊、購買、收藏、喜歡各種行為的數據,但是我們只需要統計點擊量,所以先使用filter將點擊行為數據過濾出來。

.filter(_.behavior == "pv")

 

2.2.3 設置滑動窗口,統計點擊量

由於要每隔5分鍾統計一次最近一小時每個商品的點擊量,所以窗口大小是一小時,每隔5分鍾滑動一次。即分別要統計[09:00, 10:00), [09:05, 10:05), [09:10, 10:10)…等窗口的商品點擊量。是一個常見的滑動窗口需求(Sliding Window)。

    .keyBy("itemId")

    .timeWindow(Time.minutes(60), Time.minutes(5))

    .aggregate(new CountAgg(), new WindowResultFunction());

我們使用.keyBy("itemId")對商品進行分組,使用.timeWindow(Time size, Time slide)對每個商品做滑動窗口(1小時窗口,5分鍾滑動一次)。然后我們使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉數據,減少state的存儲壓力。較之 .apply(WindowFunction wf) 會將窗口中的數據都存儲下來,最后一起計算要高效地多。這里的CountAgg實現了AggregateFunction接口,功能是統計窗口中的條數,即遇到一條數據就加一。

 

// COUNT統計的聚合函數實現,每出現一條記錄就加一
class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
}

 

聚合操作.aggregate(AggregateFunction af, WindowFunction wf)的第二個參數WindowFunction將每個key每個窗口聚合后的結果帶上其他信息進行輸出。我們這里實現的WindowResultFunction將<主鍵商品ID,窗口,點擊量>封裝成了ItemViewCount進行輸出。

// 商品點擊量(窗口操作的輸出類型)

case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

代碼如下:

// 用於輸出窗口的結果
class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
                     collector: Collector[ItemViewCount]) : Unit = {
    val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
    val count = aggregateResult.iterator.next
    collector.collect(ItemViewCount(itemId, window.getEnd, count))
  }
}

現在我們就得到了每個商品在每個窗口的點擊量的數據流。

 

2.2.4 計算最熱門Top N商品

為了統計每個窗口下最熱門的商品,我們需要再次按窗口進行分組,這里根據ItemViewCount中的windowEnd進行keyBy()操作。然后使用ProcessFunction實現一個自定義的TopN函數TopNHotItems來計算點擊量排名前3名的商品,並將排名結果格式化成字符串,便於后續輸出。

    .keyBy("windowEnd")

    .process(new TopNHotItems(3));  // 求點擊量前3名的商品

ProcessFunction是Flink提供的一個low-level API,用於實現更高級的功能。它主要提供了定時器timer的功能(支持EventTime或ProcessingTime)。本案例中我們將利用timer來判斷何時收齊了某個window下所有商品的點擊量數據。由於Watermark的進度是全局的,在processElement方法中,每當收到一條數據ItemViewCount,我們就注冊一個windowEnd+1的定時器(Flink框架會自動忽略同一時間的重復注冊)。windowEnd+1的定時器被觸發時,意味着收到了windowEnd+1的Watermark,即收齊了該windowEnd下的所有商品窗口統計值。我們在onTimer()中處理將收集的所有商品及點擊量進行排序,選出TopN,並將排名信息格式化成字符串后進行輸出。

這里我們還使用了ListState<ItemViewCount>來存儲收到的每條ItemViewCount消息,保證在發生故障時,狀態數據的不丟失和一致性。ListState是Flink提供的類似Java List接口的State API,它集成了框架的checkpoint機制,自動做到了exactly-once的語義保證。

 

  // 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
    private var itemState : ListState[ItemViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // 命名狀態變量的名字和狀態變量的類型
      val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state"classOf[ItemViewCount])
      // 定義狀態變量
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }


    override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
      // 每條數據都保存到狀態中
      itemState.add(input)
      // 注冊 windowEnd+1  EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
      // 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      // 獲取收到的所有商品點擊量
      val allItems: ListBuffer[ItemViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (item <- itemState.get) {
        allItems += item
      }
      // 提前清除狀態中的數據,釋放空間
      itemState.clear()
      // 按照點擊量從大到小排序
      val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
      // 將排名信息格式化成 String, 便於打印
      val result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

      for(i <- sortedItems.indices){
        val currentItem: ItemViewCount = sortedItems(i)
        // e.g.  No1:  商品ID=12224  瀏覽量=2413
        result.append("No").append(i+1).append(":")

.append("  商品ID=").append(currentItem.itemId)

.append("  瀏覽量=").append(currentItem.count).append("\n")
      }
      result.append("====================================\n\n")
      // 控制輸出頻率,模擬實時滾動結果
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }

最后我們可以在main函數中將結果打印輸出到控制台,方便實時觀測:

.print();

至此整個程序代碼全部完成,我們直接運行main函數,就可以在控制台看到不斷輸出的各個時間點統計出的熱門商品。

 

2.2.5 完整代碼

最終完整代碼如下:

case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)

case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

object HotItems {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val stream = env
      .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
      .map(line => {
        val linearray = line.split(",")
        UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
      })
      .assignAscendingTimestamps(_.timestamp * 1000)
      .filter(_.behavior=="pv")
      .keyBy("itemId")
      .timeWindow(Time.minutes(60), Time.minutes(5))
      .aggregate(new CountAgg(), new WindowResultFunction())    
      .keyBy(1)
      .process(new TopNHotItems(3))
      .print()

    env.execute("Hot Items Job")
  }

  // COUNT 統計的聚合函數實現,每出現一條記錄加一
  class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }
  // 用於輸出窗口的結果
  class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long],
                       collector: Collector[ItemViewCount]) : Unit = {
      val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
      val count = aggregateResult.iterator.next
      collector.collect(ItemViewCount(itemId, window.getEnd, count))
    }
  }

 

// 求某個窗口中前 N 名的熱門點擊商品,key 為窗口時間戳,輸出為 TopN 的結果字符串
  class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
    private var itemState : ListState[ItemViewCount] = _


    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // 命名狀態變量的名字和狀態變量的類型
      val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state"classOf[ItemViewCount])
      // 從運行時上下文中獲取狀態並賦值
      itemState = getRuntimeContext.getListState(itemsStateDesc)
    }


    override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
      // 每條數據都保存到狀態中
      itemState.add(input)
      // 注冊 windowEnd+1  EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的所有商品數據
      // 也就是當程序看到windowend + 1的水位線watermark時,觸發onTimer回調函數
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
      // 獲取收到的所有商品點擊量
      val allItems: ListBuffer[ItemViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (item <- itemState.get) {
        allItems += item
      }
      // 提前清除狀態中的數據,釋放空間
      itemState.clear()
      // 按照點擊量從大到小排序
      val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
      // 將排名信息格式化成 String, 便於打印
      val result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

      for(i <- sortedItems.indices){
        val currentItem: ItemViewCount = sortedItems(i)
        // e.g.  No1:  商品ID=12224  瀏覽量=2413
        result.append("No").append(i+1).append(":")

.append("  商品ID=").append(currentItem.itemId)

.append("  瀏覽量=").append(currentItem.count).append("\n")
      }
      result.append("====================================\n\n")
      // 控制輸出頻率,模擬實時滾動結果
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }

}

2.2.6 更換Kafka 作為數據源

實際生產環境中,我們的數據流往往是從Kafka獲取到的。如果要讓代碼更貼近生產實際,我們只需將source更換為Kafka即可:

val properties = new Properties()
properties.setProperty("bootstrap.servers""localhost:9092")
properties.setProperty("group.id""consumer-group")
properties.setProperty("key.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset""latest")


val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)


val stream = env
  .addSource(new FlinkKafkaConsumer[String]("hotitems"new SimpleStringSchema(), properties))

當然,根據實際的需要,我們還可以將Sink指定為Kafka、ES、Redis或其它存儲,這里就不一一展開實現了。

第3章 實時流量統計

3.1 模塊創建和數據准備

在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為NetworkTrafficAnalysis。在這個子模塊中,我們同樣並沒有引入更多的依賴,所以也不需要改動pom文件。

在src/main/目錄下,將默認源文件目錄java改名為scala。將apache服務器的日志文件apache.log復制到資源文件目錄src/main/resources下,我們將從這里讀取數據。

 

3.2 代碼實現

我們現在要實現的模塊是 “實時流量統計”。對於一個電商平台而言,用戶登錄的入口流量、不同頁面的訪問流量都是值得分析的重要數據,而這些數據,可以簡單地從web服務器的日志中提取出來。我們在這里實現最基本的“頁面瀏覽數”的統計,也就是讀取服務器日志中的每一行log,統計在一段時間內用戶訪問url的次數。

具體做法為:每隔5秒,輸出最近10分鍾內訪問量最多的前N個URL。可以看出,這個需求與之前“實時熱門商品統計”非常類似,所以我們完全可以借鑒此前的代碼。

在src/main/scala下創建TrafficAnalysis.scala文件,新建一個單例對象。定義樣例類ApacheLogEvent,這是輸入的日志數據流;另外還有UrlViewCount,這是窗口操作統計的輸出數據類型。在main函數中創建StreamExecutionEnvironment 並做配置,然后從apache.log文件中讀取數據,並包裝成ApacheLogEvent類型。

需要注意的是,原始日志中的時間是“dd/MM/yyyy:HH:mm:ss”的形式,需要定義一個DateTimeFormat將其轉換為我們需要的時間戳格式:

.map(line => {

val linearray = line.split(" ")

val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")

val timestamp = sdf.parse(linearray(3)).getTime

ApacheLogEvent(linearray(0), linearray(2), timestamp,

linearray(5), linearray(6))

})

完整代碼如下:

NetworkTrafficAnalysis/src/main/scala/TrafficAnalysis.scala

case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
case class UrlViewCount(url: String, windowEnd: Long, count: Long)

object TrafficAnalysis {


  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val stream = env

// 以window下為例,需替換成自己的路徑
      .readTextFile("YOUR_PATH\\resources\\apache.log")
      .map(line => {

val linearray = line.split(" ")

val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")

val timestamp = simpleDateFormat.parse(linearray(3)).getTime
        ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
      })
      .assignTimestampsAndWatermarks(new 

BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent]

(Time.milliseconds(1000)) {

override def extractTimestamp(t: ApacheLogEvent): Long = {

t.eventTime

}

})
      .keyBy("url")
      .timeWindow(Time.minutes(10), Time.seconds(5))
      .aggregate(new CountAgg(), new WindowResultFunction())
      .keyBy(1)
      .process(new TopNHotUrls(5))
      .print()
    env.execute("Traffic Analysis Job")
  }

  class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
    override def getResult(acc: Long): Long = acc
    override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  }

  class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[UrlViewCount]) : Unit = {
      val url: String = key.asInstanceOf[Tuple1[String]].f0
      val count = aggregateResult.iterator.next
      collector.collect(UrlViewCount(url, window.getEnd, count))
    }
  }

  class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {
    private var urlState : ListState[UrlViewCount] = _

    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state"classOf[UrlViewCount])
      urlState = getRuntimeContext.getListState(urlStateDesc)
    }

    override def processElement(input: UrlViewCount, context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {

// 每條數據都保存到狀態中
      urlState.add(input)
      context.timerService.registerEventTimeTimer(input.windowEnd + 1)
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {

// 獲取收到的所有URL訪問量
      val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
      import scala.collection.JavaConversions._
      for (urlView <- urlState.get) {
        allUrlViews += urlView
      }
      // 提前清除狀態中的數據,釋放空間
      urlState.clear()
      // 按照訪問量從大到小排序
      val sortedUrlViews = allUrlViews.sortBy(_.count)(Ordering.Long.reverse)

.take(topSize)
      // 將排名信息格式化成 String, 便於打印
      var result: StringBuilder = new StringBuilder
      result.append("====================================\n")
      result.append("時間: ").append(new Timestamp(timestamp - 1)).append("\n")

      for (i <- sortedUrlViews.indices) {
        val currentUrlView: UrlViewCount = sortedUrlViews(i)

// e.g.  No1:  URL=/blog/tags/firefox?flav=rss20  流量=55
        result.append("No").append(i+1).append(":")

.append("  URL=").append(currentUrlView.url)

.append("  流量=").append(currentUrlView.count).append("\n")
      }
      result.append("====================================\n\n")
      // 控制輸出頻率,模擬實時滾動結果
      Thread.sleep(1000)
      out.collect(result.toString)
    }
  }
}

 

第4章 惡意登錄監控

4.1 模塊創建和數據准備

繼續在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為LoginFailDetect。在這個子模塊中,我們將會用到flink的CEP庫來實現事件流的模式匹配,所以需要在pom文件中引入CEP的相關依賴:

<dependency>
        <groupId>org.apache.flink</groupId>

<artifactId>flink-cep_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>
        <groupId>org.apache.flink</groupId>

<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

同樣,在src/main/目錄下,將默認源文件目錄java改名為scala。

 

4.2 代碼實現

對於網站而言,用戶登錄並不是頻繁的業務操作。如果一個用戶短時間內頻繁登錄失敗,就有可能是出現了程序的惡意攻擊,比如密碼暴力破解。因此我們考慮,應該對用戶的登錄失敗動作進行統計,具體來說,如果同一用戶(可以是不同IP)在2秒之內連續兩次登錄失敗,就認為存在惡意登錄的風險,輸出相關的信息進行報警提示。這是電商網站、也是幾乎所有網站風控的基本一環。

4.2.1 狀態編程

由於同樣引入了時間,我們可以想到,最簡單的方法其實與之前的熱門統計類似,只需要按照用戶ID分流,然后遇到登錄失敗的事件時將其保存在ListState中,然后設置一個定時器,2秒后觸發。定時器觸發時檢查狀態中的登錄失敗事件個數,如果大於等於2,那么就輸出報警信息。

在src/main/scala下創建LoginFail.scala文件,新建一個單例對象。定義樣例類LoginEvent,這是輸入的登錄事件流。由於沒有現成的登錄數據,我們用幾條自定義的示例數據來做演示。

代碼如下:

LoginFailDetect/src/main/scala/LoginFail.scala

case class LoginEvent(userId: Long, ip: String, eventType: String, eventTime: Long)

object LoginFail {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val loginEventStream = env.fromCollection(List(
      LoginEvent(1, "192.168.0.1""fail", 1558430842),
      LoginEvent(1, "192.168.0.2""fail", 1558430843),
      LoginEvent(1, "192.168.0.3""fail", 1558430844),
      LoginEvent(2, "192.168.10.10""success", 1558430845)
    ))
      .assignAscendingTimestamps(_.eventTime * 1000)
      .keyBy(_.userId)
      .process(new MatchFunction())
      .print()

    env.execute("Login Fail Detect Job")
  }

  class MatchFunction extends KeyedProcessFunction[Long, LoginEvent, LoginEvent] {


    // 定義狀態變量

lazy val loginState: ListState[LoginEvent] = getRuntimeContext.getListState(
      new ListStateDescriptor[LoginEvent]("saved login"classOf[LoginEvent]))

    override def processElement(login: LoginEvent,
                              context: KeyedProcessFunction[Long, LoginEvent, 

LoginEvent]#Context, out: Collector[LoginEvent]): Unit = {

      if (login.eventType == "fail") {
        loginState.add(login)
      }
      // 注冊定時器,觸發事件設定為2秒后
      context.timerService.registerEventTimeTimer(login.eventTime + 2 * 1000)
    }

    override def onTimer(timestamp: Long,
                    ctx: KeyedProcessFunction[Long, LoginEvent, 

LoginEvent]#OnTimerContext, out: Collector[LoginEvent]): Unit = {


      val allLogins: ListBuffer[LoginEvent] = ListBuffer()
      import scala.collection.JavaConversions._
      for (login <- loginState.get) {
        allLogins += login
      }
      loginState.clear()

      if (allLogins.length > 1) {
        out.collect(allLogins.head)
      }
    }
  }
}

4.2.2 CEP編程

上一節的代碼實現中我們可以看到,直接把每次登錄失敗的數據存起來、設置定時器一段時間后再讀取,這種做法盡管簡單,但和我們開始的需求還是略有差異的。這種做法只能隔2秒之后去判斷一下這期間是否有多次失敗登錄,而不是在一次登錄失敗之后、再一次登錄失敗時就立刻報警。這個需求如果嚴格實現起來,相當於要判斷任意緊鄰的事件,是否符合某種模式。這聽起來就很復雜了,那有什么方式可以方便地實現呢?

很幸運,flink為我們提供了CEP(Complex Event Processing,復雜事件處理)庫,用於在流中篩選符合某種復雜模式的事件。接下來我們就基於CEP來完成這個模塊的實現。

在src/main/scala下繼續創建LoginFailWithCep.scala文件,新建一個單例對象。樣例類LoginEvent由於在LoginFail.scala已經定義,我們在同一個模塊中就不需要再定義了。

代碼如下:

LoginFailDetect/src/main/scala/LoginFailWithCep.scala

object LoginFailWithCep {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val loginEventStream = env.fromCollection(List(
      LoginEvent(1, "192.168.0.1""fail", 1558430842),
      LoginEvent(1, "192.168.0.2""fail", 1558430843),
      LoginEvent(1, "192.168.0.3""fail", 1558430844),
      LoginEvent(2, "192.168.10.10""success", 1558430845)
    )).assignAscendingTimestamps(_.eventTime * 1000)

// 定義匹配模式
    val loginFailPattern = Pattern.begin[LoginEvent]("begin")
      .where(_.eventType == "fail")
      .next("next")
      .where(_.eventType == "fail")
      .within(Time.seconds(2))

// 在數據流中匹配出定義好的模式
    val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)

    // .select方法傳入一個 pattern select function,當檢測到定義好的模式序列時就會調用
    val loginFailDataStream = patternStream
      .select((pattern: Map[String, Iterable[LoginEvent]]) => {
        val first = pattern.getOrElse("begin"null).iterator.next()
        val second = pattern.getOrElse("next"null).iterator.next()
        (second.userId, second.ip, second.eventType)
      })
    // 將匹配到的符合條件的事件打印出來
    loginFailDataStream.print()
    env.execute("Login Fail Detect Job")
  }
}

第5章 訂單支付實時監控

5.1 模塊創建和數據准備

同樣地,在UserBehaviorAnalysis下新建一個 maven module作為子項目,命名為OrderTimeoutDetect。在這個子模塊中,我們同樣將會用到flink的CEP庫來實現事件流的模式匹配,所以需要在pom文件中引入CEP的相關依賴:

<dependency>
        <groupId>org.apache.flink</groupId>

<artifactId>flink-cep_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

<dependency>
        <groupId>org.apache.flink</groupId>

<artifactId>flink-cep-scala_${scala.binary.version}</artifactId>

<version>${flink.version}</version>

</dependency>

同樣,在src/main/目錄下,將默認源文件目錄java改名為scala。

 

5.2 代碼實現

在電商平台中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動作的時候。用戶下單的行為可以表明用戶對商品的需求,但在現實中,並不是每次下單都會被用戶立刻支付。當拖延一段時間后,用戶支付的意願會降低。所以為了讓用戶更有緊迫感從而提高支付轉化率,同時也為了防范訂單支付環節的安全風險,電商網站往往會對訂單狀態進行監控,設置一個失效時間(比如15分鍾),如果下單后一段時間仍未支付,訂單就會被取消。

我們將會利用CEP庫來實現這個功能。我們先將事件流按照訂單號orderId分流,然后定義這樣的一個事件模式:在15分鍾內,事件“create”與“pay”嚴格緊鄰:

val orderPayPattern = Pattern.begin[OrderEvent]("begin")
  .where(_.eventType == "create")
  .next("next")
  .where(_.eventType == "pay")
  .within(Time.seconds(5))

這樣調用.select方法時,就可以同時獲取到匹配出的事件和超時未匹配的事件了。

在src/main/scala下繼續創建OrderTimeout.scala文件,新建一個單例對象。定義樣例類OrderEvent,這是輸入的訂單事件流;另外還有OrderResult,這是輸出顯示的訂單狀態結果。由於沒有現成的數據,我們還是用幾條自定義的示例數據來做演示。

完整代碼如下:

OrderTimeoutDetect/src/main/scala/OrderTimeout.scala

case class OrderEvent(orderId: Long, eventType: String, eventTime: Long)
case class OrderResult(orderId: Long, eventType: String)

object OrderTimeout {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val orderEventStream = env.fromCollection(List(
      OrderEvent(1, "create", 1558430842),
      OrderEvent(2, "create", 1558430843),
      OrderEvent(2, "pay", 1558430844)
    )).assignAscendingTimestamps(_.eventTime * 1000)

    // 定義一個帶匹配時間窗口的模式
    val orderPayPattern = Pattern.begin[OrderEvent]("begin")
      .where(_.eventType == "create")
      .next("next")
      .where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 定義一個輸出標簽
    val orderTimeoutOutput = OutputTag[OrderResult]("orderTimeout")
    // 訂單事件流根據 orderId 分流,然后在每一條流中匹配出定義好的模式
    val patternStream = CEP.pattern(orderEventStream.keyBy("orderId"), orderPayPattern)

    val complexResult = patternStream.select(orderTimeoutOutput) {
      // 對於已超時的部分模式匹配的事件序列,會調用這個函數
      (pattern: Map[String, Iterable[OrderEvent]], timestamp: Long) => {
        val createOrder = pattern.get("begin")
        OrderResult(createOrder.get.iterator.next().orderId, "timeout")
      }
    } {
      // 檢測到定義好的模式序列時,就會調用這個函數
      pattern: Map[String, Iterable[OrderEvent]] => {
        val payOrder = pattern.get("next")
        OrderResult(payOrder.get.iterator.next().orderId, "success")
      }
    }
    // 拿到同一輸出標簽中的 timeout 匹配結果(流)
    val timeoutResult = complexResult.getSideOutput(orderTimeoutOutput)

    complexResult.print()
    timeoutResult.print()


    env.execute("Order Timeout Detect Job")
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM