Spark 雙流join代碼示例


基本思想

  與flink流的join原理不同的是,Spark雙流join是對倆個流做滿外連接 ,因為網絡延遲等關系,不能保證每個窗口中的數據key都能匹配上,這樣勢必會出現三種情況:(some,some),(None,some),(Some,None),根據這三種情況,下面做一下詳細解析:

(some,some)—— 1號流和2號流中key能正常進行邏輯運算,但是考慮到2號流后續可能會有剩下的數據到來,所以需要將1號流中的key保存到redis,以等待接下來的數據

(None,Some)—— 找不到1號流中對應key的數據,需要去redis中查找1號流的緩存,如果找不到,則緩存起來,等待1號流

 (Some,None)—— 找不到2號流中的數據,需要將key保存到redis,以等待接下來的數據,並且去reids中找2號流的緩存,如果有,則join,然后刪除2號流的緩存

 

代碼示例

def fullJoin(orderInfoStream: DStream[OrderInfo], orderDetailStream: DStream[OrderDetail]) = {
        val orderIdAndOrderInfo: DStream[(String, OrderInfo)] =
            orderInfoStream.map(info => (info.id, info))
        val orderIdAndOrderDetail: DStream[(String, OrderDetail)] =
            orderDetailStream.map(info => (info.order_id, info))
        
        orderIdAndOrderInfo
            .fullOuterJoin(orderIdAndOrderDetail)
            .mapPartitions((it: Iterator[(String, (Option[OrderInfo], Option[OrderDetail]))]) => {
                // 獲取redis客戶端
                val client: Jedis = RedisUtil.getClient
                // 讀寫操作
                val result: Iterator[SaleDetail] = it.flatMap {
                    // order_info有數據, order_detail有數據
                    case (orderId, (Some(orderInfo), Some(orderDetail))) =>
                        println("Some(orderInfo)   Some(orderDetail)")
                        // 1. 把order_info信息寫入到緩存(因為order_detail信息有部分信息可能遲到)
                        cacheOrderInfo(orderInfo, client)
                        // 2. 把信息join到一起(其實就是放入一個樣例類中)  (缺少用戶信息, 后面再專門補充)
                        val saleDetail = SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                        // 3. 去order_detail的緩存找數據, 進行join
                        // 3.1 先獲取這個order_id對應的所有的order_detail的key
                        import scala.collection.JavaConversions._
                        val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 轉成scala集合
                        val saleDetails: List[SaleDetail] = keys.map(key => {
                            val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail])
                            // 刪除對應的key, 如果不刪, 有可能造成數據重復
                            client.del(key)
                            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                        })
                        saleDetail :: saleDetails
                    case (orderId, (Some(orderInfo), None)) =>
                        println("Some(orderInfo), None")
                        // 1. 把order_info信息寫入到緩存(因為order_detail信息有部分信息可能遲到)
                        cacheOrderInfo(orderInfo, client)
                        // 3. 去order_detail的緩存找數據, 進行join
                        // 3.1 先獲取這個order_id對應的所有的order_detail的key
                        import scala.collection.JavaConversions._
                        val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 轉成scala集合
                        val saleDetails: List[SaleDetail] = keys.map(key => {
                            val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail])
                            // 刪除對應的key, 如果不刪, 有可能造成數據重復
                            client.del(key)
                            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail)
                        })
                        saleDetails
                    case (orderId, (None, Some(orderDetail))) =>
                        println("None, Some(orderDetail)")
                        // 1. 去order_info的緩存中查找
                        val orderInfoJson = client.get("order_info:" + orderDetail.order_id)
                        if (orderInfoJson == null) {
                            // 3. 如果不存在, 則order_detail緩存
                            cacheOrderDetail(orderDetail, client)
                            Nil
                        } else {
                            // 2. 如果存在, 則join
                            val orderInfo = JSON.parseObject(orderInfoJson, classOf[OrderInfo])
                            SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil
                        }
                }
                
                // 關閉redis客戶端
                client.close()
                
                result
            })
        
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM