基本思想
與flink流的join原理不同的是,Spark雙流join是對倆個流做滿外連接 ,因為網絡延遲等關系,不能保證每個窗口中的數據key都能匹配上,這樣勢必會出現三種情況:(some,some),(None,some),(Some,None),根據這三種情況,下面做一下詳細解析:
(some,some)—— 1號流和2號流中key能正常進行邏輯運算,但是考慮到2號流后續可能會有剩下的數據到來,所以需要將1號流中的key保存到redis,以等待接下來的數據
(None,Some)—— 找不到1號流中對應key的數據,需要去redis中查找1號流的緩存,如果找不到,則緩存起來,等待1號流
(Some,None)—— 找不到2號流中的數據,需要將key保存到redis,以等待接下來的數據,並且去reids中找2號流的緩存,如果有,則join,然后刪除2號流的緩存
代碼示例
def fullJoin(orderInfoStream: DStream[OrderInfo], orderDetailStream: DStream[OrderDetail]) = { val orderIdAndOrderInfo: DStream[(String, OrderInfo)] = orderInfoStream.map(info => (info.id, info)) val orderIdAndOrderDetail: DStream[(String, OrderDetail)] = orderDetailStream.map(info => (info.order_id, info)) orderIdAndOrderInfo .fullOuterJoin(orderIdAndOrderDetail) .mapPartitions((it: Iterator[(String, (Option[OrderInfo], Option[OrderDetail]))]) => { // 獲取redis客戶端 val client: Jedis = RedisUtil.getClient // 讀寫操作 val result: Iterator[SaleDetail] = it.flatMap { // order_info有數據, order_detail有數據 case (orderId, (Some(orderInfo), Some(orderDetail))) => println("Some(orderInfo) Some(orderDetail)") // 1. 把order_info信息寫入到緩存(因為order_detail信息有部分信息可能遲到) cacheOrderInfo(orderInfo, client) // 2. 把信息join到一起(其實就是放入一個樣例類中) (缺少用戶信息, 后面再專門補充) val saleDetail = SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) // 3. 去order_detail的緩存找數據, 進行join // 3.1 先獲取這個order_id對應的所有的order_detail的key import scala.collection.JavaConversions._ val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 轉成scala集合 val saleDetails: List[SaleDetail] = keys.map(key => { val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail]) // 刪除對應的key, 如果不刪, 有可能造成數據重復 client.del(key) SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) }) saleDetail :: saleDetails case (orderId, (Some(orderInfo), None)) => println("Some(orderInfo), None") // 1. 把order_info信息寫入到緩存(因為order_detail信息有部分信息可能遲到) cacheOrderInfo(orderInfo, client) // 3. 去order_detail的緩存找數據, 進行join // 3.1 先獲取這個order_id對應的所有的order_detail的key import scala.collection.JavaConversions._ val keys: List[String] = client.keys("order_detail:" + orderInfo.id + ":*").toList // 轉成scala集合 val saleDetails: List[SaleDetail] = keys.map(key => { val orderDetail: OrderDetail = JSON.parseObject(client.get(key), classOf[OrderDetail]) // 刪除對應的key, 如果不刪, 有可能造成數據重復 client.del(key) SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) }) saleDetails case (orderId, (None, Some(orderDetail))) => println("None, Some(orderDetail)") // 1. 去order_info的緩存中查找 val orderInfoJson = client.get("order_info:" + orderDetail.order_id) if (orderInfoJson == null) { // 3. 如果不存在, 則order_detail緩存 cacheOrderDetail(orderDetail, client) Nil } else { // 2. 如果存在, 則join val orderInfo = JSON.parseObject(orderInfoJson, classOf[OrderInfo]) SaleDetail().mergeOrderInfo(orderInfo).mergeOrderDetail(orderDetail) :: Nil } } // 關閉redis客戶端 client.close() result }) }