flink(六) 電商用戶行為分析(六)惡意登錄監控之連續登陸超時


1 模塊創建和數據準備

  繼續在 UserBehaviorAnalysis 下新建一個 maven module 作為子項目,命名為LoginFailDetect。在這個子模塊中,我們將會用到 flink 的 CEP 庫來實現事件流的模
式匹配,所以需要在 pom 文件中引入 CEP 的相關依賴:
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
  對於網站而言,用戶登錄並不是頻繁的業務操作。如果一個用戶短時間內頻繁登錄失敗,就有可能是出現了程序的惡意攻擊,比如密碼暴力破解。因此我們考慮,
應該對用戶的登錄失敗動作進行統計,具體來說,如果同一用戶(可以是不同 IP)在 2 秒之內連續兩次登錄失敗,就認為存在惡意登錄的風險,輸出相關的信息進行
報警提示。這是電商網站、也是幾乎所有網站風控的基本一環。
 
2.1 狀態編程
  由於同樣引入了時間,我們可以想到,最簡單的方法其實與之前的熱門統計類似,只需要按照用戶 ID 分流,然後遇到登錄失敗的事件時將其保存在 ListState 中,
然後設置一個定時器,2 秒後觸發。定時器觸發時檢查狀態中的登錄失敗事件個數,如果大於等於 2,那麼就輸出報警信息。
  在 src/main/scala 下創建 LoginFail.scala 文件,新建一個單例對象。定義樣例類LoginEvent,這是輸入的登錄事件流。登錄數據本應該從 UserBehavior 日志里提取,
由於 UserBehavior.csv 中沒有做相關埋點,我們從另一個文件 LoginLog.csv 中讀取登錄數據。
package com.atguigu

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

// Input and output record types of the job.

/** One login attempt parsed from a single input record.
  *
  * @param userId    id of the user the attempt belongs to
  * @param ip        address field of the record
  * @param eventType outcome label of the attempt; the job compares it against "fail"
  * @param eventTime time of the attempt; the job multiplies it by 1000 when it needs
  *                  a millisecond timestamp
  */
case class LoginEvent(
  userId: Long,
  ip: String,
  eventType: String,
  eventTime: Long
)

/** Alert record produced by the job.
  *
  * @param userId        id of the user the alert is about
  * @param firstFailTime `eventTime` of the first failed attempt covered by the alert
  * @param lastFailTime  `eventTime` of the last failed attempt covered by the alert
  * @param warningMsg    human-readable description of the alert
  */
case class Warning(
  userId: Long,
  firstFailTime: Long,
  lastFailTime: Long,
  warningMsg: String
)

/** State-based login-failure detection job.
  *
  * Reads login records from a text file, turns each line into a [[LoginEvent]], assigns
  * event-time timestamps, keys the stream by user id and lets [[LoginFailWarning]] decide
  * when to emit a [[Warning]]. The resulting warnings are printed.
  */
object LoginFail {

  /** Builds and runs the job.
    *
    * @param args optional; when `args(0)` is present it is used as the input file path,
    *             otherwise the `LoginLog.csv` resource found on the classpath is read
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Resolve the input location instead of relying on a machine-specific absolute path.
    // Going through the URI decodes escaped characters (e.g. spaces) and yields a
    // platform-correct path. This only works when the resource is a plain file on disk
    // (not packed inside a jar); in that case pass the path as args(0).
    val inputPath: String = args.headOption.getOrElse {
      val resource = getClass.getResource("/LoginLog.csv")
      require(resource != null, "LoginLog.csv was not found on the classpath")
      java.nio.file.Paths.get(resource.toURI).toString
    }

    val loginEventStream: DataStream[LoginEvent] = env.readTextFile(inputPath)
      // Skip blank lines so that a trailing newline does not break parsing.
      .filter(line => line.trim.nonEmpty)
      .map(data => {
        // Expected record layout: userId,ip,eventType,eventTime
        val dataArray = data.split(",").map(_.trim)
        LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      // Watermarks are generated with a 3-second out-of-orderness bound.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
        // eventTime is scaled by 1000 to obtain the millisecond timestamp Flink expects.
        override def extractTimestamp(t: LoginEvent): Long = t.eventTime * 1000L
      })

    // Per-user detection: warn once at least 2 failures were collected.
    val loginWarningStream: DataStream[Warning] = loginEventStream
      .keyBy(_.userId)
      .process(new LoginFailWarning(2))

    loginWarningStream.print()
    env.execute("login fail job")
  }

}

/** Custom keyed process function that detects repeated login failures per user.
  *
  * Every "fail" event is appended to a list state. The first failure after a reset also
  * registers an event-time timer 2 seconds after that failure. Any event whose type is not
  * "fail" cancels the pending timer and clears the collected failures. When the timer
  * fires, a [[Warning]] is emitted if at least `maxFailTime` failures were collected.
  *
  * @param maxFailTime minimum number of collected failures required to emit a warning
  */
class LoginFailWarning(maxFailTime: Int) extends KeyedProcessFunction[Long, LoginEvent, Warning]{
  // Length of the detection window in milliseconds.
  private val detectionWindowMillis: Long = 2000L

  // List state holding every failed login collected since the last reset.
  lazy val LoginFailListState: ListState[LoginEvent] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("saved-logingfail",classOf[LoginEvent]))
  // Value state holding the timestamp of the currently registered timer.
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts",classOf[Long]))

  /** Collects failures and arms the timer; a non-failure event resets everything.
    *
    * @param value     the incoming login event
    * @param context   gives access to the timer service
    * @param collector unused here; warnings are only emitted from `onTimer`
    */
  override def processElement(value: LoginEvent, context: KeyedProcessFunction[Long, LoginEvent, Warning]#Context, collector: Collector[Warning]): Unit = {
    if (value.eventType == "fail") {
      LoginFailListState.add(value)
      // This check relies on an unset timer state reading back as 0, i.e. "no timer yet".
      if (timerTsState.value() == 0) {
        val ts = value.eventTime * 1000L + detectionWindowMillis
        context.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
      }
    } else {
      // Only cancel a timer that was actually registered.
      val pendingTimerTs = timerTsState.value()
      if (pendingTimerTs != 0) {
        context.timerService().deleteEventTimeTimer(pendingTimerTs)
      }
      LoginFailListState.clear()
      timerTsState.clear()
    }
  }

  /** Emits a warning when enough failures were collected, then resets the state.
    *
    * @param timestamp the timestamp the timer was registered for
    * @param ctx       gives access to the current key (the user id)
    * @param out       collector the warning is written to
    */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, Warning]#OnTimerContext, out: Collector[Warning]): Unit = {
    // Copy the state content into a local buffer so it can be counted and indexed.
    val allLoginFailList: ListBuffer[LoginEvent] = new ListBuffer[LoginEvent]
    val iter = LoginFailListState.get().iterator()
    while (iter.hasNext) {
      allLoginFailList += iter.next()
    }

    // nonEmpty guards head/last in case maxFailTime is configured as 0 or negative.
    if (allLoginFailList.nonEmpty && allLoginFailList.length >= maxFailTime) {
      out.collect(Warning(
        ctx.getCurrentKey,
        allLoginFailList.head.eventTime,
        allLoginFailList.last.eventTime,
        s"login fail in 2s for ${allLoginFailList.length} times."))
    }

    LoginFailListState.clear()
    timerTsState.clear()
  }

}

CEP 實現:下面使用 Flink CEP 庫,以模式匹配的方式完成同樣的連續登錄失敗檢測。

package com.atguigu.loginfail_detect

import java.util

import com.atguigu.LoginFail.getClass
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer


// Input and output record types of the job.

/** One login attempt parsed from a single input record.
  *
  * @param userId    id of the user the attempt belongs to
  * @param ip        address field of the record
  * @param eventType outcome label of the attempt; the job compares it against "fail"
  * @param eventTime time of the attempt; the job multiplies it by 1000 when it needs
  *                  a millisecond timestamp
  */
case class LoginEvent(
  userId: Long,
  ip: String,
  eventType: String,
  eventTime: Long
)

/** Alert record produced by the job.
  *
  * @param userId        id of the user the alert is about
  * @param firstFailTime `eventTime` of the first failed attempt covered by the alert
  * @param lastFailTime  `eventTime` of the last failed attempt covered by the alert
  * @param warningMsg    human-readable description of the alert
  */
case class Warning(
  userId: Long,
  firstFailTime: Long,
  lastFailTime: Long,
  warningMsg: String
)

/** CEP-based login-failure detection job.
  *
  * Reads login records from a text file, turns each line into a [[LoginEvent]], assigns
  * event-time timestamps, and applies a CEP pattern per user: a "fail" event directly
  * followed (`next`) by another "fail" event, both within 2 seconds. Each match is
  * converted into a [[Warning]] by [[LoginFailDetect]] and printed.
  */
object LoginFailCEP {

  /** Builds and runs the job.
    *
    * @param args optional; when `args(0)` is present it is used as the input file path,
    *             otherwise the `LoginLog.csv` resource found on the classpath is read
    */
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Resolve the input location instead of relying on a machine-specific absolute path.
    // Going through the URI decodes escaped characters (e.g. spaces) and yields a
    // platform-correct path. This only works when the resource is a plain file on disk
    // (not packed inside a jar); in that case pass the path as args(0).
    val inputPath: String = args.headOption.getOrElse {
      val resource = getClass.getResource("/LoginLog.csv")
      require(resource != null, "LoginLog.csv was not found on the classpath")
      java.nio.file.Paths.get(resource.toURI).toString
    }

    val loginEventStream: DataStream[LoginEvent] = env.readTextFile(inputPath)
      // Skip blank lines so that a trailing newline does not break parsing.
      .filter(line => line.trim.nonEmpty)
      .map(data => {
        // Expected record layout: userId,ip,eventType,eventTime
        val dataArray = data.split(",").map(_.trim)
        LoginEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      // Watermarks are generated with a 3-second out-of-orderness bound.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) {
        // eventTime is scaled by 1000 to obtain the millisecond timestamp Flink expects.
        override def extractTimestamp(t: LoginEvent): Long = t.eventTime * 1000L
      })

    // 1. Define the pattern to match: two "fail" events in a row within 2 seconds.
    val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
      .begin[LoginEvent]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail")
      .within(Time.seconds(2))

    // 2. Apply the pattern to the stream keyed by user id to obtain a PatternStream.
    val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)

    // 3. Convert every matched event sequence into a warning.
    val loginFailStream: DataStream[Warning] = patternStream.select(new LoginFailDetect())

    // 4. Print the warnings.
    loginFailStream.print()

    env.execute("login fail job")
  }

}

/** Custom PatternSelectFunction that wraps a matched pair of consecutive login failures
  * into a [[Warning]].
  */
class LoginFailDetect extends PatternSelectFunction[LoginEvent, Warning]{

  /** Builds the warning for one match.
    *
    * @param map the matched events, keyed by the pattern stage names "firstFail" and
    *            "secondFail"
    * @return a warning carrying the user id and the event times of both failures
    */
  override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
    // Picks the first event recorded for the given pattern stage.
    def eventOf(stage: String): LoginEvent = map.get(stage).get(0)

    val opening = eventOf("firstFail")
    val closing = eventOf("secondFail")

    Warning(
      userId = opening.userId,
      firstFailTime = opening.eventTime,
      lastFailTime = closing.eventTime,
      warningMsg = "login fail"
    )
  }
}

 

 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM