1 簡介
在電商網站中,訂單的支付作為直接與營銷收入掛鈎的一環,在業務流程中非常重要。對於訂單而言,為了正確控制業務流程,也為了增加用戶的支付意願,網站一般會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對於訂單的支付,我們還應保證用戶支付的正確性,這可以通過第三方支付平台的交易數據來做一個實時對賬。在接下來的內容中,我們將實現這兩個需求。
2 模塊創建和數據準備
同樣地,在 UserBehaviorAnalysis 下新建一個 maven module 作為子項目,命名為 OrderPayDetect(下文代碼的包名 orderpay_detect 和資源文件路徑都對應這個模塊)。在這個子模塊中,我們同樣將會用到 Flink 的 CEP 庫來實現事件流的模式匹配,所以需要在 pom 文件中引入 CEP 的相關依賴:
<!-- Flink CEP library (Scala API). The ${scala.binary.version} and ${flink.version} properties are not defined in this fragment; presumably they come from the parent UserBehaviorAnalysis pom - confirm. -->
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies>
package com.atguigu.orderpay_detect

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Input / output case classes.

/**
 * One parsed line of OrderLog.csv, in the column order `orderId,eventType,txId,eventTime`.
 * `eventTime` is multiplied by 1000 to obtain the event-time timestamp, so it is presumably
 * epoch seconds — confirm against the data file.
 */
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

/** Detection result for one order: its id and a status message. */
case class OrderResult(orderId: Long, resultMsg: String)

/**
 * Order-timeout detection with Flink CEP.
 *
 * A "create" event followed by a "pay" event of the same orderId within 15 minutes is
 * reported on the main stream ("payed"); a "create" without a "pay" in time is reported on
 * the "order timeout" side output.
 *
 * Optional argument: args(0) = path of the order log. Without it, /OrderLog.csv is read
 * from the classpath (src/main/resources) instead of a machine-specific absolute path.
 */
object OrderTimeOut {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Parse each CSV line into an OrderEvent; watermarks tolerate 3 s of out-of-order data.
    val orderEventStream: DataStream[OrderEvent] = env
      .readTextFile(args.headOption.getOrElse(classpathFile("/OrderLog.csv")))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
          override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
        })

    // 1. Pattern: a "create" event, later followed (not necessarily immediately) by a
    //    "pay" event, the whole sequence within 15 minutes.
    val orderPayPattern = Pattern
      .begin[OrderEvent]("create").where(_.eventType == "create")
      .followedBy("pay").where(_.eventType == "pay")
      .within(Time.minutes(15))

    // 2. Apply the pattern separately for every order.
    val patternStream = CEP.pattern(orderEventStream.keyBy(_.orderId), orderPayPattern)

    // 3. Side-output tag receiving the partial matches that timed out.
    val orderTimeOutOutputTag = new OutputTag[OrderResult]("order timeout")

    // 4. Complete matches -> OrderPaySelect (main stream);
    //    timed-out partial matches -> OrderTimeoutSelect (side output).
    val resultStream: DataStream[OrderResult] = patternStream
      .select(orderTimeOutOutputTag, new OrderTimeoutSelect(), new OrderPaySelect())

    // 5. Print both outputs.
    resultStream.print("payed")
    resultStream.getSideOutput(orderTimeOutOutputTag).print("timeout")

    env.execute(" order timeout detect job")
  }

  /**
   * Resolves a classpath resource to a local file path usable by `readTextFile`, failing
   * fast with a clear message (instead of a NullPointerException) when it is missing.
   * Works while the resource is a plain file (e.g. when run from the IDE); when it is packed
   * inside a jar, pass an explicit path argument instead.
   */
  private def classpathFile(name: String): String = {
    val resource = this.getClass.getResource(name)
    require(resource != null, s"$name not found on the classpath")
    // toURI + Paths decodes URL escapes such as %20 that URL.getPath would keep.
    java.nio.file.Paths.get(resource.toURI).toString
  }
}

/**
 * Converts a timed-out partial match (a "create" with no "pay" within the window) into an
 * OrderResult for the side output.
 */
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult] {
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
    // "create" is the pattern's first step, so every timed-out partial match contains it.
    val timeoutOrderId = map.get("create").get(0).orderId
    // l is the timeout timestamp passed in by CEP.
    OrderResult(timeoutOrderId, "timeout at " + l)
  }
}

/** Converts a complete create -> pay match into a "payed successfully" OrderResult. */
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult] {
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val payedOrderId = map.get("pay").get(0).orderId
    OrderResult(payedOrderId, "payed successfully")
  }
}
不使用 CEP 的實現(withoutCEP):用 KeyedProcessFunction 結合狀態和定時器,手動實現同樣的訂單超時檢測。
package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderTimeOut.getClass
import org.apache.flink.api.common.state._
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Input / output case classes.
// NOTE(review): OrderTimeOut declares the same two case classes in the same package
// (com.atguigu.orderpay_detect); if both files are compiled together, keep only one
// definition, otherwise compilation fails with "already defined".
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)
case class OrderResult(orderId: Long, resultMsg: String)

/**
 * Order-timeout detection without CEP: OrderPayMatchDetect tracks the create/pay events of
 * each order with keyed state and event-time timers.
 *
 * Optional argument: args(0) = path of the order log. Without it, /OrderLog.csv is read
 * from the classpath (src/main/resources) instead of a machine-specific absolute path.
 */
object OrderTimeoutWithoutCEP {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Parse each CSV line into an OrderEvent; watermarks tolerate 3 s of out-of-order data.
    val orderEventStream: DataStream[OrderEvent] = env
      .readTextFile(args.headOption.getOrElse(classpathFile("/OrderLog.csv")))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
          override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
        })

    // Fine-grained per-order control with a custom KeyedProcessFunction.
    val orderResultStream: DataStream[OrderResult] = orderEventStream
      .keyBy(_.orderId)
      .process(new OrderPayMatchDetect())

    // Print the detection results, not the raw input: paid orders from the main stream,
    // alerts from the "timeout" side output.
    orderResultStream.print("payed")
    orderResultStream.getSideOutput(new OutputTag[OrderResult]("timeout")).print("timeout")

    env.execute(" order timeout detect job")
  }

  /**
   * Resolves a classpath resource to a local file path usable by `readTextFile`, failing
   * fast with a clear message (instead of a NullPointerException) when it is missing.
   * Works while the resource is a plain file (e.g. when run from the IDE); when it is packed
   * inside a jar, pass an explicit path argument instead.
   */
  private def classpathFile(name: String): String = {
    val resource = this.getClass.getResource(name)
    require(resource != null, s"$name not found on the classpath")
    // toURI + Paths decodes URL escapes such as %20 that URL.getPath would keep.
    java.nio.file.Paths.get(resource.toURI).toString
  }
}

/**
 * Matches the "create" and "pay" events of one order (input keyed by orderId) by hand.
 *
 * Main stream: orders whose pay arrives before the deadline ("payed successfully").
 * Side output "timeout": orders not paid before the deadline, paid after it, or paid
 * without any "create" seen by the time the pay's own timer fires.
 *
 * @param timeoutMillis how long after "create" a "pay" is still accepted (default 15 minutes)
 */
class OrderPayMatchDetect(timeoutMillis: Long = 15 * 60 * 1000L)
    extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {

  // Per-order state: whether "pay" / "create" have been seen, and the timestamp of the
  // currently registered timer (kept so the timer can be deleted). Unset Boolean/Long state
  // returns null, which Scala unboxes to false / 0L.
  lazy val isPayedState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-payed", classOf[Boolean]))
  lazy val isCreatedState: ValueState[Boolean] =
    getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("is-created", classOf[Boolean]))
  lazy val timerTsState: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("time-ts", classOf[Long]))

  val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout")

  override def processElement(
      value: OrderEvent,
      context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context,
      collector: Collector[OrderResult]): Unit = {
    val isPayed = isPayedState.value()
    val isCreated = isCreatedState.value()
    val timerTs = timerTsState.value()

    if (value.eventType == "create") {
      if (isPayed) {
        // Case 1.1: the pay arrived earlier (out of order) -> matched.
        collector.collect(OrderResult(value.orderId, "payed successfully"))
        isPayedState.clear()
        timerTsState.clear()
        context.timerService().deleteEventTimeTimer(timerTs)
      } else {
        // Case 1.2: no pay yet -> wait up to timeoutMillis for it.
        val ts = value.eventTime * 1000L + timeoutMillis
        context.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
        isCreatedState.update(true)
      }
    } else if (value.eventType == "pay") {
      if (isCreated) {
        // Case 2.1: create already seen; the stored timer timestamp is the pay deadline.
        if (value.eventTime * 1000L < timerTs) {
          // Case 2.1.1: paid in time.
          collector.collect(OrderResult(value.orderId, "payed successfully"))
        } else {
          // Case 2.1.2: paid, but after the deadline.
          context.output(orderTimeoutOutputTag, OrderResult(value.orderId, "payed but already timeout"))
        }
        // Either way the order has produced output: clear its state and timer.
        isCreatedState.clear()
        timerTsState.clear()
        context.timerService().deleteEventTimeTimer(timerTs)
      } else {
        // Case 2.2: create not seen yet; it may still arrive out of order, so wait until the
        // watermark passes this pay's own timestamp.
        val ts = value.eventTime * 1000L
        context.timerService().registerEventTimeTimer(ts)
        timerTsState.update(ts)
        isPayedState.update(true)
      }
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext,
      out: Collector[OrderResult]): Unit = {
    if (isPayedState.value()) {
      // The pay's timer fired without a create: the create log may be missing.
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "already payed but not found created log"))
    } else {
      // The deadline timer fired without a pay: a genuine timeout.
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
    }
    isPayedState.clear()
    isCreatedState.clear()
    timerTsState.clear()
  }
}
3 來自兩條流的訂單交易匹配
對於訂單支付事件,用戶支付完成其實並不算完,我們還得確認平台賬戶上是否到賬了。而這往往來自不同的日誌信息,所以我們要同時讀入兩條流的數據來做合併處理。這裡我們利用 connect 將兩條流進行連接,然後用自定義的 CoProcessFunction 進行處理。
package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderTimeoutWithoutCEP.getClass
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Input case classes.
// NOTE(review): OrderEvent is also declared by the other listings in the same package
// (com.atguigu.orderpay_detect); if these files are compiled together, keep one definition.

/**
 * One parsed line of ReceiptLog.csv, in the column order `txId,payChannel,timestamp`.
 * `timestamp` is multiplied by 1000 below, so it is presumably epoch seconds — confirm.
 */
case class ReceiptEvent(txId: String, payChannel: String, timestamp: Long)

/** One parsed line of OrderLog.csv: `orderId,eventType,txId,eventTime`. */
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

/**
 * Real-time reconciliation of pay events (order log) against receipt events (payment
 * platform log), matched per transaction id by OrderPayTxDetect.
 *
 * Optional arguments: args(0) = order log path, args(1) = receipt log path. When absent,
 * /OrderLog.csv and /ReceiptLog.csv are read from the classpath (src/main/resources).
 */
object OrderPayTxMatch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Pay events keyed by transaction id. Only "pay" events may pass the filter: any other
    // event would be keyed by its (presumably empty) txId, never meet a receipt, and be
    // reported as an unmatched pay.
    val orderEventStream: KeyedStream[OrderEvent, String] = env
      .readTextFile(args.headOption.getOrElse(classpathFile("/OrderLog.csv")))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
          override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
        })
      .filter(_.eventType == "pay")
      .keyBy(_.txId)

    // Receipt events keyed by transaction id.
    val receiptEventStream: KeyedStream[ReceiptEvent, String] = env
      .readTextFile(args.drop(1).headOption.getOrElse(classpathFile("/ReceiptLog.csv")))
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
          override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
        })
      .keyBy(_.txId)

    // Connect the two keyed streams so a pay and a receipt with the same txId reach the same
    // key of OrderPayTxDetect and share its keyed state.
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .connect(receiptEventStream)
      .process(new OrderPayTxDetect())

    // These tag ids must equal the ones used inside OrderPayTxDetect.
    val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
    val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")
    resultStream.print("matched")
    resultStream.getSideOutput(unmatchedPays).print("unmatched-pays")
    resultStream.getSideOutput(unmatchedReceipts).print("unmatched-receipts")

    env.execute("order pay tx match job")
  }

  /**
   * Resolves a classpath resource to a local file path usable by `readTextFile`, failing
   * fast with a clear message (instead of a NullPointerException) when it is missing.
   * Works while the resource is a plain file (e.g. when run from the IDE); when it is packed
   * inside a jar, pass explicit path arguments instead.
   */
  private def classpathFile(name: String): String = {
    val resource = this.getClass.getResource(name)
    require(resource != null, s"$name not found on the classpath")
    // toURI + Paths decodes URL escapes such as %20 that URL.getPath would keep.
    java.nio.file.Paths.get(resource.toURI).toString
  }
}

/**
 * Matches each pay event with the receipt of the same transaction (both inputs keyed by txId).
 *
 * Whichever side arrives first is kept in state and an event-time timer is registered:
 * a pay waits `payWaitMillis` for its receipt, a receipt waits `receiptWaitMillis` for its pay.
 * Matched pairs go to the main stream. When a timer fires, whatever is still in state is
 * emitted to the "unmatched-pays" / "unmatched-receipts" side outputs.
 *
 * @param payWaitMillis     how long a pay waits for its receipt (default 5 s)
 * @param receiptWaitMillis how long a receipt waits for its pay (default 3 s)
 */
class OrderPayTxDetect(payWaitMillis: Long = 5000L, receiptWaitMillis: Long = 3000L)
    extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {

  // The pending pay event and receipt event of the current transaction (null when absent).
  lazy val payState: ValueState[OrderEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay", classOf[OrderEvent]))
  lazy val receiptState: ValueState[ReceiptEvent] =
    getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt", classOf[ReceiptEvent]))

  val unmatchedPays = new OutputTag[OrderEvent]("unmatched-pays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatched-receipts")

  override def processElement1(
      pay: OrderEvent,
      context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
      collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // A pay arrived: check whether its receipt is already waiting.
    val receipt = receiptState.value()
    if (receipt != null) {
      // Matched. The receipt's timer is left registered; when it fires the states are empty
      // (unless new events arrived for this txId), so onTimer emits nothing.
      collector.collect((pay, receipt))
      receiptState.clear()
    } else {
      // No receipt yet: store the pay and wait payWaitMillis for it.
      payState.update(pay)
      context.timerService().registerEventTimeTimer(pay.eventTime * 1000L + payWaitMillis)
    }
  }

  override def processElement2(
      receipt: ReceiptEvent,
      context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
      collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // A receipt arrived: check whether its pay is already waiting.
    val pay = payState.value()
    if (pay != null) {
      // Matched (the pay's timer is left registered, see processElement1).
      collector.collect((pay, receipt))
      payState.clear()
    } else {
      // No pay yet: store the receipt and wait receiptWaitMillis for it.
      receiptState.update(receipt)
      context.timerService().registerEventTimeTimer(receipt.timestamp * 1000L + receiptWaitMillis)
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext,
      out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    // A waiting period ended: anything still in state never met its counterpart in time.
    if (payState.value() != null) {
      // Pay without receipt.
      ctx.output(unmatchedPays, payState.value())
    }
    if (receiptState.value() != null) {
      // Receipt without pay.
      ctx.output(unmatchedReceipts, receiptState.value())
    }
    payState.clear()
    receiptState.clear()
  }
}
使用 Join 的實現(withJOIN):用 Interval Join 匹配兩條流中時間相近的支付事件與到賬事件。
package com.atguigu.orderpay_detect

import com.atguigu.orderpay_detect.OrderPayTxMatch.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Input case classes.
// NOTE(review): ReceiptEvent and OrderEvent are also declared by the other listings in the
// same package (com.atguigu.orderpay_detect); if compiled together, keep one definition.

/**
 * One parsed line of ReceiptLog.csv, in the column order `txId,payChannel,timestamp`.
 * `timestamp` is multiplied by 1000 below, so it is presumably epoch seconds — confirm.
 */
case class ReceiptEvent(txId: String, payChannel: String, timestamp: Long)

/** One parsed line of OrderLog.csv: `orderId,eventType,txId,eventTime`. */
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

/**
 * Pay/receipt reconciliation with an interval join: a pay event and a receipt event with the
 * same txId are paired when the receipt's timestamp lies within [pay - 3 s, pay + 5 s].
 * The join emits matched pairs only; unmatched events are not reported.
 *
 * Optional arguments: args(0) = order log path, args(1) = receipt log path. When absent,
 * /OrderLog.csv and /ReceiptLog.csv are read from the classpath (src/main/resources).
 */
object OrderPayTxMatchWithJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Pay events keyed by transaction id; only "pay" events carry a transaction to reconcile.
    val orderEventStream: KeyedStream[OrderEvent, String] = env
      .readTextFile(args.headOption.getOrElse(classpathFile("/OrderLog.csv")))
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).toLong, dataArray(1), dataArray(2), dataArray(3).toLong)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[OrderEvent](Time.seconds(3)) {
          override def extractTimestamp(t: OrderEvent): Long = t.eventTime * 1000L
        })
      .filter(_.eventType == "pay")
      .keyBy(_.txId)

    // Receipt events keyed by transaction id.
    val receiptEventStream: KeyedStream[ReceiptEvent, String] = env
      .readTextFile(args.drop(1).headOption.getOrElse(classpathFile("/ReceiptLog.csv")))
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0), dataArray(1), dataArray(2).toLong)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[ReceiptEvent](Time.seconds(3)) {
          override def extractTimestamp(t: ReceiptEvent): Long = t.timestamp * 1000L
        })
      .keyBy(_.txId)

    // Interval join: a receipt may arrive up to 3 s before or 5 s after its pay (event time).
    val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream
      .intervalJoin(receiptEventStream)
      .between(Time.seconds(-3), Time.seconds(5))
      .process(new OrderPayTxDetectWithJoin())

    resultStream.print()

    env.execute("order pay tx match with join job")
  }

  /**
   * Resolves a classpath resource to a local file path usable by `readTextFile`, failing
   * fast with a clear message (instead of a NullPointerException) when it is missing.
   * Works while the resource is a plain file (e.g. when run from the IDE); when it is packed
   * inside a jar, pass explicit path arguments instead.
   */
  private def classpathFile(name: String): String = {
    val resource = this.getClass.getResource(name)
    require(resource != null, s"$name not found on the classpath")
    // toURI + Paths decodes URL escapes such as %20 that URL.getPath would keep.
    java.nio.file.Paths.get(resource.toURI).toString
  }
}

/** Emits every (pay, receipt) pair produced by the interval join unchanged. */
class OrderPayTxDetectWithJoin()
    extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
  override def processElement(
      left: OrderEvent,
      right: ReceiptEvent,
      context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
      collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    collector.collect((left, right))
  }
}