1、Spark有幾種部署方式?(重點)
Spark支持3種集群管理器(Cluster Manager),分別為:
- Standalone:獨立模式,Spark原生的簡單集群管理器,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統,使用Standalone可以很方便地搭建一個集群;
- Apache Mesos:一個強大的分布式資源管理框架,它允許多種不同的框架部署在其上,包括yarn;
- Hadoop YARN:統一的資源管理機制,在上面可以運行多套計算框架,如map reduce、storm等,根據driver在集群中的位置不同,分為yarn client和yarn cluster。
實際上,除了上述這些通用的集群管理器外,Spark內部也提供了一些方便用戶測試和學習的簡單集群部署模式。由於在實際工廠環境下使用的絕大多數的集群管理器是Hadoop YARN,因此我們關注的重點是Hadoop YARN模式下的Spark集群部署。
Spark的運行模式取決於傳遞給SparkContext的MASTER環境變量的值,個別模式還需要輔助的程序接口來配合使用,目前支持的Master字符串及URL包括:
Spark運行模式配置
Master URL |
Meaning |
local |
在本地運行,只有一個工作進程,無並行計算能力。 |
local[K] |
在本地運行,有K個工作進程,通常設置K為機器的CPU核心數量。 |
local[*] |
在本地運行,工作進程數量等於機器的CPU核心數量。 |
spark://HOST:PORT |
以Standalone模式運行,這是Spark自身提供的集群運行模式,默認端口號: 7077。 |
mesos://HOST:PORT |
在Mesos集群上運行,Driver進程和Worker進程運行在Mesos集群上,部署模式必須使用固定值:--deploy-mode cluster |
yarn-client |
在Yarn集群上運行,Driver進程在本地,Executor進程在Yarn集群上,部署模式必須使用固定值:--deploy-mode client。Yarn集群地址必須在HADOOP_CONF_DIR or YARN_CONF_DIR變量里定義。 |
yarn-cluster |
在Yarn集群上運行,Driver進程在Yarn集群上,Work進程也在Yarn集群上,部署模式必須使用固定值:--deploy-mode cluster。Yarn集群地址必須在HADOOP_CONF_DIR or YARN_CONF_DIR變量里定義。 |
用戶在提交任務給Spark處理時,以下兩個參數共同決定了Spark的運行方式。
· –master MASTER_URL :決定了Spark任務提交給哪種集群處理。
· –deploy-mode DEPLOY_MODE:決定了Driver的運行方式,可選值為Client或者Cluster。
2、Spark提交作業參數(重點)
參考答案:
https://blog.csdn.net/gamer_gyt/article/details/79135118
1)在提交任務時的幾個重要參數
executor-cores —— 每個executor使用的內核數,默認為1,官方建議2-5個,企業是4個
num-executors —— 啟動executors的數量,默認為2
executor-memory —— executor內存大小,默認1G
driver-cores —— driver使用內核數,默認為1
driver-memory —— driver內存大小,默認512M
2)給出一個提交任務的樣式
spark-submit \
--master local[5] \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 7 \
--executor-memory 8g \
--class PackageName.ClassName XXXX.jar \
--name "Spark Job Name" \
InputPath \
OutputPath
官網參數配置:http://spark.apache.org/docs/latest/configuration.html
3、簡述Spark on yarn的作業提交流程(重點)
YARN Client模式
在YARN Client模式下,Driver在任務提交的本地機器上運行,Driver啟動后會和ResourceManager通訊申請啟動ApplicationMaster,隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster,此時的ApplicationMaster的功能相當於一個ExecutorLaucher,只負責向ResourceManager申請Executor內存。
ResourceManager接到ApplicationMaster的資源申請后會分配container,然后ApplicationMaster在資源分配指定的NodeManager上啟動Executor進程,Executor進程啟動后會向Driver反向注冊,Executor全部注冊完成后Driver開始執行main函數,之后執行到Action算子時,觸發一個job,並根據寬依賴開始划分stage,每個stage生成對應的taskSet,之后將task分發到各個Executor上執行。
YARN Cluster模式
YARN Cluster模式
在YARN Cluster模式下,任務提交后會和ResourceManager通訊申請啟動ApplicationMaster,隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster,此時的ApplicationMaster就是Driver。
Driver啟動后向ResourceManager申請Executor內存,ResourceManager接到ApplicationMaster的資源申請后會分配container,然后在合適的NodeManager上啟動Executor進程,Executor進程啟動后會向Driver反向注冊,Executor全部注冊完成后Driver開始執行main函數,之后執行到Action算子時,觸發一個job,並根據寬依賴開始划分stage,每個stage生成對應的taskSet,之后將task分發到各個Executor上執行。
4、請列舉Spark的transformation算子(不少於5個)(重點)
1)map
2)flatMap
3)filter
4)groupByKey
5)reduceByKey
6)sortByKey
5、請列舉Spark的action算子(不少於5個)(重點)
1)reduce:
2)collect:
3)first:
4)take:
5)aggregate:
6)countByKey:
7)foreach:
8)saveAsTextFile:
6、簡述Spark的兩種核心Shuffle(重點)
spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。
HashShuffleManager有着一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。
SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合並(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
未經優化的HashShuffle:
上游的stage的task對相同的key執行hash算法,從而將相同的key都寫入到一個磁盤文件中,而每一個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤之前,會先將數據寫入到內存緩沖,當內存緩沖填滿之后,才會溢寫到磁盤文件中。但是這種策略的不足在於,下游有幾個task,上游的每一個task都就都需要創建幾個臨時文件,每個文件中只存儲key取hash之后相同的數據,導致了當下游的task任務過多的時候,上游會堆積大量的小文件
優化后的HashShuffle:
在shuffle write過程中,上游stage的task就不是為下游stage的每個task創建一個磁盤文件了。此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會創建一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合並,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。
注意:如果想使用優化之后的ShuffleManager,需要將:spark.shuffle.consolidateFiles調整為true。(當然,默認是開啟的)
未經優化: 上游的task數量:m , 下游的task數量:n , 上游的executor數量:k (m>=k) , 總共的磁盤文件:m*n
優化之后的: 上游的task數量:m , 下游的task數量:n , 上游的executor數量:k (m>=k) , 總共的磁盤文件: k*n
普通的SortShuffle:
在普通模式下,數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可以選用不同的數據結構。如果是由聚合操作的shuffle算子,就是用map的數據結構(邊聚合邊寫入內存),如果是join的算子,就使用array的數據結構(直接寫入內存)。接着,每寫一條數據進入內存數據結構之后,就會判斷是否達到了某個臨界值,如果達到了臨界值的話,就會嘗試的將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。
在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序,排序之后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批次1萬條數據的形式分批寫入磁盤文件,寫入磁盤文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數,提升性能。
此時task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫,會產生多個臨時文件,最后會將之前所有的臨時文件都進行合並,最后會合並成為一個大文件。最終只剩下兩個文件,一個是合並之后的數據文件,一個是索引文件(標識了下游各個task的數據在文件中的start offset與end offset)。最終再由下游的task根據索引文件讀取相應的數據文件。
SortShuffle - bypass運行機制 :
此時上游stage的task會為每個下游stage的task都創建一個臨時磁盤文件,並將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。
自己的理解:bypass的就是不排序,還是用hash去為key分磁盤文件,分完之后再合並,形成一個索引文件和一個合並后的key hash文件。省掉了排序的性能。
bypass機制與普通SortShuffleManager運行機制的不同在於:
a、磁盤寫機制不同;
b、不會進行排序。
也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。
觸發bypass機制的條件:
shuffle map task的數量小於 spark.shuffle.sort.bypassMergeThreshold 參數的值(默認200)並且不是聚合類的shuffle算子(比如groupByKey)
7、簡述SparkSQL中RDD、DataFrame、DataSet三者的區別與聯系?(重點)
RDD:彈性分布式數據集;不可變、可分區、元素可以並行計算的集合。
優點:
RDD編譯時類型安全:編譯時能檢查出類型錯誤;
面向對象的編程風格:直接通過類名點的方式操作數據。
缺點:
序列化和反序列化的性能開銷很大,大量的網絡傳輸;
構建對象占用了大量的heap堆內存,導致頻繁的GC(程序進行GC時,所有任務都是暫停)
DataFrame
DataFrame以RDD為基礎的分布式數據集。
優點:
DataFrame帶有元數據schema,每一列都帶有名稱和類型。
DataFrame引入了off-heap,構建對象直接使用操作系統的內存,不會導致頻繁GC。
DataFrame可以從很多數據源構建;
DataFrame把內部元素看成Row對象,表示一行行的數據。
DataFrame=RDD+schema
缺點:
編譯時類型不安全;
不具有面向對象編程的風格。
Dataset
DataSet包含了DataFrame的功能,Spark2.0中兩者統一,DataFrame表示為DataSet[Row],即DataSet的子集。
(1)DataSet可以在編譯時檢查類型;
(2)並且是面向對象的編程接口。
(DataSet 結合了 RDD 和 DataFrame 的優點,並帶來的一個新的概念 Encoder。當序列化數據時,Encoder 產生字節碼與 off-heap 進行交互,能夠達到按需訪問數據的效果,而不用反序列化整個對象。)。
三者之間的轉換:
8、Repartition和Coalesce關系與區別(重點)
1)關系:
兩者都是用來改變RDD的partition數量的,repartition底層調用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)區別:
repartition一定會發生shuffle,coalesce根據傳入的參數來判斷是否發生shuffle
一般情況下增大rdd的partition數量使用repartition,減少partition數量時使用coalesce
9、Spark中cache默認緩存級別是什么?(重點)
DataFrame的cache默認采用 MEMORY_AND_DISK 這和RDD 的默認方式不一樣RDD cache 默認采用MEMORY_ONLY
10、SparkSQL中left semi join操作與left join操作的區別?(重點)
leftJoin類似於SQL中的左外關聯left outer join,返回結果以第一個RDD為主,關聯不上的記錄為空。
部分場景下可以使用left semi join替代left join:
因為 left semi join 是 in(keySet) 的關系,遇到右表重復記錄,左表會跳過,性能更高,而 left join 則會一直遍歷。但是left semi join 中最后 select 的結果中只許出現左表中的列名,因為右表只有 join key 參與關聯計算了。
11、Spark常用算子reduceByKey與groupByKey的區別,哪一種更具優勢?(重點)
reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,返回結果是RDD[k,v]。
groupByKey:按照key進行分組,直接進行shuffle。
開發指導:reduceByKey比groupByKey,建議使用。但是需要注意是否會影響業務邏輯。
12、Spark Shuffle默認並行度是多少?(重點)
參數spark.sql.shuffle.partitions 決定 默認並行度200
13、簡述Spark中共享變量(廣播變量和累加器)。(重點)
Spark為此提供了兩種共享變量,一種是Broadcast Variable(廣播變量),另一種是Accumulator(累加變量)。Broadcast Variable會將使用到的變量,僅僅為每個節點拷貝一份,更大的用處是優化性能,減少網絡傳輸以及內存消耗。Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。
Spark提供的Broadcast Variable,是只讀的。並且在每個節點上只會有一份副本,而不會為每個task都拷貝一份副本。因此其最大作用,就是減少變量到各個節點的網絡傳輸消耗,以及在各個節點上的內存消耗。此外,spark自己內部也使用了高效的廣播算法來減少網絡消耗。 可以通過調用SparkContext的broadcast()方法,來針對某個變量創建廣播變量。然后在算子的函數內,使用到廣播變量時,每個節點只會拷貝一份副本了。每個節點可以使用廣播變量的value()方法獲取值。記住,廣播變量,是只讀的。
14、SparkStreaming有哪幾種方式消費Kafka中的數據,它們之間的區別是什么? (重點)
一、基於Receiver的方式
這種方式使用Receiver來獲取數據。Receiver是使用Kafka的高層次Consumer API來實現的。receiver從Kafka中獲取的數據都是存儲在Spark Executor的內存中的(如果突然數據暴增,大量batch堆積,很容易出現內存溢出的問題),然后Spark Streaming啟動的job會去處理那些數據。
然而,在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。
二、基於Direct的方式
這種新的不基於Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機制。替代掉使用Receiver來接收數據后,這種方式會周期性地查詢Kafka,來獲得每個topic+partition的最新的offset,從而定義每個batch的offset的范圍。當處理數據的job啟動時,就會使用Kafka的簡單consumer api來獲取Kafka指定offset范圍的數據。
優點如下:
簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。
一次且僅一次的事務機制。
三、對比:
基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
在實際生產環境中大都用Direct方式
15、請簡述一下SparkStreaming的窗口函數中窗口寬度和滑動距離的關系?(重點)
16、Spark通用運行流程概述?(重點)
不論Spark以何種模式進行部署,任務提交后,都會先啟動Driver進程,隨后Driver進程向集群管理器注冊應用程序,之后集群管理器根據此任務的配置文件分配Executor並啟動,當Driver所需的資源全部滿足后,Driver開始執行main函數,Spark查詢為懶執行,當執行到action算子時開始反向推算,根據寬依賴進行stage的划分,隨后每一個stage對應一個taskset,taskset中有多個task,根據本地化原則,task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通信,報告任務運行情況。
17、Standalone模式運行機制
Standalone集群有四個 重要組成部分,分別是:
1) Driver:是一個進程,我們編寫的Spark應用程序就運行在Driver上,由Driver進程執行;
2) Master(RM):是一個進程,主要負責資源的調度和分配,並進行集群的監控等職責;
3) Worker(NM):是一個進程,一個Worker運行在集群中的一台服務器上,主要負責兩個職責,一個是用自己的內存存儲RDD的某個或某些partition;另一個是啟動其他進程和線程(Executor),對RDD上的partition進行並行的處理和計算。
4) Executor:是一個進程,一個Worker上可以運行多個Executor,Executor通過啟動多個線程(task)來執行對RDD的partition進行並行計算,也就是執行我們對RDD定義的例如map、flatMap、reduce等算子操作。
Standalone Client模式
在Standalone Client模式下,Driver在任務提交的本地機器上運行,Driver啟動后向Master注冊應用程序,Master根據submit腳本的資源需求找到內部資源至少可以啟動一個Executor的所有Worker,然后在這些Worker之間分配Executor,Worker上的Executor啟動后會向Driver反向注冊,所有的Executor注冊完成后,Driver開始執行main函數,之后執行到Action算子時,開始划分stage,每個stage生成對應的taskSet,之后將task分發到各個Executor上執行。
Standalone Cluster模式
在Standalone Cluster模式下,任務提交后,Master會找到一個Worker啟動Driver進程, Driver啟動后向Master注冊應用程序,Master根據submit腳本的資源需求找到內部資源至少可以啟動一個Executor的所有Worker,然后在這些Worker之間分配Executor,Worker上的Executor啟動后會向Driver反向注冊,所有的Executor注冊完成后,Driver開始執行main函數,之后執行到Action算子時,開始划分stage,每個stage生成對應的taskSet,之后將task分發到各個Executor上執行。
注意,Standalone的兩種模式下(client/Cluster),Master在接到Driver注冊Spark應用程序的請求后,會獲取其所管理的剩余資源能夠啟動一個Executor的所有Worker,然后在這些Worker之間分發Executor,此時的分發只考慮Worker上的資源是否足夠使用,直到當前應用程序所需的所有Executor都分配完畢,Executor反向注冊完畢后,Driver開始執行main程序。
18、簡述一下Spark 內存管理?(了解)
在執行Spark 的應用程序時,Spark 集群會啟動 Driver 和 Executor 兩種 JVM 進程,前者為主控進程,負責創建 Spark 上下文,提交 Spark 作業(Job),並將作業轉化為計算任務(Task),在各個 Executor 進程間協調任務的調度,后者負責在工作節點上執行具體的計算任務,並將結果返回給 Driver,同時為需要持久化的 RDD 提供存儲功能。由於 Driver 的內存管理相對來說較為簡單,本節主要對 Executor 的內存管理進行分析,下文中的 Spark 內存均特指 Executor 的內存。
堆內和堆外內存規划
作為一個 JVM 進程,Executor 的內存管理建立在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更為詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,進一步優化了內存的使用。
堆內內存受到JVM統一管理,堆外內存是直接向操作系統進行內存的申請和釋放。
Executor堆內與堆外內存
- 堆內內存
堆內內存的大小,由 Spark 應用程序啟動時的 –executor-memory 或 spark.executor.memory 參數配置。Executor 內運行的並發任務共享 JVM 堆內內存,這些任務在緩存 RDD 數據和廣播(Broadcast)數據時占用的內存被規划為存儲(Storage)內存,而這些任務在執行 Shuffle 時占用的內存被規划為執行(Execution)內存,剩余的部分不做特殊規划,那些 Spark 內部的對象實例,或者用戶定義的 Spark 應用程序中的對象實例,均占用剩余的空間。不同的管理模式下,這三部分占用的空間大小各不相同。
Spark 對堆內內存的管理是一種邏輯上的”規划式”的管理,因為對象實例占用內存的申請和釋放都由 JVM 完成,Spark 只能在申請后和釋放前記錄這些內存,我們來看其具體流程:
申請內存流程如下:
- Spark 在代碼中 new 一個對象實例;
- JVM 從堆內內存分配空間,創建對象並返回對象引用;
- Spark 保存該對象的引用,記錄該對象占用的內存。
釋放內存流程如下:
1. Spark記錄該對象釋放的內存,刪除該對象的引用;
2. 等待JVM的垃圾回收機制釋放該對象占用的堆內內存。
我們知道,JVM 的對象可以以序列化的方式存儲,序列化的過程是將對象轉換為二進制字節流,本質上可以理解為將非連續空間的鏈式存儲轉化為連續空間或塊存儲,在訪問時則需要進行序列化的逆過程——反序列化,將字節流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。
對於 Spark 中序列化的對象,由於是字節流的形式,其占用的內存大小可直接計算,而對於非序列化的對象,其占用的內存是通過周期性地采樣近似估算而得,即並不是每次新增的數據項都會計算一次占用的內存大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際內存有可能遠遠超出預期。此外,在被 Spark 標記為釋放的對象實例,很有可能在實際上並沒有被 JVM 回收,導致實際可用的內存小於 Spark 記錄的可用內存。所以 Spark 並不能准確記錄實際可用的堆內內存,從而也就無法完全避免內存溢出(OOM, Out of Memory)的異常。
雖然不能精准控制堆內內存的申請和釋放,但 Spark 通過對存儲內存和執行內存各自獨立的規划管理,可以決定是否要在存儲內存里緩存新的 RDD,以及是否為新的任務分配執行內存,在一定程度上可以提升內存的利用率,減少異常的出現。
- 堆外內存
為了進一步優化內存的使用以及提高 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,存儲經過序列化的二進制數據。
堆外內存意味着把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。
利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時不再基於 Tachyon,而是與堆外的執行內存一樣,基於 JDK Unsafe API 實現),Spark 可以直接操作系統堆外內存,減少了不必要的內存開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內存可以被精確地申請和釋放(堆外內存之所以能夠被精確的申請和釋放,是由於內存的申請和釋放不再通過JVM機制,而是直接向操作系統申請,JVM對於內存的清理是無法准確指定時間點的,因此無法實現精確的釋放),而且序列化的數據占用的空間可以被精確計算,所以相比堆內內存來說降低了管理的難度,也降低了誤差。
在默認情況下堆外內存並不啟用,可通過配置 spark.memory.offHeap.enabled 參數啟用,並由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的划分方式相同,所有運行中的並發任務共享存儲內存和執行內存。
19、Transformation和action的區別
1、transformation是得到一個新的RDD,方式很多,比如從數據源生成一個新的RDD,從RDD生成一個新的RDD
2、action是得到一個值,或者一個結果(直接將RDDcache到內存中)
所有的transformation都是采用的懶策略,就是如果只是將transformation提交是不會執行計算的,計算只有在action被提交的時候才被觸發。
20、Map和 FlatMap區別 對結果集的影響有什么不同
map的作用很容易理解就是對rdd之中的元素進行逐一進行函數操作映射為另外一個rdd。
flatMap的操作是將函數應用於rdd之中的每一個元素,將返回的迭代器的所有內容構成新的rdd。通常用來切分單詞。
Spark 中 map函數會對每一條輸入進行指定的操作,然后為每一條輸入返回一個對象。 而flatMap函數則是兩個操作的集合——正是“先映射后扁平化”
21、Spark Application在沒有獲得足夠的資源,job就開始執行了,可能會導致什么什么問題發生?
會導致執行該job時候集群資源不足,導致執行job結束也沒有分配足夠的資源,分配了部分Executor,該job就開始執行task,應該是task的調度線程和Executor資源申請是異步的;如果想等待申請完所有的資源再執行job的:需要將spark.scheduler.maxRegisteredResourcesWaitingTime設置的很大;spark.scheduler.minRegisteredResourcesRatio 設置為1,但是應該結合實際考慮,否則很容易出現長時間分配不到資源,job一直不能運行的情況
22、driver的功能是什么?
2.1、一個Spark作業運行時包括一個Driver進程,也是作業的主進程,具有main函數,並且有SparkContext的實例,是程序的人口點;
2.2、功能:負責向集群申請資源,向master注冊信息,負責了作業的調度,,負責作業的解析、生成Stage並調度Task到Executor上。包括DAGScheduler,TaskScheduler
23、Spark中Work的主要工作是什么?
主要功能:管理當前節點內存,CPU的使用狀況,接收master分配過來的資源指令,通過ExecutorRunner啟動程序分配任務,
worker就類似於包工頭,管理分配新進程,做計算的服務,相當於process服務。
需要注意的是:
1)worker會不會匯報當前信息給master,worker心跳給master主要只有workid,它不會發送資源信息以心跳的方式給mater,master分配的時候就知道work,只有出現故障的時候才會發送資源。
2)worker不會運行代碼,具體運行的是Executor是可以運行具體appliaction寫的業務邏輯代碼,操作代碼的節點,它不會運行程序的代碼的。
24、Spark為什么比mapreduce快?
4.1、spark是基於內存進行數據處理的,MapReduce是基於磁盤進行數據處理的
MapReduce的設設計:中間結果保存在文件中,提高了可靠性,減少了內存占用。但是犧牲了性能。
Spark的設計:數據在內存中進行交換,要快一些,但是內存這個東西,可靠性不如磁盤。所以性能方面比MapReduce要好。
DAG計算模型在迭代計算上還是比MapReduce的效率更高
4.2、spark中具有DAG有向無環圖,DAG有向無環圖在此過程中減少了shuffle以及落地磁盤的次數
Spark 計算比 MapReduce 快的根本原因在於 DAG 計算模型。一般而言,DAG 相比MapReduce 在大多數情況下可以減少 shuffle 次數。Spark 的 DAGScheduler 相當於一個改進版的 MapReduce,如果計算不涉及與其他節點進行數據交換,Spark 可以在內存中一次性完成這些操作,也就是中間結果無須落盤,減少了磁盤 IO 的操作。但是,如果計算過程中涉及數據交換,Spark 也是會把 shuffle 的數據寫磁盤的!有一個誤區,Spark 是基於內存的計算,所以快,這不是主要原因,要對數據做計算,必然得加載到內存,Hadoop 也是如此,只不過 Spark 支持將需要反復用到的數據給 Cache 到內存中,減少數據加載耗時,所以 Spark 跑機器學習算法比較在行(需要對數據進行反復迭代)。Spark 基於磁盤的計算也是比 Hadoop 快。剛剛提到了 Spark 的 DAGScheduler 是個改進版的 MapReduce,所以 Spark天生適合做批處理的任務。Hadoop 的 MapReduce 雖然不如 spark 性能好,但是 HDFS 仍然是業界的大數據存儲標准。
4.3、spark是粗粒度資源申請,也就是當提交spark application的時候,application會將所有的資源申請完畢,如果申請不到資源就等待,如果申請到資源才執行application,task在執行的時候就不需要自己去申請資源,task執行快,當最后一個task執行完之后task才會被釋放。
優點是執行速度快,缺點是不能使集群得到充分的利用
MapReduce是細粒度資源申請,當提交application的時候,task執行時,自己申請資源,自己釋放資源,task執行完畢之后,資源立即會被釋放,task執行的慢,application執行的相對比較慢。
優點是集群資源得到充分利用,缺點是application執行的相對比較慢。
Spark是基於內存的,而MapReduce是基於磁盤的迭代
有向無環圖是指:一個圖從頂點出發,無法再回到原點,那么這種圖叫做有向無環圖。
DAG計算模型在spark任務調度
Spark是粗粒度資源調度,MapReduce是細粒度資源調度
25、Mapreduce和Spark的都是並行計算,那么他們有什么相同和區別?
5.1、hadoop的一個作業稱為job,job里面分為map task和reduce task,每個task都是在自己的進程中運行的,當task結束時,進程也會結束。
5.2、spark用戶提交的任務成為application,一個application對應一個sparkcontext,app中存在多個job,每觸發一次action操作就會產生一個job。這些job可以並行或串行執行,每個job中有多個stage,stage是shuffle過程中DAGSchaduler通過RDD之間的依賴關系划分job而來的,每個stage里面有多個task,組成taskset有TaskSchaduler分發到各個executor中執行,executor的生命周期是和app一樣的,即使沒有job運行也是存在的,所以task可以快速啟動讀取內存進行計算。
5.3、hadoop的job只有map和reduce操作,表達能力比較欠缺而且在mr過程中會重復的讀寫hdfs,造成大量的io操作,多個job需要自己管理關系。
spark的迭代計算都是在內存中進行的,API中提供了大量的RDD操作如join,groupby等,而且通過DAG圖可以實現良好的容錯。
26、Spark應用程序的執行過程是什么?
6.1、構建Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊並申請運行Executor資源;
6.2、資源管理器分配Executor資源並啟動StandaloneExecutorBackend,Executor運行情況將隨着心跳發送到資源管理器上;
6.3、SparkContext構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task,Task Scheduler將Task發放給Executor運行同時SparkContext將應用程序代碼發放給Executor。
6.4、Task在Executor上運行,運行完畢釋放所有資源。
27、spark on yarn Cluster 模式下,ApplicationMaster和driver是在同一個進程么?
是,driver 位於ApplicationMaster進程中。該進程負責申請資源,還負責監控程序、資源的動態情況。
28、Spark on Yarn 模式有哪些優點?
8.1、與其他計算框架共享集群資源(eg.Spark框架與MapReduce框架同時運行,如果不用Yarn進行資源分配,MapReduce分到的內存資源會很少,效率低下);資源按需分配,進而提高集群資源利用等。
8.2、相較於Spark自帶的Standalone模式,Yarn的資源分配更加細致
8.3、Application部署簡化,例如Spark,Storm等多種框架的應用由客戶端提交后,由Yarn負責資源的管理和調度,利用Container作為資源隔離的單位,以它為單位去使用內存,cpu等。
8.4、Yarn通過隊列的方式,管理同時運行在Yarn集群中的多個服務,可根據不同類型的應用程序負載情況,調整對應的資源使用量,實現資源彈性管理。
29、spark中的RDD是什么,有哪些特性?
RDD(Resilient Distributed Dataset)叫做分布式數據集,是spark中最基本的數據抽象,它代表一個不可變,可分區,里面的元素可以並行計算的集合
五大特性:
9.1、A list of partitions:一個分區列表,RDD中的數據都存儲在一個分區列表中
9.2、A function for computing each split:作用在每一個分區中的函數
9.3、A list of dependencies on other RDDs:一個RDD依賴於其他多個RDD,這個點很重要,RDD的容錯機制就是依據這個特性而來的
9.4、Optionally,a Partitioner for key-value RDDs(eg:to say that the RDD is hash-partitioned):可選的,針對於kv類型的RDD才有這個特性,作用是決定了數據的來源以及數據處理后的去向
9.5、可選項,數據本地性,數據位置最優
30、spark如何防止內存溢出?
driver端的內存溢出 :
可以增大driver的內存參數:spark.driver.memory (default 1g)
這個參數用來設置Driver的內存。在Spark程序中,SparkContext,DAGScheduler都是運行在Driver端的。對應rdd的Stage切分也是在Driver端運行,如果用戶自己寫的程序有過多的步驟,切分出過多的Stage,這部分信息消耗的是Driver的內存,這個時候就需要調大Driver的內存。
map過程產生大量對象導致內存溢出:
這種溢出的原因是在單個map中產生了大量的對象導致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),這個操作在rdd中,每個對象都產生了10000個對象,這肯定很容易產生內存溢出的問題。針對這種問題,在不增加內存的情況下,可以通過減少每個Task的大小,以便達到每個Task即使產生大量的對象Executor的內存也能夠裝得下。具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
面對這種問題注意,不能使用rdd.coalesce方法,這個方法只能減少分區,不能增加分區, 不會有shuffle的過程。
數據不平衡導致內存溢出:
數據不平衡除了有可能導致內存溢出外,也有可能導致性能的問題,解決方法和上面說的類似,就是調用repartition重新分區。
shuffle后內存溢出:
shuffle內存溢出的情況可以說都是shuffle后,單個文件過大導致的。在Spark中,join,reduceByKey這一類型的過程,都會有shuffle的過程,在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數,這個參數通過spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism參數只對HashPartitioner有效,所以如果是別的Partitioner或者自己實現的Partitioner就不能使用spark.default.parallelism這個參數來控制shuffle的並發量了。如果是別的partitioner導致的shuffle內存溢出,就需要從partitioner的代碼增加partitions的數量。
standalone模式下資源分配不均勻導致內存溢出:
在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 這兩個參數,但是沒有配置–executor-cores這個參數的話,就有可能導致,每個Executor的memory是一樣的,但是cores的數量不同,那么在cores數量多的Executor中,由於能夠同時執行多個Task,就容易導致內存溢出的情況。這種情況的解決方法就是同時配置–executor-cores或者spark.executor.cores參數,確保Executor資源分配均勻。
使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache():
rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等價的,在內存不足的時候rdd.cache()的數據會丟失,再次使用的時候會重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在內存不足的時候會存儲在磁盤,避免重算,只是消耗點IO時間
31、spark中cache和persist的區別?
cache:緩存數據,默認是緩存在內存中,其本質還是調用persist
persist:緩存數據,有豐富的數據緩存策略。數據可以保存在內存也可以保存在磁盤中,使用的時候指定對應的緩存級別就可以了
32、Spark手寫WordCount程序
val conf=new SparkConf().setAppName(“wc”).setMaster(“Local[*]”)
val sc=new SparkContext(conf)
val result=sc.textFile(“輸入文件的路徑”) Val rdd2=result.flatmap(x=>x.split(“ ”)) .map((_,1)).reduceBykey((_+_)).saveAsTextFile(“輸出文件路徑”)
sc.stop()
|
33、Spark中創建RDD的方式總結3種
1、從集合中創建RDD;2、從外部存儲創建RDD;3、從其他RDD創建。
4.1、從集合中創建RDD,Spark主要提供了兩種函數:parallelize和makeRDD
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
val rdd = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
4.2、由外部存儲系統的數據集創建RDD
val rdd= sc.textFile("hdfs://node01:8020/data/test")
4.3、從其他RDD創建
val rdd1 = sc.parallelize(Array(1,2,3,4))
val rdd2 =rdd.map(x=>x.map(_*2))
34、常用算子
transformation算子Value類型
34.1 map(func)案例
1. 作用:返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成
2. 需求:創建一個1-10數組的RDD,將所有元素*2形成新的RDD
(1)創建
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2)打印
scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)將所有元素*2
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
(4)打印最終結果
scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
34.2 mapPartitions(func) 案例
1. 作用:類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]。假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區。
2. 需求:創建一個RDD,使每個元素*2組成新的RDD
(1)創建一個RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2)使每個元素*2組成新的RDD
scala> rdd.mapPartitions(x=>x.map(_*2))
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27
(3)打印新的RDD
scala> res3.collect
res4: Array[Int] = Array(2, 4, 6, 8)
34.3 mapPartitionsWithIndex(func) 案例
1. 作用:類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U];
2. 需求:創建一個RDD,使每個元素跟所在分區形成一個元組組成一個新的RDD
(1)創建一個RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2)使每個元素跟所在分區形成一個元組組成一個新的RDD
scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_))))
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26
(3)打印新的RDD
scala> indexRdd.collect
res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))
34.4 map()和mapPartition()的區別
1. map():每次處理一條數據。
2. mapPartition():每次處理一個分區的數據,這個分區的數據處理完后,原RDD中分區的數據才能釋放,可能導致OOM。
3. 開發指導:當內存空間較大的時候建議使用mapPartition(),以提高處理效率。
34.5 flatMap(func) 案例
1. 作用:類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
2. 需求:創建一個元素為1-5的RDD,運用flatMap創建一個新的RDD,新的RDD為原RDD的每個元素的擴展(1->1,2->1,2……5->1,2,3,4,5)
(1)創建
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
(2)打印
scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)
(3)根據原RDD創建新RDD(1->1,2->1,2……5->1,2,3,4,5)
scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26
(4)打印新RDD
scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)
34.6 sortBy(func,[ascending], [numTasks]) 案例
1. 作用;使用func先對數據進行處理,按照處理后的數據比較結果排序,默認為正序。
2. 需求:創建一個RDD,按照不同的規則進行排序
(1)創建一個RDD
scala> val rdd = sc.parallelize(List(2,1,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
(2)按照自身大小排序
scala> rdd.sortBy(x => x).collect()
res11: Array[Int] = Array(1, 2, 3, 4)
(3)按照與3余數的大小排序
scala> rdd.sortBy(x => x%3).collect()
res12: Array[Int] = Array(3, 4, 1, 2)
34.7 groupBy(func)案例
1. 作用:分組,按照傳入函數的返回值進行分組。將相同的key對應的值放入一個迭代器。
2. 需求:創建一個RDD,按照元素模以2的值進行分組。
(1)創建
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2)按照元素模以2的值進行分組
scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
(3)打印結果
scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))
34.8 filter(func) 案例
1. 作用:過濾。返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成。
2. 需求:創建一個RDD(由字符串組成),過濾出一個新RDD(包含”xiao”子串)
(1)創建
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
(2)打印
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
(3)過濾出含” xiao”子串的形成一個新的RDD
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
(4)打印新RDD
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)
34.9 sample(withReplacement, fraction, seed) 案例
1. 作用:以指定的隨機種子隨機抽樣出數量為fraction的數據,withReplacement表示是抽出的數據是否放回,true為有放回的抽樣,false為無放回的抽樣,seed用於指定隨機數生成器種子。
2. 需求:創建一個RDD(1-10),從中選擇放回和不放回抽樣
(1)創建RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
(2)打印
scala> rdd.collect()
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3)放回抽樣
scala> var sample1 = rdd.sample(true,0.4,2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26
(4)打印放回抽樣結果
scala> sample1.collect()
res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
(5)不放回抽樣
scala> var sample2 = rdd.sample(false,0.2,3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26
(6)打印不放回抽樣結果
scala> sample2.collect()
res17: Array[Int] = Array(1, 9)
34.10 distinct([numTasks])) 案例
1. 作用:對源RDD進行去重后返回一個新的RDD。
2. 需求:創建一個RDD,使用distinct()對其去重。
(1)創建一個RDD
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
(2)對RDD進行去重
scala> val unionRDD = distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26
(3)打印去重后生成的新RDD
scala> unionRDD.collect()
res20: Array[Int] = Array(1, 9, 5, 6, 2)
(4)對RDD
scala> val unionRDD = distinctRdd.distinct(2)
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26
(5)打印去重后生成的新RDD
scala> unionRDD.collect()
res21: Array[Int] = Array(6, 2, 1, 9, 5)
34.11 coalesce(numPartitions) 案例
1. 作用:縮減分區數,用於大數據集過濾后,提高小數據集的執行效率。
2. 需求:創建一個4個分區的RDD,對其縮減分區
(1)創建一個RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24
(2)查看RDD的分區數
scala> rdd.partitions.size
res20: Int = 4
(3)對RDD重新分區
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26
(4)查看新RDD的分區數
scala> coalesceRDD.partitions.size
res21: Int = 3
34.12 repartition(numPartitions) 案例
1. 作用:根據分區數,重新通過網絡隨機洗牌所有數據。
2. 需求:創建一個4個分區的RDD,對其重新分區
(1)創建一個RDD
scala> val rdd = sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24
(2)查看RDD的分區數
scala> rdd.partitions.size
res22: Int = 4
(3)對RDD重新分區
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26
(4)查看新RDD的分區數
scala> rerdd.partitions.size
res23: Int = 2
coalesce和repartition的區別
1. coalesce重新分區,可以選擇是否進行shuffle過程。由參數shuffle: Boolean = false/true決定。
2. repartition實際上是調用的coalesce,進行shuffle。源碼如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
transformation轉換算子Key-Value類型
34.13 partitionBy案例
1. 作用:對pairRDD進行分區操作,如果原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 否則會生成ShuffleRDD,即會產生shuffle過程。
2. 需求:創建一個4個分區的RDD,對其重新分區
(1)創建一個RDD
scala> val rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[44] at parallelize at <console>:24
(2)查看RDD的分區數
scala> rdd.partitions.size
res24: Int = 4
(3)對RDD重新分區
scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[45] at partitionBy at <console>:26
(4)查看新RDD的分區數
scala> rdd2.partitions.size
res25: Int = 2
34.14 reduceByKey(func, [numTasks]) 案例
1. 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,reduce任務的個數可以通過第二個可選的參數來設置。
2. 需求:創建一個pairRDD,計算相同key對應值的相加結果
(1)創建一個pairRDD
scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[46] at parallelize at <console>:24
(2)計算相同key對應值的相加結果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[47] at reduceByKey at <console>:26
(3)打印結果
scala> reduce.collect()
res29: Array[(String, Int)] = Array((female,6), (male,7))
34.15 groupByKey案例
1. 作用:groupByKey也是對每個key進行操作,但只生成一個seq。
2. 需求:創建一個pairRDD,將相同key對應值聚合到一個seq中,並計算相同key對應值的相加結果。
(1)創建一個pairRDD
scala> val words = Array("one", "two", "two", "three", "three", "three")
words: Array[String] = Array(one, two, two, three, three, three)
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
wordPairsRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:26
(2)將相同key對應值聚合到一個Seq中
scala> val group = wordPairsRDD.groupByKey()
group: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:28
(3)打印結果
scala> group.collect()
res1: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
(4)計算相同key對應值的相加結果
scala> group.map(t => (t._1, t._2.sum))
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:31
(5)打印結果
scala> res2.collect()
res3: Array[(String, Int)] = Array((two,2), (one,1), (three,3))
34.16 reduceByKey和groupByKey的區別
1. reduceByKey:按照key進行聚合,在shuffle之前有combine(預聚合)操作,返回結果是RDD[k,v]。
2. groupByKey:按照key進行分組,直接進行shuffle。
3. 開發指導:reduceByKey比groupByKey,建議使用。但是需要注意是否會影響業務邏輯。
34.19 sortByKey([ascending], [numTasks]) 案例
1. 作用:在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
2. 需求:創建一個pairRDD,按照key的正序和倒序進行排序
(1)創建一個pairRDD
scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24
(2)按照key的正序
scala> rdd.sortByKey(true).collect()
res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))
(3)按照key的倒序
scala> rdd.sortByKey(false).collect()
res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))
34.17 join(otherDataset, [numTasks]) 案例
1. 作用:在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
2. 需求:創建兩個pairRDD,並將key相同的數據聚合到一個元組。
(1)創建第一個pairRDD
scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:24
(2)創建第二個pairRDD
scala> val rdd1 = sc.parallelize(Array((1,4),(2,5),(4,6)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:24
(3)join操作並打印結果
scala> rdd.join(rdd1).collect()
res13: Array[(Int, (String, Int))] = Array((1,(a,4)), (2,(b,5)), (3,(c,6)))
Action算子
34.18 reduce(func)案例
1. 作用:通過func函數聚集RDD中的所有元素,先聚合分區內數據,再聚合分區間數據。
2. 需求:創建一個RDD,將所有元素聚合得到結果
(1)創建一個RDD[Int]
scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24
(2)聚合RDD[Int]所有元素
scala> rdd1.reduce(_+_)
res50: Int = 55
(3)創建一個RDD[String]
scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24
(4)聚合RDD[String]所有數據
scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)
34.19 countByKey()案例
1. 作用:針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
2. 需求:創建一個PairRDD,統計每種key的個數
(1)創建一個PairRDD
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24
(2)統計每種key的個數
scala> rdd.countByKey
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)
34.20 foreach(func)案例
1. 作用:在數據集的每一個元素上,運行函數func進行更新。
2. 需求:創建一個RDD,對每個元素進行打印
(1)創建一個RDD
scala> var rdd = sc.makeRDD(1 to 5,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24
(2)對該RDD每個元素進行打印
scala> rdd.foreach(println(_))
3
4
5
1
2
34.22 collect()案例
1. 作用:在驅動程序中,以數組的形式返回數據集的所有元素。
2. 需求:創建一個RDD,並將RDD內容收集到Driver端打印
(1)創建一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)將結果收集到Driver端
scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
34.23 count()案例
1. 作用:返回RDD中元素的個數
2. 需求:創建一個RDD,統計該RDD的條數
(1)創建一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.count
res1: Long = 10
34.24 first()案例
1. 作用:返回RDD中的第一個元素
2. 需求:創建一個RDD,返回該RDD中的第一個元素
(1)創建一個RDD
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.first
res2: Int = 1
34.25 take(n)案例
1. 作用:返回一個由RDD的前n個元素組成的數組
2. 需求:創建一個RDD,統計該RDD的條數
(1)創建一個RDD
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24
(2)統計該RDD的條數
scala> rdd.take(3)
res10: Array[Int] = Array(2, 5, 4)
35、什么是寬窄依賴
窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用。
寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引起shuffle。
36、任務划分的幾個重要角色
RDD任務切分中間分為:Application、Job、Stage和Task
1)Application:初始化一個SparkContext即生成一個Application;
2)Job:一個Action算子就會生成一個Job;
3)Stage:根據RDD之間的依賴關系的不同將Job划分成不同的Stage,遇到一個寬依賴則划分一個Stage;
4)Task:Stage是一個TaskSet,將Stage划分的結果發送到不同的Executor執行即為一個Task。