- 序言
- 一、spark基本常識
- 1、spark中的RDD是什么,有哪些特性?
- 2、概述一下spark中常用算子區別(map,mapPartitions, foreach, foreachPartition)
- 3、map與flatMap的區別
- 4、reduceByKey是不是action?
- 5、cache后面能不能接其他算子?它是不是action操作?
- 6、RDD有哪些缺陷?
- 7、本地數據性是在哪個環節確定的?
- 8、RDD的彈性表現在哪幾點?
- 9、常規的容錯方式有那集中類型?
- 10、Spark提交你的jar包時所用的命令是什么?
- 11、spark有哪些組件?
- 12、Spark工作機制
- 13、什么是粗粒度,什么是細粒度,各自的優缺點是什么?
- 14、driver的功能是什么?
- 15、spark中worker的主要工作是什么?
- 16、RDD的創建有哪幾種方式?
- 16、列舉你常用的action?
- 二、spark要點
- 1、Spark shuffle時,是否會在磁盤上存儲
- 2、談談spark中的寬窄依賴
- 3、spark中如何划分stage
- 4、spark中cache和persist的區別
- 5、spark中的數據傾斜的現象,原因,后果
- 6、spark數據傾斜的處理
- 7、spark有哪些聚合類的算子,我們應該避免什么類型的算子?
- 8、spark並行度怎么設置比較合適
- 9、spark中數據的位置是被誰管理的?
- 10、spark數據本地性有哪幾種?
- 11、RDD有幾種操作類型?
- 12、spark如何處理不能被序列化的對象?
- 13、collect功能是什么?底層是怎么實現的?
- 14、spark技術棧有哪些組件,每個組件都有什么功能,適合什么應用場景?
序言
本文內容是整理和收集spark基本問題,材料源於各類平台文章中,本文編寫僅作為學習參看使用。
其中有CSDN博主「為了九億少女的期待」,原文鏈接:https://blog.csdn.net/Lwj879525930/article/details/82559596
一、spark基本常識
1、spark中的RDD是什么,有哪些特性?
RDD(Resilient Distributed Dataset)叫做分布式數據集模式spark中最基本的數據抽象,它代表一個不可變,可分區,里面的元素可以並行計算的集合。
Resilient:表示彈性的,彈性表示
Destributed:分布式,可以並行在集群計算
Dataset:就是一個集合,用於存放數據的
五大特性:
1)一個分區列表,RDD中的數據都存儲在一個分區列表中
2)作用在每一個分區列表中的函數。
3)一個RDD依賴於其他多個RDD,RDD的容錯機制就是根據這個特性而來的。
4)可選的,針對於kv類型的RDD才有這個特性,作用是決定了數據的來源及數據處理后的去向。
5)可選項,數據本地性,數據位置最優。
2、概述一下spark中常用算子區別(map,mapPartitions, foreach, foreachPartition)
map:用於遍歷RDD,將函數應用的每一個元素,返回新的RDD(transformation算子)
mapPartitions:用於遍歷RDD的每一個分區,返回生成一個新的 RDD(transformation算子)
foreach:用於遍歷RDD,將函數應用於每一個元素,無返回值(action算子)
foreachPartition:用於遍歷操作RDD中的每一個分區,無返回值(action算子)
追述:一般使用mapPatitions和foreachPatition算子比map和foreach更加高效,推薦使用
3、map與flatMap的區別
map:對RDD每個元素轉換,文件中的每一行數據返回一個數組對象。
flatMap:對RDD每一元素進行轉換,然后再扁平化,將所有的對象合並為一個對象,文件中的所有行數據僅返回一個數組對象,會拋棄值為null的值。
4、reduceByKey是不是action?
不是,很多人都會以為是action, reduce rdd 是action。
5、cache后面能不能接其他算子?它是不是action操作?
cache可以接其他算子,但是接了算子之后,起不到緩存應有的效果,因為會重新出發cache。cache不是action操作。
6、RDD有哪些缺陷?
不支持細粒度的寫和更新操作(如網絡爬蟲),spark寫數據是粗粒度的,就是批量寫入數據,未來提高效率。但是讀數據是細粒度的,也就是說可以一條條的讀。
不支持增量迭代計算,但Flink支持。
7、本地數據性是在哪個環節確定的?
具體的task運行在哪台機器上,dag划分stage的時候確定的。
8、RDD的彈性表現在哪幾點?
1)自動的進行內存和磁盤的存儲和切換;
2)基於Lingage的高效容錯;
3)Task如果失敗就會自動進行特定次數的重試;
4)Stage如果失敗會自動進行特定次數的重試,而且只會自己算失敗的分片;
5)checkpoint和persist,數據計算之后持久化緩存
6)數據調度彈性,DAG TASK調度和資源無關;
7)數據分片的高度彈性,a.分片很多碎片可以合並成大的,b.par
9、常規的容錯方式有那集中類型?
1).數據檢查點,會發生拷貝,浪費資源;
2).記錄數據的更新,每次更新都會記錄下來,比較復雜且比較消耗性能。
10、Spark提交你的jar包時所用的命令是什么?
spark-submit
11、spark有哪些組件?
-
Master:集群管理節點,不參與計算。
-
Worker:計算節點,進程本身不參與計算,和master匯報。
-
Driver:運行程序的main方法,創建spark context對象。
-
Client:用戶提交程序的入口。
12、Spark工作機制
用戶在client提交作業后,會由Driver運行main方法並創建spark context上下文。執行RDD算子,形成dag圖輸入dagscheduler,按照add之間的依賴關系划分stage輸入task scheduler。 task scheduler會將stage划分為task set分發到各個節點的executor中執行。
13、什么是粗粒度,什么是細粒度,各自的優缺點是什么?
-
粗粒度:啟動時就分配好資源,程序啟動,后續具體使用就使用分配好的資源,不需要再分配資源。好處:作業特別多時,資源復用率較高,使用粗粒度。缺點:容易浪費資源,如果一個job有1000個task,完成999個,還有一個沒完成,那么就使用粗粒度。如果999個資源閑置在那里,就會造成大量的資源浪費。
-
細粒度:用資源的時候分配,用完了就立即收回資源,啟動會麻煩一點,啟動一次分配一次,會比較麻煩。
14、driver的功能是什么?
-
一個spark作業運行時包括一個driver進程,也就是作業的主進程,具有main函數,並且有sparkContext的實例,是程序的入口。
-
功能:負責向集群申請資源,向master注冊信息,負責了作業的調度,負責了作業的解析,生成stage並調度task到executor上,包括DAGScheduler,TaskScheduler。
15、spark中worker的主要工作是什么?
主要功能:管理當前節點內存,CPU的使用情況,接收Master發送過來的資源指令,通過executorRunner啟動程序分配任務,worker就類似於包工頭,管理分配新進程,做計算的服務,相當於process服務,需要注意的是:
- 1)worker會不會匯報當前信息給master?Worker心跳給master主要只有workid,不會以心跳的方式發送資源信息給master,這樣master就知道worker是否存活,只有故障的時候才發送資源信息。
- 2)Worker會不會運行代碼?具體運行的是executor。可以運行具體application斜的業務邏輯代碼,操作代碼的節點,不會去運行代碼。
16、RDD的創建有哪幾種方式?
- 1)使用程序中的集合創建RDD
- 2)使用本地文件系統創建RDD
- 3)使用hdfs創建RDD
- 4)基於數據庫db創建RDD
- 5)基於Nosql創建RDD
- 6)基於s3創建RDD
- 7)基於數據流,如socket創建RDD
16、列舉你常用的action?
collect, reduce, take, count, asveAsTextFile等。
二、spark要點
1、Spark shuffle時,是否會在磁盤上存儲
會
2、談談spark中的寬窄依賴
寬依賴:指的是多個子RDD的Partition會依賴於同一個父RDD的Partition,關系是一對多,父RDD的一個分區的數據去到了子RDD的不同分區里面,會有shuffle產生。
窄依賴:指的是每一個父的Partition最多被子RDD的一個Partition使用,是一對一的,也就是父RDD的一個分區去到了子RDD的一個分區中,這個過程沒有shuffle產生。
分區的標准就是看父RDD的一個分區的數據流的流向,要是流向一個partition的話就是窄依賴,否則就是寬依賴,如圖所示:
3、spark中如何划分stage
stage的概念:spark任務會根據RDD之間的依賴關系,形成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DGA划分互相依賴的多個stage,划分依據就是寬窄依賴,遇到寬窄依賴就划分stage,每個stage包含一個或多個task,然后將這些task以taskSet的形式提交給TaskScheduler運行,stage是由一組並行的task組成。
- 1)spark程序中可以因為不同的action觸發眾多的job,一個程序中可以有很多的job,每一個job是由一個或者多個stage構成的,后面的stage依賴於前面的stage,也就是說只有前面以來的stage計算完畢后,后面的stage才會運行;
- 2)Stage的划分標准就是寬窄依賴:何時產生寬依賴就會產生一個新的stage,例如reduceByKey,groupByKey,join的算子,會導致寬依賴的產生;
- 3)切割規則:從后往前,遇到寬依賴就切割stage。
- 4)圖例:
4、spark中cache和persist的區別
cache:緩存數據,默認是緩存在內存中,其本質還是調用persist。
persist:緩存數據,有豐富的數據緩存策略。。數據可以保存在內存中也可以保存磁盤中,使用的時候指定對應的緩存級別就可以了。
5、spark中的數據傾斜的現象,原因,后果
-
1)數據傾斜的現象:
多數task執行速度較快,少數task執行時間非常長,或者等待很長時間后提示你內存不足,執行失敗。 -
2)數據傾斜的原因:
數據問題:key本身分布不均勻(包括大量的key為空);key的設置不合理。
Spark使用問題:shuffle時的並發度不夠;計算方式有誤。 -
3)數據傾斜的后果:
spark中的stage的執行時間受限於最后那個執行完成的task,因此運行緩慢的任務會拖垮整個程序運行的速度(分布式程序運行的速度是由最慢的那個task決定的)。
過多的數據在同一個task中運行,會把executor撐爆。
6、spark數據傾斜的處理
發生數據傾斜的時候,不要急於提高executor的資源,修改參數或是修改程序,首先要檢查數據本身,是否存在異常數據。
(一)數據問題造成的數據傾斜
-
1)找出異常的key值
a.如果任務長時間卡在最后一個或最后幾個任務,首先要對key進行抽樣分析,判斷是哪些key造成的。選取key,對數據進行抽樣,統計出現的次數,根據出現的次數大小排列數前幾個。
b.比如:df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)
c.如果發現多數數據分布都較為均勻,二個別數據比其他數據大上若干個數量級,則說明發生了數據傾斜。 -
2)經過分析,傾斜的數據主要有以下三種情況:
a.null(空值)或是一些無意義的信息()之類的,大多是這個原因引起。
b.無效數據,大量重復的測試數據或是對結果影響不大的有效數據。
c.有效數據,業務導致的正常數據分布。 -
3)解決辦法:
a.第1,2種情況,直接對數據進行過濾即可(因為該數據對當前業務不會產生影響)。
b.第3種情況則需要進行一些特殊操作,常見的有以下幾種做法 :
(1) 隔離執行,將異常的key過濾出來單獨處理,最后與正常數據的處理結果進行union操作。
(2) 對key先添加隨機值,進行操作后,去掉隨機值,再進行一次操作。
(3) 使用reduceByKey 代替 groupByKey(reduceByKey用於對每個key對應的多個value進行merge操作,最重要的是它能夠在本地先進行merge操作,並且merge操作可以通過函數自定義.)
(4) 使用map join。
(二)spark使用不當造成的數據傾斜
-
1)提高shuffle並行度
a. DataFrame和sparkSql可以設置spark.sql.shuffle.partitions參數控制shuffle的並發度,默認為20.
b. RDD操作可以設置spark.default.parallelism控制並發度,默認參數頭不同的ClusterManager控制。
c. 局限性:只是讓每個task執行更少的不同的key。無法解決個別key特別大的情況造成的傾斜,如果某些key的大小非常大,即使一個task單獨執行它,也會受到數據傾斜的困擾。 -
2)使用map join 代替 reduce join
a. 在校表不是特別大(取決於你的executor大小)的情況下使用,可以使程序避免shuffle的過程,自然也就沒有數據傾斜的困擾了。
b. 局限性:因為先是將小數據發送到每個executor上,所以數據量不能太大。
7、spark有哪些聚合類的算子,我們應該避免什么類型的算子?
在我們的開發過程中,能避免則盡可能避免使用reduceByKey,join,distinct,repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的spark作業,可以大大減少性能開銷。
8、spark並行度怎么設置比較合適
Spark並行度,每個core承載24個partition,如32個core,那么64128之間的並行度,也就是設置64~128怕熱提提歐尼,並行度和數據規模無關,只和內存使用量和cpu使用時間有關。
9、spark中數據的位置是被誰管理的?
每個數據分片都對應具體物理位置,數據的位置是被blockManager管理,無論數據是在磁盤、內存還是tacyan,都是有blockManager管理。
10、spark數據本地性有哪幾種?
-
PROCESS_LOCAL是指讀取緩存在本地節點的數據。
-
NODE_LOCAL是指讀取本地節點硬盤數據。
-
ANY是指讀取非本地節點數據。
-
通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使用數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關,如果RDD經常用的話將該RDD cache到內存中,注意,由於cache是lazy的,所以必須要通過一個action的觸發,才能真正的將該RDD cache到內存中。
11、RDD有幾種操作類型?
-
1)transformation,進行數據狀態的轉換,對已有的RDD創建新的RDD。
-
2)action,觸發具體的作業,對RDD最后取結果的一種操作。
-
3)crontroller,對性能效率和容錯方面的支持。persist , cache, checkpoint。
12、spark如何處理不能被序列化的對象?
- 將不能被序列化的對象封裝成object
13、collect功能是什么?底層是怎么實現的?
- Driver通過collect把集群中各個節點的內容收集過來匯總成結果,collect返回結果是Array類型的,collect把各個節點上的數據抓過來,抓過來的數據是Array型,collect對Array抓過來的結果進行合並,合並后Array中只有一個元素,是tuple類型(KV類型)的。
14、spark技術棧有哪些組件,每個組件都有什么功能,適合什么應用場景?
-
1)Spark core:是其它組件的基礎,spark的內核,主要包含:有向循環圖、RDD、Lingage、Cache、Broadcast等,並封裝了底層通訊框架,是Spark的基礎。
-
2)SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,可以多多種數據源(如kafka、Flume、Twitter、Zero和TCP套接字)進行類似Map、Reduce和Join等復雜操作,將流式計算分解成一系列短小的批處理作業。
-
3)Spark sql:Shark是SparkSQL的前身,sparkSQL的一個重要特點是其能夠統一處理關系表和RDD,是的開發人員可以輕松的使用SQL命令進行外部查詢,同時進行更復雜的數據分析。
-
4)BlinkDB:是一個用於在海量數據上運行交互式SQL查詢的大規模並行查詢引擎,它允許用戶通過權衡數據精度來提升查詢響應時間,其數據的精度被控制在允許的誤差范圍內。
-
5)MLBase是Spark生態圈的一部分專注於機器學習,讓機器學習的門檻更低,讓一些並不了解機器學習的用戶也能方便使用MLbase。MLbase分為四部分:MLlib,MLI,ML Qptimizer和MLRuntime。
-
6)GraphX是Spark中用於圖河圖並行計算。