Flink 實現 雙流Join


需求

  將五分鐘之內的訂單信息和支付信息進行對賬,對不上的發出警告(下方示例代碼為了便於演示,定時器實際使用的超時時間是 5 秒,即 eventTime + 5000 毫秒)

代碼實現

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector


/**
 * Two-stream join demo: reconciles order-side payment events with third-party
 * payment events that carry the same order id.
 *
 * Both streams are keyed by `orderId` and connected. For each key, the first
 * event to arrive is buffered in keyed state and an event-time timer is
 * registered. If the counterpart arrives first, a "matched" message is emitted
 * on the main output; if the timer fires first, a warning is emitted on a side
 * output instead.
 *
 * @author yangxu
 * @date 2020/6/20 14:36
 * @version 1.0
 */
object TwoStreamJoinDemo {

    // Payment event as reported by the order system.
    case class OrderEvent(orderId: String,
                          eventType: String,
                          eventTime: Long)

    // Payment event as reported by a third-party provider (e.g. WeChat Pay, Alipay).
    case class PayEvent(orderId: String,
                        eventType: String,
                        eventTime: Long)

    // Side output for order events whose third-party payment never showed up.
    val unmatchedOrders = new OutputTag[String]("unmatched-orders")
    // Side output for third-party payment events whose order never showed up.
    val unmatchedPays = new OutputTag[String]("unmatched-pays")

    // Offset added to an event's timestamp to obtain its timer deadline.
    // NOTE(review): the stated requirement talks about a five-minute window, but
    // the demo has always used 5000 here (5 seconds, assuming `eventTime` is in
    // milliseconds). The value is kept as-is; raise it for a real 5-minute window.
    private val MatchTimeoutMs: Long = 5000L

    /**
     * Builds the demo pipeline from two in-memory streams, connects them by
     * order id, and prints the main output plus both side outputs.
     *
     * @param args command-line arguments (unused)
     */
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        val orders: KeyedStream[OrderEvent, String] = env
            .fromElements(
                OrderEvent("order_1", "pay", 2000L),
                OrderEvent("order_2", "pay", 5000L),
                OrderEvent("order_3", "pay", 6000L)
            )
            .assignAscendingTimestamps(_.eventTime)
            .keyBy(_.orderId)

        val pays: KeyedStream[PayEvent, String] = env
            .fromElements(
                PayEvent("order_1", "weixin", 7000L),
                PayEvent("order_2", "weixin", 8000L),
                PayEvent("order_4", "weixin", 9000L)
            )
            .assignAscendingTimestamps(_.eventTime)
            .keyBy(_.orderId)

        val processed = orders.connect(pays).process(new MatchFunction)

        // Main output: successfully reconciled orders.
        processed.print()

        // Side outputs: warnings for events that were never matched.
        processed.getSideOutput(unmatchedOrders).print()

        processed.getSideOutput(unmatchedPays).print()

        env.execute()
    }

    /**
     * Matches an [[OrderEvent]] with the [[PayEvent]] of the same order id.
     *
     * Both inputs are keyed by order id, so every invocation and both state
     * handles below are scoped to a single order id.
     */
    class MatchFunction extends KeyedCoProcessFunction[String, OrderEvent, PayEvent, String] {
        // Order event waiting for its third-party payment (null when none is waiting).
        private lazy val orderState: ValueState[OrderEvent] = getRuntimeContext.getState(
            new ValueStateDescriptor[OrderEvent]("orderState", Types.of[OrderEvent]))
        // Third-party payment waiting for its order event (null when none is waiting).
        private lazy val payState: ValueState[PayEvent] = getRuntimeContext.getState(
            new ValueStateDescriptor[PayEvent]("payState", Types.of[PayEvent]))

        /**
         * Handles an order event: emits a match if a payment is already buffered
         * for this key, otherwise buffers the order and starts its timeout timer.
         *
         * @param value the incoming order event
         * @param ctx   context giving access to the timer service
         * @param out   collector for "matched" messages
         */
        override def processElement1(value: OrderEvent,
                                     ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                     out: Collector[String]): Unit =
            Option(payState.value()) match {
                case Some(pay) =>
                    payState.clear()
                    // Cancel the timer that was registered when the payment was buffered.
                    // onTimer reports whatever is in state without checking the timestamp,
                    // so a leftover timer could flag a later event for this key too early.
                    ctx.timerService().deleteEventTimeTimer(pay.eventTime + MatchTimeoutMs)
                    out.collect("訂單ID為 " + pay.orderId + " 的兩條流對賬成功!")
                case None =>
                    // The payment has not arrived yet: remember the order and give the
                    // payment until eventTime + MatchTimeoutMs to show up.
                    orderState.update(value)
                    ctx.timerService().registerEventTimeTimer(value.eventTime + MatchTimeoutMs)
            }

        /**
         * Handles a third-party payment event; the mirror image of
         * [[processElement1]].
         *
         * @param value the incoming payment event
         * @param ctx   context giving access to the timer service
         * @param out   collector for "matched" messages
         */
        override def processElement2(value: PayEvent,
                                     ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#Context,
                                     out: Collector[String]): Unit =
            Option(orderState.value()) match {
                case Some(order) =>
                    orderState.clear()
                    // Cancel the timer registered when the order was buffered (see processElement1).
                    ctx.timerService().deleteEventTimeTimer(order.eventTime + MatchTimeoutMs)
                    out.collect("訂單ID為 " + order.orderId + " 的兩條流對賬成功!")
                case None =>
                    payState.update(value)
                    ctx.timerService().registerEventTimeTimer(value.eventTime + MatchTimeoutMs)
            }

        /**
         * Fires when a buffered event's timeout expires: whatever is still
         * buffered for this key is reported on the matching side output and
         * then removed from state.
         *
         * @param timestamp the timestamp the timer was registered for
         * @param ctx       context used to emit to the side outputs
         * @param out       collector for the main output (unused here)
         */
        override def onTimer(timestamp: Long,
                             ctx: KeyedCoProcessFunction[String, OrderEvent, PayEvent, String]#OnTimerContext,
                             out: Collector[String]): Unit = {
            Option(orderState.value()).foreach { order =>
                // Send the warning to the side output rather than the main stream.
                ctx.output(unmatchedOrders, s"訂單ID為 ${order.orderId} 的兩條流沒有對賬成功!")
                orderState.clear()
            }
            Option(payState.value()).foreach { pay =>
                ctx.output(unmatchedPays, s"訂單ID為 ${pay.orderId} 的兩條流沒有對賬成功!")
                payState.clear()
            }
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM