Spark 中在處理大批量數據排序問題時,如何避免OOM


錯誤思想

舉個列子,當我們想要比較  一個  類型為  RDD[(Long, (String, Int))]   的RDD,讓它先按Long分組,然后按int的值進行倒序排序,最容易想到的思維就是先分組,然后把Iterable  轉換為 list,然后sortby,但是這樣卻有一個致命的缺點,就是Iterable 在內存中是一個指針,不占內存,而list是一個容器,占用內存,如果Iterable 含有元素過多,那么極易引起OOM

 val cidAndSidCountGrouped: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey()
        // 4. 排序, 取top10
        val result: RDD[(Long, List[(String, Int)])] = cidAndSidCountGrouped.map {
            case (cid, sidCountIt) =>
                // sidCountIt 排序, 取前10
                // Iterable轉成容器式集合的時候, 如果數據量過大, 極有可能導致oom
                (cid, sidCountIt.toList.sortBy(-_._2).take(5))
        }

方法一:利用RDD排序特點

首先,我們要知道,RDD 的排序需要 shuffle, 是采用了內存+磁盤來完成的排序.這樣能有效避免OOM的風險,但是RDD是全部排序,所以需要針對性的過濾Key值來進行排序

 //把long(即key值)提取出來
        val cids: List[Long] = categoryCountList.map(_.cid.toLong)
        val buffer: ListBuffer[(Long, List[(String, Int)])] = ListBuffer[(Long, List[(String, Int)])]()
        //根據每個key來過濾RDD
        for (cid <- cids) {
            /*
            List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)), (15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)), (15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)), (15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)), (15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8)))
            目標:
            (9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9), (329b966c-d61b-46ad-949a-7e37142d384a,8), (5e3545a0-1521-4ad6-91fe-e792c20c46da,8), (e306c00b-a6c5-44c2-9c77-15e919340324,7), (bed60a57-3f81-4616-9e8b-067445695a77,7)))
             */
            val arr: Array[(String, Int)] = cidAndSidCount.filter(cid == _._1)
                .sortBy(-_._2._2)
                .take(5)
                .map(_._2)
            buffer += ((cid, arr.toList))
        }
        buffer.foreach(println)

這樣做也有缺點:即有多少個key,就有多少個Job,占用資源

方法二:利用TreeSet自動排序的特性

 def statCategoryTop10Session_3(sc: SparkContext,
                                   categoryCountList: List[CategroyCount],
                                   userVisitActionRDD: RDD[UserVisitAction]) = {
        // 1. 過濾出來 top10品類的所有點擊記錄
        // 1.1 先map出來top10的品類id
        val cids = categoryCountList.map(_.cid.toLong)
        val topCategoryActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id))


        // 2. 計算每個品類 下的每個session 的點擊量  rdd ((cid, sid) ,1)
        val cidAndSidCount: RDD[(Long, (String, Int))] = topCategoryActionRDD
            .map(action => ((action.click_category_id, action.session_id), 1))
            // 使用自定義分區器  重點理解分區器的原理
            .reduceByKey(new CategoryPartitioner(cids), _ + _)
            .map {
                case ((cid, sid), count) => (cid, (sid, count))
            }
        
        // 3. 排序取top10
//因為已經按key分好了區,所以用Mappartitions ,在每個分區中新建一個TreeSet即可
        val result: RDD[(Long, List[SessionInfo])] = cidAndSidCount.mapPartitions((it: Iterator[(Long, (String, Int))]) => {
//new 一個TreeSet,並同時指定排序規則
     var treeSet: mutable.TreeSet[CategorySession] = new mutable.TreeSet[CategorySession]()(new Ordering[CategorySession] {
                    override def compare(x: CategorySession, y: CategorySession): Int = {
                        if (x.clickCount >= y.clickCount) -1 else 1
                    }
                })
     var id = 0l
    iter.foreach({
        case (l, session) => {
            id = l
            treeSet.add(session)
        if (treeSet.size > 10) treeSet = treeSet.take(10)
                    }
                })
                Iterator(id, treeSet)
            })
    
        result.collect.foreach(println)
        
        Thread.sleep(1000000)
    }
}

/*
根據傳入的key值來決定分區號,讓相同key進入相同的分區,能夠避免多次shuffle
 */
class CategoryPartitioner(cids: List[Long]) extends Partitioner {
    // 用cid索引, 作為將來他的分區索引.
    private val cidWithIndex: Map[Long, Int] = cids.zipWithIndex.toMap
    
    // 返回集合的長度
    override def numPartitions: Int = cids.length
    
    // 根據key返回分區的索引
    override def getPartition(key: Any): Int = {
        key match {
            // 根據品類id返回分區的索引!    0-9
            case (cid: Long, _) =>
                cidWithIndex(cid)
        }
    }
}

 

巧妙利用分區器可以避免多次shuffle

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM