Flink 實現訂單支付實時監控


需求

  對訂單信息流進行監控,15分鍾之內沒有支付的發出警告

Flink CEP 實現

import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.Map

object OrderTimeout {
  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream: KeyedStream[OrderEvent, String] = env
      .fromElements(
        OrderEvent("order_1", "create", 2000L),
        OrderEvent("order_2", "create", 3000L),
        OrderEvent("order_2", "pay", 4000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    //定義匹配模板
    val pattern: Pattern[OrderEvent, OrderEvent] = Pattern
      .begin[OrderEvent]("create").where(_.eventType.equals("create"))
      .next("pay").where(_.eventType.equals("pay"))//嚴格近鄰
      .within(Time.minutes(15))//15分鍾之內


    //將流和匹配模板輸入,得到匹配后的流
    val patternedStream: PatternStream[OrderEvent] = CEP.pattern(stream, pattern)

    // 用來輸出超時訂單的側輸出標簽
    val orderTimeoutOutput: OutputTag[String] = new OutputTag[String]("timeout")

    // 用來處理超時訂單的函數
    val timeoutFunc: (Map[String, Iterable[OrderEvent]], Long, Collector[String]) => Unit = (map: Map[String, Iterable[OrderEvent]], ts: Long, out: Collector[String]) => {
      println("ts" + ts) // 2s + 5s  5s的最大延遲時間

      //這個名字是之前在定義模式時每個個體模式取得名字
      val orderStart: OrderEvent = map("create").head  // 等價於map.getOrElse("create", null).iterator.next()
      // 將報警信息發送到側輸出流去
      out.collect(orderStart.orderId + "沒有支付!")
    }
    //處理沒有超時訂單的函數
    //map是Scala的map,注意導包的准確!!!
    val selectFunc: (Map[String, Iterable[OrderEvent]], Collector[String]) => Unit = (map: Map[String, Iterable[OrderEvent]], out: Collector[String]) => {
      val order: OrderEvent = map("pay").head
      out.collect(order.orderId + "已經支付!")
    }


    val outputStream = patternedStream
      //柯里化,傳入三個參數
      // 第一個參數:用來輸出超時事件的側輸出標簽
      // 第二個參數:用來輸出超時事件的函數
      // 第三個參數:用來輸出沒有超時的事件的函數
      .flatSelect(outputTag = orderTimeoutOutput)(patternFlatTimeoutFunction = timeoutFunc)(patternFlatSelectFunction = selectFunc)

    outputStream.print()
    outputStream.getSideOutput(new OutputTag[String]("timeout")).print()

    env.execute()
  }
}

Flink 底層API實現

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object OrderTimeoutWithoutCep {
  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.setParallelism(1)
    println("111")

    val stream: DataStream[String] = env
      .fromElements(
        OrderEvent("order_1", "create", 2000L),
        OrderEvent("order_2", "create", 3000L),
        OrderEvent("order_2", "pay", 4000L),
        OrderEvent("order_1", "pay", 10000L)
      )
      .setParallelism(1)
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)
      .process(new OrderTimeoutFunc)

    val timeoutOutput = new OutputTag[String]("timeout")
    stream.getSideOutput(timeoutOutput).print()
    stream.print()
    env.execute()
  }

  class OrderTimeoutFunc extends KeyedProcessFunction[String, OrderEvent, String] {
    lazy val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("saved order", classOf[OrderEvent])
    )

    override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[String, OrderEvent, String]#Context, out: Collector[String]): Unit = {
      if (value.eventType.equals("create")) {  // 到來的事件是下訂單事件
        if (orderState.value() == null) { // 要判空,因為pay事件可能先到
          orderState.update(value) // 將create事件存到狀態變量
          ctx.timerService().registerEventTimeTimer(value.eventTime + 5000L)
        }
      } else {
        orderState.update(value) // 將pay事件保存到狀態變量
        out.collect("已經支付的訂單ID是:" + value.orderId)
      }
    }

    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, OrderEvent, String]#OnTimerContext, out: Collector[String]): Unit = {
      val order: OrderEvent = orderState.value()
      //如果狀態仍然為create則報警
      if (order != null && order.eventType.equals("create")) {
        ctx.output(new OutputTag[String]("timeout"), "超時訂單的ID為:" + order.orderId)
      }
      orderState.clear()
    }
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM