需求
對訂單信息流進行監控,對 15 分鐘之內沒有支付的訂單發出警告
Flink CEP 實現
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.Map

/**
 * Order-payment monitor built on Flink CEP.
 *
 * Matches a "create" event immediately followed by a "pay" event for the same
 * order within 15 minutes. Completed matches are emitted on the main output;
 * partial matches that exceed the window are emitted on the "timeout" side output.
 */
object OrderTimeout {

  /** A single order event; `eventTime` is used as the event-time timestamp (milliseconds). */
  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  /** Builds and runs the demo job on a small in-memory stream. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Key by order id so that the pattern is evaluated independently per order.
    val stream: KeyedStream[OrderEvent, String] = env
      .fromElements(
        OrderEvent("order_1", "create", 2000L),
        OrderEvent("order_2", "create", 3000L),
        OrderEvent("order_2", "pay", 4000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    // Pattern definition: "create" then "pay", strict contiguity, within 15 minutes.
    // NOTE(review): with strict contiguity (`next`) any other event of the same order
    // arriving between create and pay is expected to drop the partial match instead of
    // timing it out -- confirm this is intended; `followedBy` is the relaxed variant.
    val pattern: Pattern[OrderEvent, OrderEvent] = Pattern
      .begin[OrderEvent]("create").where(_.eventType.equals("create"))
      .next("pay").where(_.eventType.equals("pay"))
      .within(Time.minutes(15))

    // Apply the pattern to the keyed stream.
    val patternedStream: PatternStream[OrderEvent] = CEP.pattern(stream, pattern)

    // Side-output tag for timed-out orders; declared once and reused below.
    val orderTimeoutOutput: OutputTag[String] = new OutputTag[String]("timeout")

    // Handles timed-out partial matches. The map keys are the names given to the
    // individual pattern stages above ("create", "pay").
    val timeoutFunc: (Map[String, Iterable[OrderEvent]], Long, Collector[String]) => Unit =
      (matched, ts, out) => {
        // ts is the timestamp handed to the timeout callback by CEP.
        println("ts" + ts)
        // Safe lookup: emit nothing rather than throw if the stage is unexpectedly missing/empty.
        matched.get("create").flatMap(_.headOption).foreach { created =>
          out.collect(created.orderId + "沒有支付!")
        }
      }

    // Handles completed matches (orders that were paid in time).
    // The map type is scala.collection.Map (imported above) -- make sure the right Map is imported.
    val selectFunc: (Map[String, Iterable[OrderEvent]], Collector[String]) => Unit =
      (matched, out) => {
        matched.get("pay").flatMap(_.headOption).foreach { paid =>
          out.collect(paid.orderId + "已經支付!")
        }
      }

    // Curried call with three parameter lists:
    //   1. the side-output tag for timed-out matches
    //   2. the function handling timed-out matches
    //   3. the function handling completed matches
    val outputStream = patternedStream
      .flatSelect(outputTag = orderTimeoutOutput)(patternFlatTimeoutFunction = timeoutFunc)(patternFlatSelectFunction = selectFunc)

    outputStream.print()
    // Reuse the tag declared above instead of constructing a second, equal-by-id tag.
    outputStream.getSideOutput(orderTimeoutOutput).print()

    env.execute()
  }
}
Flink 底層API實現
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * Order-payment monitor built on the low-level `KeyedProcessFunction` API.
 *
 * Paid orders are reported on the main output; orders whose "create" event is
 * still the saved state when their timer fires are reported on the "timeout"
 * side output.
 */
object OrderTimeoutWithoutCep {

  /** A single order event; `eventTime` is used as the event-time timestamp (milliseconds). */
  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  /** Side-output tag for timed-out orders, shared by the job graph and the process function. */
  private val TimeoutOutput: OutputTag[String] = new OutputTag[String]("timeout")

  /** Builds and runs the demo job on a small in-memory stream. */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream: DataStream[String] = env
      .fromElements(
        OrderEvent("order_1", "create", 2000L),
        OrderEvent("order_2", "create", 3000L),
        OrderEvent("order_2", "pay", 4000L),
        OrderEvent("order_1", "pay", 10000L)
      )
      .setParallelism(1)
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)
      .process(new OrderTimeoutFunc())

    stream.getSideOutput(TimeoutOutput).print()
    stream.print()

    env.execute()
  }

  /**
   * Tracks, per order id, the last relevant event and raises a timeout alert when an
   * order is still in the "create" state at the time its timer fires.
   *
   * @param timeoutMillis event-time delay between the first event seen for an order and
   *                      the timer that checks (and clears) its state. Defaults to the
   *                      5000 ms used by this demo; the stated requirement of 15 minutes
   *                      corresponds to `15 * 60 * 1000L`.
   */
  class OrderTimeoutFunc(timeoutMillis: Long = 5000L)
      extends KeyedProcessFunction[String, OrderEvent, String] {

    // Keyed state holding the most recent create/pay event of the current order.
    lazy val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(
      new ValueStateDescriptor[OrderEvent]("saved order", classOf[OrderEvent])
    )

    /**
     * Stores the incoming event and registers the timer.
     *
     * A "create" event is stored only when no state exists yet, because the pay event
     * may have arrived first. Any other event is treated as a payment: it overwrites
     * the state and is reported on the main output.
     */
    override def processElement(
        value: OrderEvent,
        ctx: KeyedProcessFunction[String, OrderEvent, String]#Context,
        out: Collector[String]): Unit = {
      val nothingSaved = Option(orderState.value()).isEmpty

      if (value.eventType.equals("create")) {
        if (nothingSaved) {
          orderState.update(value)
          ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMillis)
        }
      } else {
        if (nothingSaved) {
          // Pay arrived without a saved create: no timer exists for this key yet, so
          // register one purely to clear the state later; otherwise it would be kept forever.
          ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMillis)
        }
        orderState.update(value)
        out.collect("已經支付的訂單ID是:" + value.orderId)
      }
    }

    /**
     * Emits a timeout alert to the side output if the saved event is still "create",
     * then clears the state for this order in every case.
     */
    override def onTimer(
        timestamp: Long,
        ctx: KeyedProcessFunction[String, OrderEvent, String]#OnTimerContext,
        out: Collector[String]): Unit = {
      Option(orderState.value())
        .filter(_.eventType.equals("create"))
        .foreach(order => ctx.output(TimeoutOutput, "超時訂單的ID為:" + order.orderId))
      orderState.clear()
    }
  }
}