Spark面試題(一)


一、spark集群運算的模式

Spark 有很多種模式,最簡單就是單機本地模式,還有單機偽分布式模式,復雜的則運行在集群中,目前能很好的運行在 Yarn和 Mesos 中,當然 Spark 還有自帶的 Standalone 模式,對於大多數情況 Standalone 模式就足夠了,如果企業已經有 Yarn 或者 Mesos 環境,也是很方便部署的。

standalone(集群模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支持ZooKeeper來實現 HA

on yarn(集群模式): 運行在 yarn 資源管理器框架之上,由 yarn 負責資源管理,Spark 負責任務調度和計算

on mesos(集群模式): 運行在 mesos 資源管理器框架之上,由 mesos 負責資源管理,Spark負責任務調度和計算。

on cloud(集群模式):比如 AWS 的 EC2,使用這個模式能很方便的訪問 Amazon的 S3;Spark支持多種分布式存儲系統:HDFS和S3

二、RDD中reduceBykey與groupByKey哪個性能好,為什么?

reduceByKey:reduceByKey會在結果發送至reducer之前會對每個mapper在本地進行merge,有點類似於在MapReduce中的combiner。這樣做的好處在於,在map端進行一次reduce之后,數據量會大幅度減小,從而減小傳輸,保證reduce端能夠更快的進行結果計算。

groupByKey:groupByKey會對每一個RDD中的value值進行聚合形成一個序列(Iterator),此操作發生在reduce端,所以勢必會將所有的數據通過網絡進行傳輸,造成不必要的浪費。同時如果數據量十分大,可能還會造成OutOfMemoryError。

通過以上對比可以發現在進行大量數據的reduce操作時候建議使用reduceByKey。不僅可以提高速度,還是可以防止使用groupByKey造成的內存溢出問題。

三、如何從kafka中獲取數據?

1)基於Receiver的方式

這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job回去處理那些數據。

2)基於Direct的方式

這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據

四、cache后面能不能接其他算子,它是不是action操作?

Cache后可以接其他算子,但是接了算子之后,起不到緩存的作用,因為會重復出發cache。

Cache不是action操作。

五、ReduceByKey是action算子嘛?

ReduceByKey是transform算子,reduce是action算子

六、數據本地性是在哪個階段確定的?

Dag在划分stage時確定。

七、RDD的彈性體現在什么方面?

自動的進行內存和磁盤的存儲切換

基於Lineage的高效容錯

Task如果失敗會自動進行特定次數的重試

Stage如果失敗會自動進行特定次數的重試,而且只會計算失敗的分片

Cache和persist,數據計算之后持久化緩存

數據調度彈性,DAG TASK調度和資源無關

數據分片的高度彈性,a.分片很多碎片可以合並成大的

八、常規的容錯方式有哪幾種?

數據檢查點(checkpoint),會發生拷貝,浪費資源

記錄數據的更新,每次更新都會記錄下來,復雜且消耗性能

詳見:Spark容錯機制

九、RDD通過Lineage(記錄數據更新)方式為何很高效?

1)lazy記錄了數據的來源,RDD是不可變的,且是lazy級別的,且rDD之間構成了鏈條,lazy是彈性的基石。由於RDD不可變,所以每次操作就產生新的rdd,不存在全局修改的問題,控制難度下降,所有有計算鏈條將復雜計算鏈條存儲下來,計算的時候從后往前回溯900步是上一個stage的結束,要么就checkpoint

2)記錄原數據,是每次修改都記錄,代價很大如果修改一個集合,代價就很小,官方說rdd是粗粒度的操作,是為了效率,為了簡化,每次都是操作數據集合,寫或者修改操作,都是基於集合的rdd的寫操作是粗粒度的,rdd的讀操作既可以是粗粒度的也可以是細粒度,讀可以讀其中的一條條的記錄。

3)簡化復雜度,是高效率的一方面,寫的粗粒度限制了使用場景如網絡爬蟲,現實世界中,大多數寫是粗粒度的場景

十、RDD有哪些缺陷?

不支持細粒度的寫和更新操作,spark寫數據是粗粒度的。所謂粗粒度,就是批量寫入數據,為了提高效率。但是讀數據是細粒度的,也就是說是一條一條讀的。

不支持增量迭代計算,Flink支持。

十一、Spark有哪些聚合類的算子,我們應該避免什么類型的算子?

在我們的開發過程中,能避免則盡量避免使用reduceByKey、groupByKey、join、dictinct,repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少的shuffle操作的Spark作業,可以大大減少性能開銷。

十二、對於spark中的數據傾斜問題有什么好的方案?

前提是定位數據傾斜,是OOM還是任務執行緩慢,查看日志或者看4040weburl

解決方法:

· 避免不必要的shuffle,如使用廣播小表的方式,將reduce-side-join提升為map-side-join

· 分拆發生數據傾斜的記錄,分成幾個部分進行,然后合並join后的結果

· 改變並行度,可能並行度太少了,導致個別task數據壓力大

· 兩階段聚合,先局部聚合,再全局聚合

· 自定義partitioner,分散key的分布,使其更加均勻

十三、RDD創建有哪幾種方式?

使用程序中的集合創建rdd(.parallelize)

使用本地文件系統創建rdd(.textfile)

使用hdfs創建rdd

基於數據庫db創建rdd

基於nosql創建rdd,如hbase

基於s3創建rdd

基於數據流創建rdd,如socket

十四、Spark中並行度怎么設置比較合適?

Spark並行度,每個core承載24個partition,32core,那么64128之間的並行度,也就是設置64~128個partition,並行度和數據規模無關,只和內存使用量和cpu使用時間有關。

十五、Spark中的數據的位置是由誰來管理的?

每個數據分片都對應具體物理位置,數據位置是由blockManager管理,無論數據是在磁盤,內存還是tacyan,都由blockManager管理。

十六、spark的數據本地性有哪幾種?

Spark的數據本地性有三種:

PROCESS_LOCAL是指讀取緩存在本地節點的數據

NODE_LOCAL是指讀取本地節點磁盤的數據

ANY是指讀取非本地節點的數據

通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關,如果RDD經常用的話將該RDD cache到內存中,注意,由於cache是lazy的,所以必須通過一個action的觸發,才能真正的將該RDD cache到內存中。

十七、RDD有幾種操作算子類型?

transformation,rdd由一種轉為另一種rdd

Action,reduce,collect...

Crontroller,crontroller是控制算子,cache,persist,對性能和效率的有很好的支持。

十八、Spark如何處理不能被序列化的對象?

將不能序列化的內容封裝成object

十九、collect的功能是什么?其底層如何實現的?

Driver將集群中各節點的內容收集起來匯總成結果,collect返回的類型為Array,collect把各個節點上的數據抓過來,抓過來數據是Array類型,collect對Array抓過來的結果進行合並,合並后Array中只有一個元素,是tuple類型(KV)的。

二十、Spark程序執行,有時候默認為什么會產生很多task,怎么修改默認task執行個數?

輸入數據有很多task,尤其是有很多小文件的時候,有多少個輸入block就有多少個task

Spark中有partition的概念。每個partition對應一個task,task越多,在處理大規模數據的時候,就會越有效率。不過task並不是越多越好,如果數據量不大,則沒必要啟動太多task;

參數可以通過spark_home/conf/spark-default.conf配置文件設置:

spark.sql.shuffle.partitions 50 spark.default.parallelism 10

第一個是針對spark sql的task數量

第二個是非spark sql程序設置生效

二十一、為什么Spark Application在沒有獲得足夠的資源,job就開始執行了,可能會導致什么問題發生?

會導致執行該job時候集群資源不足,導致執行job結束也沒有分配足夠的資源,分配了部分Executor,該job就開始執行task,應該是task的調度線程和Executor資源申請是異步的。

二十二、map和flatmap的區別?

Map:對RDD每個元素轉換,文件中每一行的數據返回一個數組對象。

Flatmap:對RDD每個元素轉換,然后再扁平化將所有對象合並成一個對象,文件中的所有行數據金返回一個數組對象,會拋棄為null的值。

二十三、Spark為什么要持久化,一般什么場景下要進行persist操作?

Spark所有復雜的算法都會有persist的身影,spark默認數據放在內存,spark很多內容都是放在內存的,非常適合高速迭代,1000個步驟,只有第一個輸入數據,中間不產生臨時數據,但分布式系統風險很高,所以容易出錯,就要容錯,rdd出錯或者分片可以根據血統算出來,如果沒有對父rdd進行persist或者cache的話,就需要重頭做。

以下場景要進行persist:

某個步驟計算非常耗時,需要進行persist持久化

計算鏈條非常長,重新恢復要算很多步驟

Checkpoint所在的rdd要持久化persist,lazy級別,框架發現有checkpoint時單獨觸發一個job,需要重新算一遍,checkpoint前要持久化,寫個rdd.cache或者rdd.persist,將結果保存起來,再寫checkpoint操作,這樣執行起來會非常快,不需要重新計算rdd鏈條了。

Shuffle之后要persist,shuffle要進行網絡傳輸,風險很大,數據丟失重來,恢復代價很大

Shuffle之前要persist,框架默認將數據持久化到磁盤,這個是框架自動做的

二十四、為什么要進行序列化

序列化可以減少數據的體積,減少存儲空間,高效存儲和傳輸數據,不好的是使用的時候要反序列化,非常消耗cpu。

二十五、介紹一下join操作優化經驗?

join其實常見的就分為兩類: map-side join 和  reduce-side join。當大表和小表join時,用map-side join能顯著提高效率。將多份數據進行關聯是數據處理過程中非常普遍的用法,不過在分布式計算系統中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發送到所有的 reduce 分區中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤IO消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。如果其中有張表較小的話,我們則可以自己實現在 map 端實現數據關聯,跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM