spark面試問題收集


spark面試問題

1、spark中的RDD是什么,有哪些特性

  • RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、里面的元素可並行計算的集合。

    • Dataset:就是一個集合,用於存放數據的
    • Distributed:分布式,可以並行在集群計算
    • Resilient:表示彈性的
      • RDD的彈性體現在哪里?
        • 1、自動的進行內存和磁盤數據存儲的切換;
        • 2、基於lineage的高效容錯
        • 3、task如果失敗會特定次數的重試 (默認4次)
        • 4、stage如果失敗會自動進行特定次數的重試,而且只會只計算失敗的分片
        • 5、checkpoint【每次對RDD操作都會產生新的RDD,如果鏈條比較長,計算比較笨重,就把數據放在硬盤中】和persist 【內存或磁盤中對數據進行復用】(檢查點、持久化)
        • 6、任務調度彈性:DAG TASK 和資源管理無關
          • spark程序就是一個計算任務的邏輯,哪里可以給當前這個任務提供計算資源,就可以把任務提交到哪里去運行
            • standAlone
            • yarn
              • yarn-cluster
              • yarn-client
            • mesos
        • 7、數據分片的高度彈性repartition
  • RDD五大特性:

    • A list of partitions
      一個分區列表,RDD中的數據都存在一個分區列表里面
    • A function for computing each split
      作用在每一個分區中的函數
    • A list of dependencies on other RDDs
      一個RDD依賴於其他多個RDD,這個點很重要,RDD的容錯機制就是依據這個特性而來的
    • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
      可選的,針對於kv類型的RDD才具有這個特性,作用是決定了數據的來源以及數據處理后的去向
    • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
      可選項,數據本地性,數據位置最優

2、概述一下spark中的常用算子區別(map、mapPartitions、foreach、foreachPartition)

  • map:用於遍歷RDD,將函數f應用於每一個元素,返回新的RDD(transformation算子)。
  • foreach:用於遍歷RDD,將函數f應用於每一個元素,無返回值(action算子)。
  • mapPartitions:用於遍歷操作RDD中的每一個分區,返回生成一個新的RDD(transformation算子)。
  • foreachPartition: 用於遍歷操作RDD中的每一個分區。無返回值(action算子)。
  • 總結:一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推薦使用。

3、簡述reduceByKey和groupByKey區別

  • reduceByKey
    • 會使用用戶自定義的函數在每個節點先進行預聚合,減少數據的拉取量,減少數據的網絡傳輸
    • 類似於mapreduce中的combiner
  • groupByKey
    • 不會進行數據的聚合,全量拉取數據,性能較差

4、yarn-cluster和yarn-client的區別

  • yarn-cluster
    • driver運行在ApplicationMaster上,並且后期從yarn中申請資源,該進程會運行在yarn的container內,所有啟動AM的client可以立即關閉而不用持續到Application的生命周期
  • yarn-client
    • 程序在哪里提交任務,就在當前機器產生driver,driver需要跟后面的task進行通信,這個時候的client不可以干掉,干掉client就相當於把整個Application的driver干掉了。

5、談談spark中的寬窄依賴

  • RDD和它依賴的父RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
  • 寬依賴:指的是多個子RDD的Partition會依賴同一個父RDD的Partition
  • 窄依賴:指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。

6、spark中如何划分stage

  • 1.Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,后面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢后,后面的Stage才會運行。
  • 2.Stage划分的依據就是寬依賴,何時產生寬依賴,例如reduceByKey,groupByKey的算子,會導致寬依賴的產生。
  • 3.由Action(例如collect)導致了SparkContext.runJob的執行,最終導致了DAGScheduler中的submitJob的執行,其核心是通過發送一個case class JobSubmitted對象給eventProcessLoop。
    eventProcessLoop是DAGSchedulerEventProcessLoop的具體實例,而DAGSchedulerEventProcessLoop是eventLoop的子類,具體實現EventLoop的onReceive方法,onReceive方法轉過來回調doOnReceive
  • 4.在doOnReceive中通過模式匹配的方法把執行路由到
  • 5.在handleJobSubmitted中首先創建finalStage,創建finalStage時候會建立父Stage的依賴鏈條
  • 總結:以來是從代碼的邏輯層面上來展開說的,可以簡單點說:寫介紹什么是RDD中的寬窄依賴,然后在根據DAG有向無環圖進行划分,從當前job的最后一個算子往前推,遇到寬依賴,那么當前在這個批次中的所有算子操作都划分成一個stage,然后繼續按照這種方式在繼續往前推,如在遇到寬依賴,又划分成一個stage,一直到最前面的一個算子。最后整個job會被划分成多個stage,而stage之間又存在依賴關系,后面的stage依賴於前面的stage。

7、spark-submit的時候如何引入外部jar包

  • 在通過spark-submit提交任務時,可以通過添加配置參數來指定
    • --driver-class-path 外部jar包
      • 它會把jar下發到driver端
    • --jars 外部jar包
      • 它會把jar包下發到每一個executor進程中

8、spark 如何防止內存溢出

  • driver端的內存溢出
    • 可以增大driver的內存參數:spark.driver.memory (default 1g)
    • 這個參數用來設置Driver的內存。在Spark程序中,SparkContext,DAGScheduler都是運行在Driver端的。對應rdd的Stage切分也是在Driver端運行,如果用戶自己寫的程序有過多的步驟,切分出過多的Stage,這部分信息消耗的是Driver的內存,這個時候就需要調大Driver的內存。
  • map過程產生大量對象導致內存溢出
    • 這種溢出的原因是在單個map中產生了大量的對象導致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操作在rdd中,每個對象都產生了10000個對象,這肯定很容易產生內存溢出的問題。針對這種問題,在不增加內存的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的對象Executor的內存也能夠裝得下。具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
      面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分區,不能增加分區,不會有shuffle的過程。
  • 數據不平衡導致內存溢出
    • 數據不平衡除了有可能導致內存溢出外,也有可能導致性能的問題,解決方法和上面說的類似,就是調用repartition重新分區。這里就不再累贅了。
  • shuffle后內存溢出
    • shuffle內存溢出的情況可以說都是shuffle后,單個文件過大導致的。在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數,這個參數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism參數只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的並發量了。如果是別的partitioner導致的shuffle內存溢出,就需要從partitioner的代碼增加partitions的數量。
  • standalone模式下資源分配不均勻導致內存溢出
    • 在standalone的模式下如果配置了--total-executor-cores 和 --executor-memory 這兩個參數,但是沒有配置--executor-cores這個參數的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數量不同,那么在cores數量多的Executor中,由於能夠同時執行多個Task,就容易導致內存溢出的情況。這種情況的解決方法就是同時配置--executor-cores或者spark.executor.cores參數,確保Executor資源分配均勻。
  • 使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()
    • rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在內存不足的時候rdd.cache()的數據會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在內存不足的時候會存儲在磁盤,避免重算,只是消耗點IO時間。

9、spark中cache和persist的區別

  • cache:緩存數據,默認是緩存在內存中,其本質還是調用persist
  • persist:緩存數據,有豐富的數據緩存策略。數據可以保存在內存也可以保存在磁盤中,使用的時候指定對應的緩存級別就可以了。
    • 什么時候設置緩存
      • 1、某個rdd后期被使用了多次
        • val rdd2=rdd1.map()
        • val rdd2=rdd1.map()

10、簡要描述Spark分布式集群搭建的步驟

  • 地球人都知道
  • 這里可以概述下如何搭建高可用的spark集群(HA)
    • 主要是引入了zookeeper

11、spark中的數據傾斜的現象、原因、后果

  • (1)、數據傾斜的現象
    • 多數task執行速度較快,少數task執行時間非常長,或者等待很長時間后提示你內存不足,執行失敗。
  • (2)、數據傾斜的原因
    • 數據問題
      • 1、key本身分布不均衡(包括大量的key為空)
      • 2、key的設置不合理
    • spark使用問題
      • 1、shuffle時的並發度不夠
      • 2、計算方式有誤
  • (3)、數據傾斜的后果
    • 1、spark中的stage的執行時間受限於最后那個執行完成的task,因此運行緩慢的任務會拖垮整個程序的運行速度(分布式程序運行的速度是由最慢的那個task決定的)。
    • 2、過多的數據在同一個task中運行,將會把executor撐爆。

12、如何解決spark中的數據傾斜問題

  • 發現數據傾斜的時候,不要急於提高executor的資源,修改參數或是修改程序,首先要檢查數據本身,是否存在異常數據。
    • 1、數據問題造成的數據傾斜
      • 找出異常的key
        • 如果任務長時間卡在最后最后1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些key造成的。
          選取key,對數據進行抽樣,統計出現的次數,根據出現次數大小排序取出前幾個。
        • 比如: df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k.2,k.1)).sortByKey(false).take(10)
        • 如果發現多數數據分布都較為平均,而個別數據比其他數據大上若干個數量級,則說明發生了數據傾斜。
      • 經過分析,傾斜的數據主要有以下三種情況:
        • 1、null(空值)或是一些無意義的信息()之類的,大多是這個原因引起。
        • 2、無效數據,大量重復的測試數據或是對結果影響不大的有效數據。
        • 3、有效數據,業務導致的正常數據分布。
      • 解決辦法
        • 第1,2種情況,直接對數據進行過濾即可(因為該數據對當前業務不會產生影響)。
        • 第3種情況則需要進行一些特殊操作,常見的有以下幾種做法
          • (1) 隔離執行,將異常的key過濾出來單獨處理,最后與正常數據的處理結果進行union操作。
          • (2) 對key先添加隨機值,進行操作后,去掉隨機值,再進行一次操作。
          • (3) 使用reduceByKey 代替 groupByKey(reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函數自定義.)
          • (4) 使用map join。
      • 案例
        • 如果使用reduceByKey因為數據傾斜造成運行失敗的問題。具體操作流程如下:
          • (1) 將原始的 key 轉化為 key + 隨機值(例如Random.nextInt)
          • (2) 對數據進行 reduceByKey(func)
          • (3) 將 key + 隨機值 轉成 key
          • (4) 再對數據進行 reduceByKey(func)
      • 案例操作流程分析:
        • 假設說有傾斜的Key,我們給所有的Key加上一個隨機數,然后進行reduceByKey操作;此時同一個Key會有不同的隨機數前綴,在進行reduceByKey操作的時候原來的一個非常大的傾斜的Key就分而治之變成若干個更小的Key,不過此時結果和原來不一樣,怎么破?進行map操作,目的是把隨機數前綴去掉,然后再次進行reduceByKey操作。(當然,如果你很無聊,可以再次做隨機數前綴),這樣我們就可以把原本傾斜的Key通過分而治之方案分散開來,最后又進行了全局聚合
        • 注意1: 如果此時依舊存在問題,建議篩選出傾斜的數據單獨處理。最后將這份數據與正常的數據進行union即可。
        • 注意2: 單獨處理異常數據時,可以配合使用Map Join解決。
    • 2、spark使用不當造成的數據傾斜
      • 提高shuffle並行度
        • dataFrame和sparkSql可以設置spark.sql.shuffle.partitions參數控制shuffle的並發度,默認為200。
        • rdd操作可以設置spark.default.parallelism控制並發度,默認參數由不同的Cluster Manager控制。
        • 局限性: 只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到數據傾斜的困擾。
        • 使用map join 代替reduce join
        • 在小表不是特別大(取決於你的executor大小)的情況下使用,可以使程序避免shuffle的過程,自然也就沒有數據傾斜的困擾了.(詳細見http://blog.csdn.net/lsshlsw/article/details/50834858、http://blog.csdn.net/lsshlsw/article/details/48694893)
        • 局限性: 因為是先將小數據發送到每個executor上,所以數據量不能太大。

13、flume整合sparkStreaming問題

  • (1)、如何實現sparkStreaming讀取flume中的數據
    • 可以這樣說:
      • 前期經過技術調研,查看官網相關資料,發現sparkStreaming整合flume有2種模式,一種是拉模式,一種是推模式,然后在簡單的聊聊這2種模式的特點,以及如何部署實現,需要做哪些事情,最后對比兩種模式的特點,選擇那種模式更好。
        • 推模式:Flume將數據Push推給Spark Streaming
        • 拉模式:Spark Streaming從flume 中Poll拉取數據
  • (2)、在實際開發的時候是如何保證數據不丟失的
    • 可以這樣說:
      • flume那邊采用的channel是將數據落地到磁盤中,保證數據源端安全性(可以在補充一下,flume在這里的channel可以設置為memory內存中,提高數據接收處理的效率,但是由於數據在內存中,安全機制保證不了,故選擇channel為磁盤存儲。整個流程運行有一點的延遲性)
      • sparkStreaming通過拉模式整合的時候,使用了FlumeUtils這樣一個類,該類是需要依賴一個額外的jar包(spark-streaming-flume_2.10)
      • 要想保證數據不丟失,數據的准確性,可以在構建StreamingConext的時候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)來創建一個StreamingContext,使用StreamingContext.getOrCreate來創建StreamingContext對象,傳入的第一個參數是checkpoint的存放目錄,第二參數是生成StreamingContext對象的用戶自定義函數。如果checkpoint的存放目錄存在,則從這個目錄中生成StreamingContext對象;如果不存在,才會調用第二個函數來生成新的StreamingContext對象。在creatingFunc函數中,除了生成一個新的StreamingContext操作,還需要完成各種操作,然后調用ssc.checkpoint(checkpointDirectory)來初始化checkpoint功能,最后再返回StreamingContext對象。
        這樣,在StreamingContext.getOrCreate之后,就可以直接調用start()函數來啟動(或者是從中斷點繼續運行)流式應用了。如果有其他在啟動或繼續運行都要做的工作,可以在start()調用前執行。
      • 流式計算中使用checkpoint的作用:
        • 保存元數據,包括流式應用的配置、流式沒崩潰之前定義的各種操作、未完成所有操作的batch。元數據被存儲到容忍失敗的存儲系統上,如HDFS。這種ckeckpoint主要針對driver失敗后的修復。
        • 保存流式數據,也是存儲到容忍失敗的存儲系統上,如HDFS。這種ckeckpoint主要針對window operation、有狀態的操作。無論是driver失敗了,還是worker失敗了,這種checkpoint都夠快速恢復,而不需要將很長的歷史數據都重新計算一遍(以便得到當前的狀態)。
      • 設置流式數據checkpoint的周期
        • 對於一個需要做checkpoint的DStream結構,可以通過調用DStream.checkpoint(checkpointInterval)來設置ckeckpoint的周期,經驗上一般將這個checkpoint周期設置成batch周期的5至10倍。
      • 使用write ahead logs功能
        • 這是一個可選功能,建議加上。這個功能將使得輸入數據寫入之前配置的checkpoint目錄。這樣有狀態的數據可以從上一個checkpoint開始計算。開啟的方法是把spark.streaming.receiver.writeAheadLogs.enable這個property設置為true。另外,由於輸入RDD的默認StorageLevel是MEMORY_AND_DISK_2,即數據會在兩台worker上做replication。實際上,Spark Streaming模式下,任何從網絡輸入數據的Receiver(如kafka、flume、socket)都會在兩台機器上做數據備份。如果開啟了write ahead logs的功能,建議把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在創建RDD時由參數傳入。
      • 使用以上的checkpoint機制,確實可以保證數據0丟失。但是一個前提條件是,數據發送端必須要有緩存功能,這樣才能保證在spark應用重啟期間,數據發送端不會因為spark streaming服務不可用而把數據丟棄。而flume具備這種特性,同樣kafka也具備。
  • (3)Spark Streaming的數據可靠性
    • 有了checkpoint機制、write ahead log機制、Receiver緩存機器、可靠的Receiver(即數據接收並備份成功后會發送ack),可以保證無論是worker失效還是driver失效,都是數據0丟失。原因是:如果沒有Receiver服務的worker失效了,RDD數據可以依賴血統來重新計算;如果Receiver所在worker失敗了,由於Reciever是可靠的,並有write ahead log機制,則收到的數據可以保證不丟;如果driver失敗了,可以從checkpoint中恢復數據重新構建。

14、kafka整合sparkStreaming問題

  • (1)、如何實現sparkStreaming讀取kafka中的數據
    • 可以這樣說:在kafka0.10版本之前有二種方式與sparkStreaming整合,一種是基於receiver,一種是direct,然后分別闡述這2種方式分別是什么
      • receiver:是采用了kafka高級api,利用receiver接收器來接受kafka topic中的數據,從kafka接收來的數據會存儲在spark的executor中,之后spark streaming提交的job會處理這些數據,kafka中topic的偏移量是保存在zk中的。
        • 基本使用: val kafkaStream = KafkaUtils.createStream(streamingContext,
          [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
        • 還有幾個需要注意的點:
          • 在Receiver的方式中,Spark中的partition和kafka中的partition並不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這並沒有增加Spark在處理數據上的並行度.
          • 對於不同的Group和topic我們可以使用多個Receiver創建不同的Dstream來並行接收數據,之后可以利用union來統一成一個Dstream。
          • 在默認配置下,這種方式可能會因為底層的失敗而丟失數據. 因為receiver一直在接收數據,在其已經通知zookeeper數據接收完成但是還沒有處理的時候,executor突然掛掉(或是driver掛掉通知executor關閉),緩存在其中的數據就會丟失. 如果希望做到高可靠, 讓數據零丟失,如果我們啟用了Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true)該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中. 所以, 即使底層節點出現了失敗, 也可以使用預寫日志中的數據進行恢復. 復制到文件系統如HDFS,那么storage level需要設置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)
      • direct:在spark1.3之后,引入了Direct方式。不同於Receiver的方式,Direct方式沒有receiver這一層,其會周期性的獲取Kafka中每個topic的每個partition中的最新offsets,之后根據設定的maxRatePerPartition來處理每個batch。(設置spark.streaming.kafka.maxRatePerPartition=10000。限制每秒鍾從topic的每個partition最多消費的消息條數)。
  • (2) 對比這2中方式的優缺點:
    • 采用receiver方式:這種方式可以保證數據不丟失,但是無法保證數據只被處理一次,WAL實現的是At-least-once語義(至少被處理一次),如果在寫入到外部存儲的數據還沒有將offset更新到zookeeper就掛掉,這些數據將會被反復消費. 同時,降低了程序的吞吐量。
    • 采用direct方式:相比Receiver模式而言能夠確保機制更加健壯. 區別於使用Receiver來被動接收數據, Direct模式會周期性地主動查詢Kafka, 來獲得每個topic+partition的最新的offset, 從而定義每個batch的offset的范圍. 當處理數據的job啟動時, 就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。
      • 優點:
        • 1、簡化並行讀取
          • 如果要讀取多個partition, 不需要創建多個輸入DStream然后對它們進行union操作. Spark會創建跟Kafka partition一樣多的RDD partition, 並且會並行從Kafka中讀取數據. 所以在Kafka partition和RDD partition之間, 有一個一對一的映射關系.
        • 2、高性能
          • 如果要保證零數據丟失, 在基於receiver的方式中, 需要開啟WAL機制. 這種方式其實效率低下, 因為數據實際上被復制了兩份, Kafka自己本身就有高可靠的機制, 會對數據復制一份, 而這里又會復制一份到WAL中. 而基於direct的方式, 不依賴Receiver, 不需要開啟WAL機制, 只要Kafka中作了數據的復制, 那么就可以通過Kafka的副本進行恢復.
        • 3、一次且僅一次的事務機制
          • 基於receiver的方式, 是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的. 這是消費Kafka數據的傳統方式. 這種方式配合着WAL機制可以保證數據零丟失的高可靠性, 但是卻無法保證數據被處理一次且僅一次, 可能會處理兩次. 因為Spark和ZooKeeper之間可能是不同步的. 基於direct的方式, 使用kafka的簡單api, Spark Streaming自己就負責追蹤消費的offset, 並保存在checkpoint中. Spark自己一定是同步的, 因此可以保證數據是消費一次且僅消費一次。不過需要自己完成將offset寫入zk的過程,在官方文檔中都有相應介紹.
            *簡單代碼實例:
            • messages.foreachRDD(rdd=>{
              val message = rdd.map(.2)//對數據進行一些操作
              message.map(method)//更新zk上的offset (自己實現)
              updateZKOffsets(rdd)
              })
            • sparkStreaming程序自己消費完成后,自己主動去更新zk上面的偏移量。也可以將zk中的偏移量保存在mysql或者redis數據庫中,下次重啟的時候,直接讀取mysql或者redis中的偏移量,獲取到上次消費的偏移量,接着讀取數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM