最完整的數據傾斜解決方案(spark)


一.了解數據傾斜

  數據傾斜的原理:

      

      在執行shuffle操作的時候,按照key,來進行values的數據的輸出,拉取和聚合.同一個key的values,一定是分配到一個Reduce task進行處理.

      假如多個key對應的values,總共是90萬,但是可能某個key對應了88萬條數據,key-88萬條values,分配到一個task上面去執行.

      另外兩個task,可能各分配到了1萬條數據,可能是數百個key,對應一萬條數據.

 

  數據傾斜的現象:

    發生數據傾斜的兩種表現: 

      1.你的大部分的task,都執行的特別特別快,在很短的時間就執行完了(用client模式,standalone client,yarn client,本地機器主要一執行spark-submit腳本,就會開始打印log),比如task88 finished,剩下幾個task,執行的特別慢,前面的task,一般1s可以執行完5個,最后發現1000個task,998,999task,要執行1個小時,甚至兩個小時才能執行完一個task.

      2.運行的時候,同樣是其他的task都很快執行完了,也沒什么特別的問題,但是有的task,就是會突然間,啪,報了一個OOM,JVM Out Of Memory,內存溢出了,task failed, task lost, resubmitting task.反復提交執行了幾次都到了某個task就是跑不通,最后就會掛掉.某個task就直接OOM,那么基本上也是因為數據傾斜了,task分配的數量實在是太大了!!!所以內存放不下,然后你的task每處理一條數據,還要創建大量的對象。內存爆掉了。

  數據傾斜的產生原因與定位:

      根據log去定位:

      出現數據傾斜的原因,基本只可能是因為發生了shuffle操作,在shuffle的過程中,出現了數據傾斜的問題,因為某個,或者某些key對應的數據,遠遠高於其他的key.

      1.在所寫的程序找找,哪些地方用到了回產生shuffle的算子,groupByKey、countByKey、reduceByKey、join,groupBy,repartition,cogroup,distinct,leftouterJoin

      2.看log,log一般會報是在你的哪一行代碼,導致了OOM異常,或者呢,看log,看看是執行到了第幾個stage(stage划分).

二.解決數據傾斜的方案

  方案一:聚合源數據

       情況一:     

          (避免聚合)spark算子聚合作業,其實就是groupByKey、reduceByKey,其實就是拿到每個key對應的values;reduceByKey,其實就是對每個key對應的values執行一定的計算。

          這些有可能導致數據傾斜的操作,比如groupByKey和reduceByKey,包括之前說的join。都是在spark作業中執行的。

          spark作業的數據來源通常(90%)的情況下,數據來源都是hive表(hdfs,大數據分布式存儲系統)。hdfs上存儲的大數據。

          hive表,hive表中的數據,通常是怎么出來的呢?有了spark以后,hive比較適合做什么事情?hive就是適合做離線的,晚上凌晨跑的,ETL(extract transform load,數據的采集、清洗、導入),hive sql,去做這些事情,從而去形成一個完整的hive中的數據倉庫;說白了,數據倉庫,就是一堆表。

          spark作業的源表(hive表),其實通常情況下來說,也是通過某些hive etl生成的。hive etl可能是晚上凌晨在那兒跑。今天跑昨天的數據。

           數據傾斜,某個key對應的80萬數據,某些key對應幾百條,某些key對應幾十條;現在我們直接在生成hive表的hive etl中,對數據進行聚合。比如按key來分組,將key對應的所有的values,全部用一種特殊的格式,拼接到一個字符串里面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

           在hive的Hql中我們可以對key進行group,然后在spark中,拿到key=sessionid,values<Iterable>;hive etl中,已經對key進行了聚合。那么也就意味着,每個key就只對應一條數據。在spark中,就不需要再去執行groupByKey+map這種操作了。直接對每個key對應的values字符串,map操作,進行你需要的操作即可。key,values串。

          spark中,可能對這個操作,就不需要執行shffule操作了,也就根本不可能導致數據傾斜。 或者是,對每個key在hive etl中進行聚合,對所有values聚合一下,不一定是拼接起來,可能是直接進行計算。reduceByKey,計算函數,應用在hive etl中,每個key的values

          情況二:     

          (增粗聚合粒度)我們可能沒有辦法對每個key,就聚合出來一條數據; 那么也可以做一個妥協;對每個key對應的數據,10萬條;有好幾個粒度,比如10萬條里面包含了幾個城市、幾天、幾個地區的數據,現在放粗粒度;直接就按照城市粒度,做一下聚合,幾個城市,幾天、幾個地區粒度的數據,都給聚合起來。

          比如說 city_id date area_id   select ... from ... group by city_id

           盡量去聚合,減少每個key對應的數量,也許聚合到比較粗的粒度之后,原先有10萬數據量的key,現在只有1萬數據量。減輕數據傾斜的現象和問題。

  方案二:過濾導致傾斜的key

       如果可以接受某些數據,在spark作業中直接就摒棄掉,不使用,比如說,總共有100萬個key,只有2個key,是數據量達到10萬的,其他所有的key,對應的數據量都是幾十.

      這個時候,我們自己可以進行取舍,如果業務和需求可以理解和接受的話,在我們從hive表查詢元數據的時候,直接在sql中用where條件,過濾掉某幾個key.

      這個幾個原先有大量數據,會導致數據傾斜的key,被過濾掉之后,在我們的Spark作業中,自然就不會發生數據傾斜了.

  方案三:提高shuffle操作的reduce並行度

      

      

      將reduce task的數量,變多,就可以讓每個reduce task分配到更少的數據量,這樣的話,也許就可以緩解,或者甚至是基本解決掉數據傾斜的問題。

      具體方法就是在shuffle算子后面指定task分區數,比如val rdd2 = rdd1.reduceByKey(_+_,10)      

      提升shuffle reduce端並行度,怎么來操作?

          很簡單,主要給我們所有的shuffle算子,比如groupByKey、countByKey、reduceByKey。在調用的時候,傳入進去一個參數。一個數字。那個數字,就代表了那個shuffle操作的reduce端的並行度。那么在進行shuffle操作的時候,就會對應着創建指定數量的reduce task。

          這樣的話,就可以讓每個reduce task分配到更少的數據。基本可以緩解數據傾斜的問題。

          比如說,原本某個task分配數據特別多,直接OOM,內存溢出了,程序沒法運行,直接掛掉。按照log,找到發生數據傾斜的shuffle操作,給它傳入一個並行度數字,這樣的話,原先那個task分配到的數據,肯定會變少。就至少可以避免OOM的情況,程序至少是可以跑的。

      提升shuffle reduce並行度的缺陷:      

          治標不治本的意思,因為,它沒有從根本上改變數據傾斜的本質和問題。不像第一個和第二個方案(直接避免了數據傾斜的發生)。原理沒有改變,只是說,盡可能地去緩解和減輕shuffle reduce task的數據壓力,以及數據傾斜的問題。 

           1、如果最理想的情況下,提升並行度以后,減輕了數據傾斜的問題,或者甚至可以讓數據傾斜的現象忽略不計,那么就最好。就不用做其他的數據傾斜解決方案了。

          2、不太理想的情況下,就是比如之前某個task運行特別慢,要5個小時,現在稍微快了一點,變成了4個小時;或者是原先運行到某個task,直接OOM,現在至少不會OOM了,但是那個task運行特別慢,要5個小時才能跑完。

  方案四:使用隨機key實現雙重group聚合方案

 

		/**
		 * 使用隨機key實現雙重聚合
		 */
		
		/**
		 * 第一步,給每個key打上一個隨機數
		 */
		JavaPairRDD<String, Long> mappedClickCategoryIdRDD = clickCategoryIdRDD.mapToPair(
				
				new PairFunction<Tuple2<Long,Long>, String, Long>() {

					private static final long serialVersionUID = 1L;
		
					@Override
					public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
							throws Exception {
						Random random = new Random();
						int prefix = random.nextInt(10);
						return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
					}
					
				});
		
		/**
		 * 第二步,執行第一輪局部聚合
		 */
		JavaPairRDD<String, Long> firstAggrRDD = mappedClickCategoryIdRDD.reduceByKey(
				
				new Function2<Long, Long, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Long call(Long v1, Long v2) throws Exception {
						return v1 + v2;
					}
					
				});
		
		/**
		 * 第三步,去除掉每個key的前綴
		 */
		JavaPairRDD<Long, Long> restoredRDD = firstAggrRDD.mapToPair(
				
				new PairFunction<Tuple2<String,Long>, Long, Long>() {

					private static final long serialVersionUID = 1L;
		
					@Override
					public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
							throws Exception {
						long categoryId = Long.valueOf(tuple._1.split("_")[1]);  
						return new Tuple2<Long, Long>(categoryId, tuple._2);  
					}
					
				});
		
		/**
		 * 第四步,最第二輪全局的聚合
		 */
		JavaPairRDD<Long, Long> clickCategoryId2CountRDD = restoredRDD.reduceByKey(
				
				new Function2<Long, Long, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Long call(Long v1, Long v2) throws Exception {
						return v1 + v2;
					}
					
				});

 

       1.將一個熱點的key進行加鹽(就是加上隨機的前綴)

       2.然后對進行加鹽處理后的key進行reduceByKey,groupByKey等算子操作

      3.去掉key的前綴.

      4.重復步驟2操作,避免了一個key對應的value過多的算子操作(數據傾斜).

  方案五:將reduce join 轉換為map join

      原理圖:

 

        /**
         * reduce join轉換為map join
         */
        
        List<Tuple2<Long, Row>> userInfos = userid2InfoRDD.collect();
        final Broadcast<List<Tuple2<Long, Row>>> userInfosBroadcast = sc.broadcast(userInfos);
        
        JavaPairRDD<String, String> sessionid2FullAggrInfoRDD = userid2PartAggrInfoRDD.mapToPair(
                
                new PairFunction<Tuple2<Long,String>, String, String>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, String> call(Tuple2<Long, String> tuple)
                            throws Exception {
                        // 得到用戶信息map
                        List<Tuple2<Long, Row>> userInfos = userInfosBroadcast.value();
                        
                        Map<Long, Row> userInfoMap = new HashMap<Long, Row>();
                        for(Tuple2<Long, Row> userInfo : userInfos) {
                            userInfoMap.put(userInfo._1, userInfo._2);
                        }
                        
                        // 獲取到當前用戶對應的信息
                        String partAggrInfo = tuple._2;
                        Row userInfoRow = userInfoMap.get(tuple._1);
                        
                        String sessionid = StringUtils.getFieldFromConcatString(
                                partAggrInfo, "\\|", Constants.FIELD_SESSION_ID);
                        
                        int age = userInfoRow.getInt(3);
                        String professional = userInfoRow.getString(4);
                        String city = userInfoRow.getString(5);
                        String sex = userInfoRow.getString(6);
                        
                        String fullAggrInfo = partAggrInfo + "|"
                                + Constants.FIELD_AGE + "=" + age + "|"
                                + Constants.FIELD_PROFESSIONAL + "=" + professional + "|"
                                + Constants.FIELD_CITY + "=" + city + "|"
                                + Constants.FIELD_SEX + "=" + sex;
                        
                        return new Tuple2<String, String>(sessionid, fullAggrInfo);
                    }
                    
                });

       reduce join轉換map join,適合在什么樣的情況下,可以來使用?

          如果兩個Rdd要進行join,其中一個RDD是比較小的,一個RDD是100萬條數據,一個RDD是1萬數據.

          其中一個RDD必須是比較小的,broadcast出去的那個小RDD的數據以后,就會在每個executor的block manager中都駐留一份.要確保你的內存足夠放那個小RDD的數據.

          這種方式下,根本不會發生shuffle操作,肯定也不會發生數據傾斜;從根本上杜絕了join操作可能導致的數據傾斜的問題;

          對於join中有數據傾斜的情況,大家盡量第一時間先考慮這種方式,效果非常好;如果某個RDD比較小的情況下。

      不適合的情況:    

          兩個RDD都比較大,那么這個時候,你去將其中一個RDD做成broadcast,就很笨拙了。很可能導致內存不足。最終導致內存溢出,程序掛掉。 而且其中某些key(或者是某個key),還發生了數據傾斜;此時可以采用最后兩種方式。

      總結:   

          對於join這種操作,不光是考慮數據傾斜的問題;即使是沒有數據傾斜問題,也完全可以優先考慮,用我們講的這種高級的reduce join轉map join的技術,不要用普通的join,去通過shuffle,進行數據的join;完全可以通過簡單的map,使用map join的方式,犧牲一點內存資源;在可行的情況下,優先這么使用。 不走shuffle,直接走map,是不是性能也會高很多?這是肯定的。

  方案六:sample采樣傾斜key進行兩次join

      

      當第五種方案不能解決,就是兩個RDD都比較大的情況下的join操作,直接針對產生數據傾斜的Key的處理方案.

      方案思路:

        在要進行join操作的時候,隨機采樣出10%數據,使用countByKey計算出key的數量然后sortByKey(false)倒序取出第一個key,然后對取出來的key和普通的key分別進行join操作之后再進行join操作.

        這個方案的實現思路,跟大家解析一下:其實關鍵之處在於,將發生數據傾斜的key,單獨拉出來,放到一個RDD中去;就用這個原本會傾斜的key RDD跟其他RDD,單獨去join一下,這個時候,key對應的數據,可能就會分散到多個task中去進行join操作。 就不至於說是,這個key跟之前其他的key混合在一個RDD中時,肯定是會導致一個key對應的所有數據,都到一個task中去,就會導致數據傾斜。

      這種方案什么時候適合使用?

        優先對於join,肯定是希望能夠采用上一個方案的,reduce join轉換map join。兩個RDD數據都比較大,那么就不要那么搞了。

        針對你的RDD的數據,你可以自己把它轉換成一個中間表,或者是直接用countByKey()的方式,你可以看一下這個RDD各個key對應的數據量;

        此時如果你發現整個RDD就一個,或者少數幾個key,是對應的數據量特別多;盡量建議,比如就是一個key對應的數據量特別多。 此時可以采用咱們的這種方案,單拉出來那個最多的key;單獨進行join,盡可能地將key分散到各個task上去進行join操作。

      什么時候不適用呢?

         如果一個RDD中,導致數據傾斜的key,特別多;那么此時,最好還是不要這樣了;還是使用我們最后一個方案,終極的join數據傾斜的解決方案。

       升級版做法思路:

        就是說,咱們單拉出來了,一個或者少數幾個可能會產生數據傾斜的key,然后還可以進行更加優化的一個操作; 對於那個key,從另外一個要join的表中,也過濾出來一份數據,比如可能就只有一條數據。userid2infoRDD,一個userid key,就對應一條數據。 然后呢,采取對那個只有一條數據的RDD,進行flatMap操作,打上100個隨機數,作為前綴,返回100條數據。 單獨拉出來的可能產生數據傾斜的RDD,給每一條數據,都打上一個100以內的隨機數,作為前綴。 再去進行join,是不是性能就更好了。肯定可以將數據進行打散,去進行join。join完以后,可以執行map操作,去將之前打上的隨機數,給去掉,然后再和另外一個普通RDD join以后的結果,進行union操作。

      代碼實現:

        https://www.cnblogs.com/gentle-awen/p/10144882.html

  方案七:使用隨機數以及擴容表進行join

      

      方案思路:

        針對上面方案都沒辦法解決數據傾斜,只能使用這種.當采用隨機數和擴容表進行join解決數據傾斜的時候,就代表着,你的之前的數據傾斜的解決方案,都沒法使用。 這個方案是沒辦法徹底解決數據傾斜的,更多的,是一種對數據傾斜的緩解。

      方案步驟:

        1、選擇一個RDD,要用flatMap,進行擴容,將每條數據,映射為多條數據,每個映射出來的數據,都帶了一個n以內的隨機數,通常來說,會選擇10。

        2、將另外一個RDD,做普通的map映射操作,每條數據,都打上一個10以內的隨機數。

        3、最后,將兩個處理后的RDD,進行join操作。

      方案局限性:

        1、因為你的兩個RDD都很大,所以你沒有辦法去將某一個RDD擴的特別大,一般咱們就是10倍。

        2、如果就是10倍的話,那么數據傾斜問題,的確是只能說是緩解和減輕,不能說徹底解決。

      方案代碼:

        https://www.cnblogs.com/gentle-awen/p/10144893.html

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM