flink(七) 電商用戶行為分析(七)訂單支付實時監控之訂單超時、訂單交易匹配


1 簡介

  在電商網站中,訂單的支付作為直接與營銷收入掛鉤的一環,在業務流程中非常重要。對於訂單而言,為了正確控制業務流程,也為了增加用戶的支付意願,網站一般會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對於訂單的支付,我們還應保證用戶支付的正確性,這可以通過第三方支付平台的交易數據來做一個實時對賬。在接下來的內容中,我們將實現這兩個需求。

2 模塊創建和數據準備

  同樣地,在 UserBehaviorAnalysis 下新建一個 maven module 作為子項目,命名為 OrderPayDetect(與下文代碼中的包名 orderpay_detect 及文件路徑保持一致)。在這個子模塊中,我們同樣將會用到 flink 的 CEP 庫來實現事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相關依賴:
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
package com.atguigu.orderpay_detect

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


// Case classes for the job's input and output records

/**
 * One record of the order log.
 *
 * `eventTime` is multiplied by 1000 elsewhere in this file to obtain the
 * event-time timestamp in milliseconds.
 */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/** Result emitted for one order: the order id plus a descriptive message. */
case class OrderResult(
  orderId: Long,
  resultMsg: String
)

/**
 * CEP-based job that separates paid orders from orders that were created but
 * not paid within 15 minutes.
 *
 * Each input line is comma-separated: `orderId,eventType,txId,eventTime`.
 * `eventTime` is multiplied by 1000 to obtain the event-time timestamp in
 * milliseconds. Matched orders are printed with the "payed" prefix, timed-out
 * orders with the "timeout" prefix.
 *
 * The order log location is resolved in this order: `args(0)` when supplied,
 * then the classpath resource `/OrderLog.csv` when it is a plain file, and
 * finally the absolute path this job used originally.
 */
object OrderTimeOut {
  private val OrderLogResource = "/OrderLog.csv"

  // Location hard-coded by the original version of this job; kept only as a last-resort fallback.
  private val LegacyOrderLogPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the order log and convert every line into an OrderEvent
    val orderEventStream: DataStream[OrderEvent] = env
      .readTextFile(resolveOrderLogPath(args))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })

    // 1. Pattern to match: a "create" event followed (not necessarily directly) by a "pay" event within 15 minutes
    val orderPayPattern = Pattern
      .begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 2. Apply the pattern to the stream keyed by orderId
    val patternStream = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern)

    // 3. Side-output tag that carries the timed-out orders
    val orderTimeOutOutputTag = new OutputTag[OrderResult]("order timeout")

    // 4. select() turns complete matches and timed-out partial matches into OrderResult records
    val resultStream: DataStream[OrderResult] = patternStream
      .select(orderTimeOutOutputTag, new OrderTimeoutSelect(), new OrderPaySelect())

    // 5. Print both outputs
    resultStream.print("payed")
    resultStream.getSideOutput(orderTimeOutOutputTag).print("timeout")

    env.execute(" order timeout detect job")
  }

  /**
   * Chooses the order log to read.
   *
   * @param args command-line arguments; a non-empty `args(0)` overrides every other location
   * @return the first available of: `args(0)`, the file behind the classpath
   *         resource `/OrderLog.csv`, the legacy absolute path
   */
  private def resolveOrderLogPath(args: Array[String]): String =
    args.headOption
      .filter(_.nonEmpty)
      .orElse(
        Option(getClass.getResource(OrderLogResource))
          // Only file: URLs can be turned into a filesystem path (not resources packed inside a jar).
          .filter(_.getProtocol == "file")
          // Going through the URI decodes escaped characters such as %20.
          .map(url => java.nio.file.Paths.get(url.toURI).toString)
      )
      .getOrElse(LegacyOrderLogPath)
}

// Custom handler for partial matches that timed out
/**
 * Builds the [[OrderResult]] for a pattern that timed out before completing.
 *
 * The order id is taken from the first event stored under the "create" key of
 * the partial match; `l` (the timestamp passed in by the CEP library) is
 * appended to the message.
 */
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
    val createEvents = map.get("create")
    val firstCreate = createEvents.iterator().next()
    OrderResult(firstCreate.orderId, s"timeout at$l")
  }
}

// Custom handler for complete matches
/**
 * Builds the [[OrderResult]] for a fully matched pattern, using the order id
 * of the first event stored under the "pay" key.
 */
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val payEvents = map.get("pay")
    val firstPay = payEvents.get(0)
    OrderResult(firstPay.orderId, "payed successfully")
  }

}

withoutCEP

package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderTimeOut.getClass
import org.apache.flink.api.common.state._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector


// Case classes for the job's input and output records

/**
 * One record of the order log.
 *
 * `eventTime` is multiplied by 1000 elsewhere in this file to obtain the
 * event-time timestamp in milliseconds.
 */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/** Result emitted for one order: the order id plus a descriptive message. */
case class OrderResult(
  orderId: Long,
  resultMsg: String
)

/**
 * Order-timeout detection implemented with a keyed process function instead of CEP.
 *
 * Each input line is comma-separated: `orderId,eventType,txId,eventTime`.
 * `eventTime` is multiplied by 1000 to obtain the event-time timestamp in
 * milliseconds. The main output of [[OrderPayMatchDetect]] is printed with the
 * "payed" prefix and its "timeout" side output with the "timeout" prefix.
 *
 * The order log location is resolved in this order: `args(0)` when supplied,
 * then the classpath resource `/OrderLog.csv` when it is a plain file, and
 * finally the absolute path this job used originally.
 */
object OrderTimeoutWithoutCEP {
  private val OrderLogResource = "/OrderLog.csv"

  // Location hard-coded by the original version of this job; kept only as a last-resort fallback.
  private val LegacyOrderLogPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the order log and convert every line into an OrderEvent
    val orderEventStream: DataStream[OrderEvent] = env
      .readTextFile(resolveOrderLogPath(args))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })

    // Custom process function gives fine-grained control over the matching flow
    val orderResultStream: DataStream[OrderResult] = orderEventStream
      .keyBy(_.orderId)
      .process(new OrderPayMatchDetect())

    // Print the results. The previous version printed the raw input stream here,
    // so the successfully paid orders emitted by the process function were never shown.
    orderResultStream.print("payed")
    orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("timeout")

    env.execute(" order timeout detect job")
  }

  /**
   * Chooses the order log to read.
   *
   * @param args command-line arguments; a non-empty `args(0)` overrides every other location
   * @return the first available of: `args(0)`, the file behind the classpath
   *         resource `/OrderLog.csv`, the legacy absolute path
   */
  private def resolveOrderLogPath(args: Array[String]): String =
    args.headOption
      .filter(_.nonEmpty)
      .orElse(
        Option(getClass.getResource(OrderLogResource))
          // Only file: URLs can be turned into a filesystem path (not resources packed inside a jar).
          .filter(_.getProtocol == "file")
          // Going through the URI decodes escaped characters such as %20.
          .map(url => java.nio.file.Paths.get(url.toURI).toString)
      )
      .getOrElse(LegacyOrderLogPath)
}

// Custom KeyedProcessFunction: successfully paid orders go to the main output,
// timeout / anomaly alerts go to the "timeout" side output.

/**
 * Matches "create" and "pay" events of one order (the stream is keyed by order id).
 *
 * Per key it keeps two flags (whether a create / a pay has been seen) and the
 * timestamp of the timer registered for that key.
 */
class OrderPayMatchDetect() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]{
  // Keyed state: flags recording whether pay / create events have arrived, plus the registered timer timestamp
  lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean]))
  lazy val isCreatedState: ValueState[Boolean] =  getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean]))
  lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long]("time-ts", classOf[Long]))

  val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout")

  /**
   * Handles one order event.
   *
   * @param value     the incoming event; only the event types "create" and "pay" are acted upon
   * @param context   gives access to the timer service and the side output
   * @param collector main output, receives "payed successfully" results
   */
  override def processElement(value: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {
    // Snapshot the state for this key before deciding what to do
    val payedBefore = isPayedState.value()
    val createdBefore = isCreatedState.value()
    val pendingTimerTs = timerTsState.value()
    val eventMillis = value.eventTime * 1000L

    value.eventType match {
      // create arrives after its pay: the order is matched
      case "create" if payedBefore =>
        collector.collect(OrderResult(value.orderId, "payed successfully"))
        isPayedState.clear()
        timerTsState.clear()
        context.timerService().deleteEventTimeTimer(pendingTimerTs)

      // create arrives first: register a timer 15 minutes later and wait for the pay
      case "create" =>
        val deadline = eventMillis + 15 * 60 * 1000L
        context.timerService().registerEventTimeTimer(deadline)
        timerTsState.update(deadline)
        isCreatedState.update(true)

      // pay arrives after its create: check whether it came before the registered deadline
      case "pay" if createdBefore =>
        if (eventMillis < pendingTimerTs) {
          // before the deadline: normal result on the main output
          collector.collect(OrderResult(value.orderId, "payed successfully"))
        } else {
          // at or after the deadline: alert on the side output
          context.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
        }
        // a result was produced either way, so reset the state for this key
        isCreatedState.clear()
        timerTsState.clear()
        context.timerService().deleteEventTimeTimer(pendingTimerTs)

      // pay arrives without a create: register a timer at the pay timestamp and
      // wait for an out-of-order create
      case "pay" =>
        context.timerService().registerEventTimeTimer(eventMillis)
        timerTsState.update(eventMillis)
        isPayedState.update(true)

      // any other event type is ignored
      case _ => ()
    }
  }

  /**
   * Fired when a timer registered in `processElement` expires; emits an alert
   * on the side output and clears all state for the key.
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
    // The pay flag distinguishes the two situations: a pay whose create never
    // showed up, or a create that was not paid before its deadline.
    val alertMessage =
      if (isPayedState.value()) "already payed but not found created log"
      else "order timeout"
    ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, alertMessage))

    // Reset the state for this key
    isPayedState.clear()
    isCreatedState.clear()
    timerTsState.clear()
  }

}

3 來自兩條流的訂單交易匹配

  對於訂單支付事件,用戶支付完成其實並不算完,我們還得確認平台賬戶上是否到賬了。而往往這會來自不同的日志信息,所以我們要同時讀入兩條流的數據來做合併處理。這裡我們利用 connect 將兩條流進行連接,然後用自定義的 CoProcessFunction 進行處理。
package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderTimeoutWithoutCEP.getClass
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Case classes for the job's input records

/**
 * One record of the receipt log.
 *
 * `timestamp` is multiplied by 1000 elsewhere in this file to obtain the
 * event-time timestamp in milliseconds.
 */
case class ReceiptEvent(
  txId: String,
  payChannel: String,
  timestamp: Long
)

/**
 * One record of the order log.
 *
 * `eventTime` is multiplied by 1000 elsewhere in this file to obtain the
 * event-time timestamp in milliseconds.
 */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/**
 * Reconciles pay events from the order log with receipt events from the
 * receipt log by connecting the two streams keyed on `txId`.
 *
 * Matched pairs are printed with the "matched" prefix; the side outputs of
 * [[OrderPayTxDetect]] are printed with the "unmatched-pays" and
 * "unmatched-receipts" prefixes.
 *
 * Input locations: `args(0)` (order log) and `args(1)` (receipt log) when
 * supplied, otherwise the classpath resources `/OrderLog.csv` and
 * `/ReceiptLog.csv` when they are plain files, otherwise the absolute paths
 * this job used originally.
 */
object OrderPayTxMatch {
  // Locations hard-coded by the original version of this job; kept only as last-resort fallbacks.
  private val LegacyOrderLogPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv"
  private val LegacyReceiptLogPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\ReceiptLog.csv"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Order log -> OrderEvent, keyed by transaction id
    val orderEventStream: DataStream[OrderEvent] = env
      .readTextFile(resolveInputPath(args.lift(0), "/OrderLog.csv", LegacyOrderLogPath))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })
      // Keep only pay events that carry a transaction id. The previous predicate
      // (eventType != "") let every event through, so events with an empty txId
      // were all keyed together under "" and entered the matching logic.
      .filter(event => event.eventType == "pay" && event.txId != "")
      .keyBy(_.txId)

    // Receipt log -> ReceiptEvent, keyed by transaction id
    val receiptEventStream: DataStream[ReceiptEvent] = env
      .readTextFile(resolveInputPath(args.lift(1), "/ReceiptLog.csv", LegacyReceiptLogPath))
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
        override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
      })
      .keyBy(_.txId)

    // Connect both keyed streams and match the events
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .connect(receiptEventStream)
      .process(new OrderPayTxDetect())

    val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
    val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

    resultStream.print("matched")
    resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")
    resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")
    env.execute("order pay tx match job")
  }

  /**
   * Chooses the location of one input file.
   *
   * @param cliArg       path given on the command line, if any; wins when non-empty
   * @param resourceName classpath resource to try next (used only when it is a plain file)
   * @param legacyPath   path returned when neither of the above is available
   */
  private def resolveInputPath(cliArg: Option[String], resourceName: String, legacyPath: String): String =
    cliArg
      .filter(_.nonEmpty)
      .orElse(
        Option(getClass.getResource(resourceName))
          // Only file: URLs can be turned into a filesystem path (not resources packed inside a jar).
          .filter(_.getProtocol == "file")
          // Going through the URI decodes escaped characters such as %20.
          .map(url => java.nio.file.Paths.get(url.toURI).toString)
      )
      .getOrElse(legacyPath)
}

// Custom CoProcessFunction that cross-checks the events of the two streams
/**
 * Pairs a pay event with the receipt event of the same transaction.
 *
 * Whichever event arrives first is stored in state and a timer is registered;
 * when the counterpart arrives the pair is emitted. When a timer fires, any
 * event still held in state is sent to the matching side output.
 */
class OrderPayTxDetect() extends CoProcessFunction[OrderEvent, ReceiptEvent,(OrderEvent, ReceiptEvent)]{
  // Two value states hold the pay event and the receipt event of the current transaction
  lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
  lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

  val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")


  /** Handles a pay event: emit the pair if the receipt is already stored, otherwise store the pay and wait. */
  override def processElement1(pay: OrderEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(receiptState.value()) match {
      case Some(storedReceipt) =>
        // The receipt arrived earlier: emit the match and drop the stored receipt
        collector.collect((pay, storedReceipt))
        receiptState.clear()

      case None =>
        // No receipt yet: remember the pay and register a timer 5 seconds after its timestamp
        payState.update(pay)
        context.timerService().registerEventTimeTimer(pay.eventTime * 1000L + 5000L)
    }
  }

  /** Handles a receipt event: emit the pair if the pay is already stored, otherwise store the receipt and wait. */
  override def processElement2(receipt: ReceiptEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(payState.value()) match {
      case Some(storedPay) =>
        // The pay arrived earlier: emit the match and drop the stored pay
        collector.collect((storedPay, receipt))
        payState.clear()

      case None =>
        // No pay yet: remember the receipt and register a timer 3 seconds after its timestamp
        receiptState.update(receipt)
        context.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + 3000L)
    }
  }

  /** Timer callback: report whichever event is still waiting for its counterpart, then clear both states. */
  override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // A pay still in state means its receipt never arrived
    Option(payState.value()).foreach(pendingPay => ctx.output(unmatchedPays, pendingPay))
    // A receipt still in state means its pay never arrived
    Option(receiptState.value()).foreach(pendingReceipt => ctx.output(unmatchedReceipts, pendingReceipt))

    payState.clear()
    receiptState.clear()
  }
}

withJOIN

package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderPayTxMatch.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Case classes for the job's input records

/**
 * One record of the receipt log.
 *
 * `timestamp` is multiplied by 1000 elsewhere in this file to obtain the
 * event-time timestamp in milliseconds.
 */
case class ReceiptEvent(
  txId: String,
  payChannel: String,
  timestamp: Long
)

/**
 * One record of the order log.
 *
 * `eventTime` is multiplied by 1000 elsewhere in this file to obtain the
 * event-time timestamp in milliseconds.
 */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  eventTime: Long
)

/**
 * Reconciles pay events with receipt events using an interval join on `txId`.
 *
 * A pay is joined with every receipt of the same transaction whose timestamp
 * lies between 3 seconds before and 5 seconds after the pay's timestamp; the
 * joined pairs are printed.
 *
 * Input locations: `args(0)` (order log) and `args(1)` (receipt log) when
 * supplied, otherwise the classpath resources `/OrderLog.csv` and
 * `/ReceiptLog.csv` when they are plain files, otherwise the absolute paths
 * this job used originally.
 */
object OrderPayTxMatchWithJoin {
  // Locations hard-coded by the original version of this job; kept only as last-resort fallbacks.
  private val LegacyOrderLogPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\OrderLog.csv"
  private val LegacyReceiptLogPath =
    "C:\\Users\\DELL\\IdeaProjects\\UserBehaviorAnalysis\\OrderPayDetect\\src\\main\\resources\\ReceiptLog.csv"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Order log -> OrderEvent, keyed by transaction id
    val orderEventStream: KeyedStream[OrderEvent, String] = env
      .readTextFile(resolveInputPath(args.lift(0), "/OrderLog.csv", LegacyOrderLogPath))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
        override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
      })
      // Keep only pay events that carry a transaction id. The previous predicate
      // (eventType != "") let every event through, so events with an empty txId
      // were all keyed together under "" and took part in the join.
      .filter(event => event.eventType == "pay" && event.txId != "")
      .keyBy(_.txId)

    // Receipt log -> ReceiptEvent, keyed by transaction id
    val receiptEventStream: KeyedStream[ReceiptEvent, String] = env
      .readTextFile(resolveInputPath(args.lift(1), "/ReceiptLog.csv", LegacyReceiptLogPath))
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
      })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
        override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
      })
      .keyBy(_.txId)

    // Interval join of the two keyed streams
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .intervalJoin(receiptEventStream)
      .between(Time.seconds(-3), Time.seconds(5))
      .process(new OrderPayTxDetectWithJoin())

    resultStream.print()
    env.execute("order pay tx match with join job")
  }

  /**
   * Chooses the location of one input file.
   *
   * @param cliArg       path given on the command line, if any; wins when non-empty
   * @param resourceName classpath resource to try next (used only when it is a plain file)
   * @param legacyPath   path returned when neither of the above is available
   */
  private def resolveInputPath(cliArg: Option[String], resourceName: String, legacyPath: String): String =
    cliArg
      .filter(_.nonEmpty)
      .orElse(
        Option(getClass.getResource(resourceName))
          // Only file: URLs can be turned into a filesystem path (not resources packed inside a jar).
          .filter(_.getProtocol == "file")
          // Going through the URI decodes escaped characters such as %20.
          .map(url => java.nio.file.Paths.get(url.toURI).toString)
      )
      .getOrElse(legacyPath)

}

// Custom ProcessJoinFunction
/** Emits every joined (pay, receipt) pair unchanged as a tuple. */
class OrderPayTxDetectWithJoin() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]{
  override def processElement(left: OrderEvent, right: ReceiptEvent, context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    val matchedPair = (left, right)
    collector.collect(matchedPair)
  }
}

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM