需求
將訂單信息和第三方支付信息按訂單ID進行對賬:同一訂單的兩條記錄須在五分鐘(事件時間)之內都到達,對不上的發出警告。
代碼實現
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
 * Reconciles an order-payment event stream against a third-party payment event
 * stream, keyed by order ID. Pairs that meet within the timeout are reported on the
 * main output; events whose counterpart does not arrive in time are reported on the
 * side outputs [[TwoStreamJoinDemo.unmatchedOrders]] / [[TwoStreamJoinDemo.unmatchedPays]].
 *
 * @author yangxu
 * @date 2020/6/20 14:36
 * @version 1.0
 */
object TwoStreamJoinDemo {

  /** Order payment event from the order system. */
  case class OrderEvent(orderId: String, eventType: String, eventTime: Long)

  /** Payment event from a third-party payment provider, e.g. WeChat or Alipay. */
  case class PayEvent(orderId: String, eventType: String, eventTime: Long)

  /** Side output for order events that found no matching payment event. */
  val unmatchedOrders: OutputTag[String] = new OutputTag[String]("unmatched-orders")

  /** Side output for payment events that found no matching order event. */
  val unmatchedPays: OutputTag[String] = new OutputTag[String]("unmatched-pays")

  /** Default reconciliation window: five minutes, in event-time milliseconds. */
  val DefaultTimeoutMs: Long = 5 * 60 * 1000L

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val orders: KeyedStream[OrderEvent, String] = env
      .fromElements(
        OrderEvent("order_1", "pay", 2000L),
        OrderEvent("order_2", "pay", 5000L),
        OrderEvent("order_3", "pay", 6000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    val pays: KeyedStream[PayEvent, String] = env
      .fromElements(
        PayEvent("order_1", "weixin", 7000L),
        PayEvent("order_2", "weixin", 8000L),
        PayEvent("order_4", "weixin", 9000L)
      )
      .assignAscendingTimestamps(_.eventTime)
      .keyBy(_.orderId)

    val processed = orders.connect(pays).process(new MatchFunction())

    processed.print()
    processed.getSideOutput(unmatchedOrders).print()
    processed.getSideOutput(unmatchedPays).print()

    env.execute()
  }

  /**
   * Matches order events with payment events that share the same key (order ID).
   *
   * Each callback runs in the context of a single key, so each value state holds
   * at most one pending event for the current order.
   *
   * @param timeoutMs how long (event-time milliseconds) a pending event waits for its
   *                  counterpart before being reported as unmatched; defaults to five minutes
   */
  class MatchFunction(timeoutMs: Long = DefaultTimeoutMs)
      extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {

    // Order event still waiting for its payment (null in Flink state when none).
    private lazy val orderState: ValueState[OrderEvent] =
      getRuntimeContext.getState(
        new ValueStateDescriptor[OrderEvent]("orderState", Types.of[OrderEvent]))

    // Payment event still waiting for its order (null in Flink state when none).
    private lazy val payState: ValueState[PayEvent] =
      getRuntimeContext.getState(
        new ValueStateDescriptor[PayEvent]("payState", Types.of[PayEvent]))

    override def processElement1(
        value: OrderEvent,
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
        out: Collector[String]): Unit =
      Option(payState.value()) match {
        case Some(pay) =>
          // The payment arrived first: reconciled. Also delete the timer the payment
          // registered, otherwise it could fire later and prematurely report a newer
          // pending event on this key as unmatched.
          payState.clear()
          ctx.timerService().deleteEventTimeTimer(pay.eventTime + timeoutMs)
          out.collect("訂單ID為 " + pay.orderId + " 的兩條流對賬成功!")
        case None =>
          // The payment may simply not have arrived yet: park the order and wait up to
          // timeoutMs of event time; onTimer reports it if it is still pending then.
          orderState.update(value)
          ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMs)
      }

    override def processElement2(
        value: PayEvent,
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
        out: Collector[String]): Unit =
      Option(orderState.value()) match {
        case Some(order) =>
          // The order arrived first: reconciled; drop its state and its pending timer.
          orderState.clear()
          ctx.timerService().deleteEventTimeTimer(order.eventTime + timeoutMs)
          out.collect("訂單ID為 " + order.orderId + " 的兩條流對賬成功!")
        case None =>
          payState.update(value)
          ctx.timerService().registerEventTimeTimer(value.eventTime + timeoutMs)
      }

    /** Emits a warning to the matching side output for any event still pending. */
    override def onTimer(
        timestamp: Long,
        ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext,
        out: Collector[String]): Unit = {
      Option(orderState.value()).foreach { order =>
        ctx.output(unmatchedOrders, s"訂單ID為 ${order.orderId} 的兩條流沒有對賬成功!")
        orderState.clear()
      }
      Option(payState.value()).foreach { pay =>
        ctx.output(unmatchedPays, s"訂單ID為 ${pay.orderId} 的兩條流沒有對賬成功!")
        payState.clear()
      }
    }
  }
}