spark-yarn模式和shuffle原理


sparkjob的部署
-----------------
    1.client
        driver run on client
    2.cluster
        driver on a worker



4.啟動job時,指定資源使用。
    $>spark-submit 
        --driver-memory MEM            //設置driver內存,默認1g,配置2g
        --executor-memory MEM        //控制每個執行器內存,默認1g

        [只在standalone模式下]
        --driver-cores                //控制driver使用的內核數,默認1.

        [standalone & mesos]
        --total-executor-cores NUM    //控制執行器使用的總內核數

        [standalone & yarn]
        --executor-cores NUM        //控制每個執行的內核數。
        
        [yarn]
        --driver-cores NUM            //控制driver內核數,默認1
        --num-executors NUM            //啟動的執行器個數,動態分配內核啟用時,數字就是Num的值。

    
5.啟動spark-shell,手動分配資源
    //啟動3個executor,worker節點不能啟動2個executor
    spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 6g --total-executor-cores 4 --executor-cores 1
    //啟動了4個executor,
    spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 4 --executor-cores 1
    //啟動了7個executor,
    spark-shell --master spark://s101:7077 --driver-memory 2g --executor-memory 3g --total-executor-cores 22 --executor-cores 3


spark + yarn模式
--------------------
    yarn模式,不需要spark集群,只是在client安裝spark,提交作業時,走的是hadoop的流程。
    使用spark的jar,在nodemanager上啟動的spark的executor進程。
    --master的值指定yarn即可,rm的地址從配置文件中提取的。

    --master yarn --deployMode client            //--master yarn-client
    --master yarn --deployMode cluster            //--master yarn-cluster
    [yarn-client]
        Appmaster只運行appmaster自身程序,負責資源請求。
        Driver仍然位於client執行。

    [yarn-cluster]
        appmaster不但負責資源請求,還負責運行driver。

    //實操
    1.停止spark集群
        stop-all.sh

    2.啟動zk和hdfs-yarn
        start-yarn.sh

    3.配置spark的spark-env.sh的HADOOP_CONF_DIR並分發.
        ...
        export HADOOP_CONF_DIR=/soft/hadoop/etc/hadoop
    
    4.啟動spark-shell
        spark-shell --master yarn --deploy-mode client --num-executors 4
        
    5.故障診斷
        出現 is running beyond virtual memory limits. 
        Current usage: 178.7 MB of 1 GB physical memory used; 2.3 GB of 2.1 GB virtual memory used. Killing container.

        關閉yarn-site.xml虛擬內存檢查並分發文件。
        [yarn-site.xml]
        <property>
            <name>yarn.nodemanager.vmem-check-enabled</name>
            <value>false</value>
        </property>

    6.spark yarn運行時將spark的所有jar上傳到hdfs,協同hadoop的作業運行流程。
        配置spark.yarn.jars或者spark.yarn.archive,避免每次上傳jar包。
        1.spark.yarn.jars
            spark.yarn.jars=hdfs:///some/path
        2.spark.yarn.archive
            spark.yarn.archive=hdfs://mycluster/user/centos/spark/spark-jars.zip

        3.配置spark.yarn.archive屬性,避免每次上傳大的jar包。
            a)上傳zip文件到hdfs://mycluster/user/centos/spark/spark-jars.zip
            b)配置spark配置文件。
                [spark/conf/spark-default.conf]
                spark.yarn.archive hdfs://mycluster/user/centos/spark/spark-jars.zip
            c)啟動shell
                $>spark-shell --master yarn-client

ShuffleMapTask
------------------
    private[spark] class ShuffleMapTask(
        stageId: Int,
        stageAttemptId: Int,
        taskBinary: Broadcast[Array[Byte]],        //(rdd,dep)
        partition: Partition,
        @transient private var locs: Seq[TaskLocation],
        metrics: TaskMetrics,
        localProperties: Properties,
        jobId: Option[Int] = None,
        appId: Option[String] = None,
        appAttemptId: Option[String] = None)
    }

shuffle管理
-------------------
    [ShuffleManager]
        ShuffleManager,是shuffle系統可插拔接口。
        ShuffleManager在driver和每個executor通過SparkEnv進行創建。
        基於spark.shuffle.manager屬性配置創建相應shuffleManager實現。
        在spark 2.1.0中只有SortShuffleManager.
        在spark 1.6.0中有SortShuffleManager和HashShuffleManager.

    [HashShuffleManager]
        spark.shuffle.consolidateFiles=true,默認false,合並輸出。
        slot = 並發能力 = 並發執行的線程數 = (執行器個數 * 每個執行器的cpu內核數) / 每個任務占用的內核數。

    spark 2.1.0的實現類是SortShuffleManager(不論sort還是tungsten-sort(鎢絲排序))
    [SortShuffleManager]
        基於排序的shuffle,輸入kv按照目標分區的id進行排序,然后寫入一個map輸出文件。
        reducer讀取連續文件區域來提取數據。map內存不足,溢出到磁盤,磁盤上的文件最終輸出到一個文件中。

        該方式的shuffle有兩種途徑生成map輸出文件:
        1.串行化排序(以下三個條件均滿足使用)
            a)shuffle依賴沒有指定聚合或者輸出排序
            b)shuffle序列化器支持序列化值得重新定位。(當前只有KryoSerializer和SQL的Serializer可以,java不可以)
            c)shuffle生成的分區少於16777216個.

        2.反串行排序
            所有其他情況。
    
    [串行化排序模式]
        該模式下,傳遞給ShuffleWriter的record即可被串行化,排序時也是串行化進行緩沖。該方式有幾點優化
        處理:
        1.對串行化的二進制數據進行排序,而不是針對java對象,因此可以減少內存消耗和過度GC。
          該優化機制要求串行化器具有特殊的屬性能夠對串行的record進行重排序,不需要反串過程。

        2.使用串行化的具有高效緩存特征的sorter,可以對壓縮的record指針和分區id的數組進行排序。
          數組中,每條record使用8字節空間存儲。

        3.溢出合並過程對串行化的數據塊(屬於同一分區)進行操作,並且合並期間不需要反串(流)。

        4.支持壓縮文件塊的合成,合並過程簡單的將壓縮和串行化的分區最終合並成一個分區文件,
          支持高效數據復制方式,例如NIO中的零拷貝。
        
        
ShuffleManager.registerShuffle()
-----------------------------------
    //1.通過ShuffleDep判斷是否需要bypass
    if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) {
      new BypassMergeSortShuffleHandle[K, V](shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } 
    //判斷依賴是否可以串行shuffle
    else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
      new SerializedShuffleHandle[K, V]( shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
    } 
    //基本shuffle
    else {
      new BaseShuffleHandle(shuffleId, numMaps, dependency)
    }


    
是否迂回的條件
-------------------------
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    //如果map端需要聚合,不能回調。
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      false
    }
    //判斷依賴的分區數量是否小於指定的配置(默認時200)
    else {
      val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
    }

    //結論
    if(map需要聚合){
        //不能迂回
    }
    else{
        if(分區數 <= 200(可配:spark.shuffle.sort.bypassMergeThreshold)){
            //可以迂回
        }
        else{
            //不能迂回
        }
    }


串行shuffle的判斷條件
------------------------
    def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
        val shufId = dependency.shuffleId
        val numPartitions = dependency.partitioner.numPartitions
        //判斷是否dep中使用的串行化器是否時kryo(kryo支持)。
        if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
          false
        }
        //判斷dep是否定義聚合器
        else if (dependency.aggregator.isDefined) {
          false
        } 
        //分區數大於特定值
        else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
          false
        } 
        ///
        else {
          true
        }
    }
    
    //結論
    if(不是kryo){
        //不能用串行shuffle
    }
    //
    else if(dep定義了聚合器){
        //不能用串行shuffle
    }
    else if(分區數 > (1 << 24) ){
        //不能用串行shuffle
    }
    else{
        //使用串行shuffle
    }


整個shuffle處理手段的優先級
---------------------------
    //1.迂回策略
    if(能否迂回){
        //new BypassMergeSortShuffleHandle()
    }
    //2.串行策略
    else if(是否串行){
        //new SerializedShuffleHandle();
    }
    //3.常規策略
    else{
        //new BaseShuffleHandle
    }


SortShuffleManager.getWrtier()
--------------------------------
    handle match{
        case SerializedShuffleHandle        => new UnsafeShuffleWriter();
        case BypassMergeSortShuffleHandle    => new BypassMergeSortShuffleWriter();
        case BaseShuffleHandle                => new SortShuffleWriter();
    
    }


ShuffleWriter的特性
--------------------
    abstract class ShuffleWriter
         |
        / \
        ---
         |
         |------BypassMergeSortShuffleWriter
         |------UnsafeShuffleWriter
         |------SortShuffleWriter

    [BypassMergeSortShuffleWriter]
        該類實現了hash方式的shuffle處理手段,將record寫入單獨文件,每個分區一個文件。
        然后對每個分區文件合並再產生一個文件,文件的不同區域用於不同reduce,該模式下,
        record不在內存中緩存,這是和HashShuffleWriter本質不同點。

        該方式對於有大量分區的shuffle處理效率不高,原因是需要對所有分區同時打開串行化器
        和文件流。

    [UnsafeShuffleWriter]
        將kv分開單獨以kryo串行寫入緩沖區,然后將緩沖放入ShuffleExternalSorter中。
        1.ShuffleExternalSorter
            專門用於基於sort的shuffle。record追加到date page,如果所有record插入
            后或者內存到達limit值,這些記錄按照分區id進行排序,排序后的記錄寫入單獨
            的輸出文件(或多個文件),輸出文件的格式和SortShuffleWriter輸出文件格式相同,
            每條分區的記錄都是單獨串行和壓縮寫入的,同樣使用反串和解壓縮方式讀取。
            和ExternalSorter不同,該對象不對溢出文件進行合並,而是將合並過程交給
            UnsafeShuffleWriter,避免多余串行和反串過程。

            KV以串行和壓縮方式寫緩沖區,再將緩沖區字節數組寫入頁面內存(long[]),標記好
            長度、偏移量、分區數等等,每個KV在頁面內存的地址和分區進行編碼后寫入內存
            排序器(InMemorySorter,該排序器使用分區id降序排列).如果內存頁默認超過1G(
            可以通過spark.shuffle.spill.numElementsForceSpillThreshold進行修改)個kv,
            發生溢出,進行排序輸出到文件。


    [SortShuffleWriter]
        
        



Spark中的串行化
-------------------
    spark默認使用java串行化器,但性能一般,優化手段之一
    使用kryo串行化,但是kryo串行化器對於要串行化的類使用前
    需要注冊,spark的kryo串行化器只是對java內置類、scala的內置
    類核spark的內置類進行了注冊,自定義的類必須手動注冊。
    也是沒有把kryo串行化器做為默認設置的原因.
    
    
    keyo串行化為什么快
——————————————————————————————————————————
為什么kryo比其它的序列化方案要快?

為每一個類分配一個id

實現了自己的IntMap

代碼中一些取巧的地方:

利用變量memoizedRegistration和memoizedType記錄上一次的調用writeObject函數的Class,則如果兩次寫入同一類型時,可以直接拿到,不再查找HashMap。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM