一、spark集群運算的模式
Spark 有很多種模式,最簡單就是單機本地模式,還有單機偽分布式模式,復雜的則運行在集群中,目前能很好的運行在 Yarn和 Mesos 中,當然 Spark 還有自帶的 Standalone 模式,對於大多數情況 Standalone 模式就足夠了,如果企業已經有 Yarn 或者 Mesos 環境,也是很方便部署的。
standalone(集群模式):典型的Mater/slave模式,不過也能看出Master是有單點故障的;Spark支持ZooKeeper來實現 HA
on yarn(集群模式): 運行在 yarn 資源管理器框架之上,由 yarn 負責資源管理,Spark 負責任務調度和計算
on mesos(集群模式): 運行在 mesos 資源管理器框架之上,由 mesos 負責資源管理,Spark負責任務調度和計算。
on cloud(集群模式):比如 AWS 的 EC2,使用這個模式能很方便的訪問 Amazon的 S3;Spark支持多種分布式存儲系統:HDFS和S3
二、RDD中reduceBykey與groupByKey哪個性能好,為什么?
reduceByKey:reduceByKey會在結果發送至reducer之前會對每個mapper在本地進行merge,有點類似於在MapReduce中的combiner。這樣做的好處在於,在map端進行一次reduce之后,數據量會大幅度減小,從而減小傳輸,保證reduce端能夠更快的進行結果計算。
groupByKey:groupByKey會對每一個RDD中的value值進行聚合形成一個序列(Iterator),此操作發生在reduce端,所以勢必會將所有的數據通過網絡進行傳輸,造成不必要的浪費。同時如果數據量十分大,可能還會造成OutOfMemoryError。
通過以上對比可以發現在進行大量數據的reduce操作時候建議使用reduceByKey。不僅可以提高速度,還是可以防止使用groupByKey造成的內存溢出問題。
三、如何從kafka中獲取數據?
1)基於Receiver的方式
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的,然后Spark Streaming啟動的job回去處理那些數據。
2)基於Direct的方式
這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據
四、cache后面能不能接其他算子,它是不是action操作?
Cache后可以接其他算子,但是接了算子之后,起不到緩存的作用,因為會重復出發cache。
Cache不是action操作。
五、ReduceByKey是action算子嘛?
ReduceByKey是transform算子,reduce是action算子
六、數據本地性是在哪個階段確定的?
Dag在划分stage時確定。
七、RDD的彈性體現在什么方面?
自動的進行內存和磁盤的存儲切換
基於Lineage的高效容錯
Task如果失敗會自動進行特定次數的重試
Stage如果失敗會自動進行特定次數的重試,而且只會計算失敗的分片
Cache和persist,數據計算之后持久化緩存
數據調度彈性,DAG TASK調度和資源無關
數據分片的高度彈性,a.分片很多碎片可以合並成大的
八、常規的容錯方式有哪幾種?
數據檢查點(checkpoint),會發生拷貝,浪費資源
記錄數據的更新,每次更新都會記錄下來,復雜且消耗性能
詳見:Spark容錯機制
九、RDD通過Lineage(記錄數據更新)方式為何很高效?
1)lazy記錄了數據的來源,RDD是不可變的,且是lazy級別的,且rDD之間構成了鏈條,lazy是彈性的基石。由於RDD不可變,所以每次操作就產生新的rdd,不存在全局修改的問題,控制難度下降,所有有計算鏈條將復雜計算鏈條存儲下來,計算的時候從后往前回溯900步是上一個stage的結束,要么就checkpoint
2)記錄原數據,是每次修改都記錄,代價很大如果修改一個集合,代價就很小,官方說rdd是粗粒度的操作,是為了效率,為了簡化,每次都是操作數據集合,寫或者修改操作,都是基於集合的rdd的寫操作是粗粒度的,rdd的讀操作既可以是粗粒度的也可以是細粒度,讀可以讀其中的一條條的記錄。
3)簡化復雜度,是高效率的一方面,寫的粗粒度限制了使用場景如網絡爬蟲,現實世界中,大多數寫是粗粒度的場景
十、RDD有哪些缺陷?
不支持細粒度的寫和更新操作,spark寫數據是粗粒度的。所謂粗粒度,就是批量寫入數據,為了提高效率。但是讀數據是細粒度的,也就是說是一條一條讀的。
不支持增量迭代計算,Flink支持。
十一、Spark有哪些聚合類的算子,我們應該避免什么類型的算子?
在我們的開發過程中,能避免則盡量避免使用reduceByKey、groupByKey、join、dictinct,repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少的shuffle操作的Spark作業,可以大大減少性能開銷。
十二、對於spark中的數據傾斜問題有什么好的方案?
前提是定位數據傾斜,是OOM還是任務執行緩慢,查看日志或者看4040weburl
解決方法:
· 避免不必要的shuffle,如使用廣播小表的方式,將reduce-side-join提升為map-side-join
· 分拆發生數據傾斜的記錄,分成幾個部分進行,然后合並join后的結果
· 改變並行度,可能並行度太少了,導致個別task數據壓力大
· 兩階段聚合,先局部聚合,再全局聚合
· 自定義partitioner,分散key的分布,使其更加均勻
十三、RDD創建有哪幾種方式?
使用程序中的集合創建rdd(.parallelize)
使用本地文件系統創建rdd(.textfile)
使用hdfs創建rdd
基於數據庫db創建rdd
基於nosql創建rdd,如hbase
基於s3創建rdd
基於數據流創建rdd,如socket
十四、Spark中並行度怎么設置比較合適?
Spark並行度,每個core承載24個partition,32core,那么64128之間的並行度,也就是設置64~128個partition,並行度和數據規模無關,只和內存使用量和cpu使用時間有關。
十五、Spark中的數據的位置是由誰來管理的?
每個數據分片都對應具體物理位置,數據位置是由blockManager管理,無論數據是在磁盤,內存還是tacyan,都由blockManager管理。
十六、spark的數據本地性有哪幾種?
Spark的數據本地性有三種:
PROCESS_LOCAL是指讀取緩存在本地節點的數據
NODE_LOCAL是指讀取本地節點磁盤的數據
ANY是指讀取非本地節點的數據
通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關,如果RDD經常用的話將該RDD cache到內存中,注意,由於cache是lazy的,所以必須通過一個action的觸發,才能真正的將該RDD cache到內存中。
十七、RDD有幾種操作算子類型?
transformation,rdd由一種轉為另一種rdd
Action,reduce,collect...
Crontroller,crontroller是控制算子,cache,persist,對性能和效率的有很好的支持。
十八、Spark如何處理不能被序列化的對象?
將不能序列化的內容封裝成object
十九、collect的功能是什么?其底層如何實現的?
Driver將集群中各節點的內容收集起來匯總成結果,collect返回的類型為Array,collect把各個節點上的數據抓過來,抓過來數據是Array類型,collect對Array抓過來的結果進行合並,合並后Array中只有一個元素,是tuple類型(KV)的。
二十、Spark程序執行,有時候默認為什么會產生很多task,怎么修改默認task執行個數?
輸入數據有很多task,尤其是有很多小文件的時候,有多少個輸入block就有多少個task
Spark中有partition的概念。每個partition對應一個task,task越多,在處理大規模數據的時候,就會越有效率。不過task並不是越多越好,如果數據量不大,則沒必要啟動太多task;
參數可以通過spark_home/conf/spark-default.conf配置文件設置:
spark.sql.shuffle.partitions 50 spark.default.parallelism 10
第一個是針對spark sql的task數量
第二個是非spark sql程序設置生效
二十一、為什么Spark Application在沒有獲得足夠的資源,job就開始執行了,可能會導致什么問題發生?
會導致執行該job時候集群資源不足,導致執行job結束也沒有分配足夠的資源,分配了部分Executor,該job就開始執行task,應該是task的調度線程和Executor資源申請是異步的。
二十二、map和flatmap的區別?
Map:對RDD每個元素轉換,文件中每一行的數據返回一個數組對象。
Flatmap:對RDD每個元素轉換,然后再扁平化將所有對象合並成一個對象,文件中的所有行數據金返回一個數組對象,會拋棄為null的值。
二十三、Spark為什么要持久化,一般什么場景下要進行persist操作?
Spark所有復雜的算法都會有persist的身影,spark默認數據放在內存,spark很多內容都是放在內存的,非常適合高速迭代,1000個步驟,只有第一個輸入數據,中間不產生臨時數據,但分布式系統風險很高,所以容易出錯,就要容錯,rdd出錯或者分片可以根據血統算出來,如果沒有對父rdd進行persist或者cache的話,就需要重頭做。
以下場景要進行persist:
某個步驟計算非常耗時,需要進行persist持久化
計算鏈條非常長,重新恢復要算很多步驟
Checkpoint所在的rdd要持久化persist,lazy級別,框架發現有checkpoint時單獨觸發一個job,需要重新算一遍,checkpoint前要持久化,寫個rdd.cache或者rdd.persist,將結果保存起來,再寫checkpoint操作,這樣執行起來會非常快,不需要重新計算rdd鏈條了。
Shuffle之后要persist,shuffle要進行網絡傳輸,風險很大,數據丟失重來,恢復代價很大
Shuffle之前要persist,框架默認將數據持久化到磁盤,這個是框架自動做的
二十四、為什么要進行序列化
序列化可以減少數據的體積,減少存儲空間,高效存儲和傳輸數據,不好的是使用的時候要反序列化,非常消耗cpu。
二十五、介紹一下join操作優化經驗?
join其實常見的就分為兩類: map-side join 和 reduce-side join。當大表和小表join時,用map-side join能顯著提高效率。將多份數據進行關聯是數據處理過程中非常普遍的用法,不過在分布式計算系統中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發送到所有的 reduce 分區中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤IO消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。如果其中有張表較小的話,我們則可以自己實現在 map 端實現數據關聯,跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。