一、spark的優勢:
1、每一個作業獨立調度,可以把所有的作業做一個圖進行調度,各個作業之間相互依賴,在調度過程中一起調度,速度快。
2、所有過程都基於內存,所以通常也將Spark稱作是基於內存的迭代式運算框架。
3、spark提供了更豐富的算子,讓操作更方便。
二、為什么Spark比Map Reduced運算速度快:Spark在計算模型和調度上比MR做了更多的優化,不需要過多地和磁盤交互。
1、Spark計算比MapReduce快的根本原因在於DAG計算模型。DAG相比Hadoop的MapReduce在大多數情況下可以減少shuffle次數。spark遇到寬依賴才會出現shuffer,通常每次MapReduce都會有一次shuffer;DAG 相當於改進版的 MapReduce,可以說是由多個 MapReduce 組成,當數據處理流程中存在多個map和多個Reduce操作混合執行時,MapReduce只能提交多個Job執行,而Spark可以只提交一次application即可完成。
2、MapReduce 每次shuffle 操作后,必須寫到磁盤,然后每次計算都需要從磁盤上讀書數據,磁盤上的I/O開銷比較大。spark的Executor中有一個BlockManager存儲模塊,會將內存和磁盤共同作為存儲設備,當需要多輪迭代計算時,可以將中間結果存儲到這個存儲模塊里,下次需要時,就可以直接讀該存儲模塊里的數據,而不需要讀寫到HDFS等文件系統里,因而有效減少了IO開銷;還有一點就是spark的RDD數據結構,RDD在每次transformation后並不立即執行,而且action后才執行,有進一步減少了I/O操作。
3、MR它必須等map輸出的所有數據都寫入本地磁盤文件以后,才能啟動reduce操作,因為mr要實現默認的根據Key的排序!所以要排序肯定得寫完所有數據,才能排序,然后reduce來拉取。但是spark不需要,spark默認情況下,是不會對數據進行排序的。因此shufflemaptask每寫入一點數據,resulttask就可以拉取一點數據,然后在本地執行我們定義的聚合函數和算子,進行計算.
4、利用多線程來執行具體的任務(Hadoop MapReduce采用的是進程模型),減少任務的啟動和切換開銷;
三、spark的RDD與DataFrame以及Dataset的區別:
1、基本數據結構RDD:是彈性分布式數據集。
(1)RDD特點
1)彈性:RDD的每個分區在spark節點上存儲時默認是放在內存中的,若內存存儲不下,則存儲在磁盤中。
2)分布性:每個RDD中的數據可以處在不同的分區中,而分區可以處在不同的節點中.
3)容錯性:當一個RDD出現故障時,可以根據RDD之間的依賴關系來重新計算出發生故障的RDD.
(2)RDD與DataFrame以及DataSet的區別
1)RDD
a、具有面向對象的風格,是一組表示數據的Java或Scala對象,編譯時類型安全,方便處理非結構化數據。
b、處理結構化數據比較麻煩;默認采用的是java序列號方式,序列化性能開銷大,而且數據存儲在java堆內存中,導致gc比較頻繁
2)DataFrame:
a、是一個按指定列組織的分布式數據集合。類似於表。處理結構化數據方便;可以將數據序列化為二進制格式,數據保存在堆外內存中,可以減少了gc次數。
b、不支持編譯時類型安全,若結構未知,則不能操作數據。不具有面向對象風格。
3)DataSet
a、表示行(row)的JVM對象或行對象集合形式的數據,在編譯時檢查類型安全。方便處理結構化和非結構化數據。采用堆外內存存儲,gc友好。
2、spark的算子
(1)transform算子:map轉換算子,filter篩選算子,flatmap,groupByKey,reduceByKey,sortByKey,join,cogroup,combinerByKey。
(2)action算子:reduce,collect,count,take,aggregate,countByKey。
transformation是得到一個新的RDD,方式很多,比如從數據源生成一個新的RDD,從RDD生成一個新的RDD,action是得到一個值,或者一個結果(直接將RDDcache到內存中)所有的transformation都是采用的懶策略,就是如果只是將transformation提交是不會執行計算的,計算只有在action被提交的時候才被觸發。
(3)map與mapPartitions的區別
1)map是對rdd中的每一個元素進行操作;mapPartitions則是對rdd中的每個分區的迭代器進行操作
2)假如是普通的map,若一個partition中有1萬條數據。那么map中的方法要執行和計算1萬次。若是MapPartitions,一個task僅僅會執行一次function,此function一次接收所有的partition數據,執行一次即可,性能比較高。SparkSql或DataFrame默認會對程序進行mapPartition的優化。
3)普通的map操作通常不會導致內存的OOM異常,因為可以將已經處理完的1千條數據從內存里面垃圾回收掉。 但是MapPartitions操作,對於大量數據來說,將一個partition的數據一次傳入一個function以后,那么可能一下子內存不夠,但是又沒有辦法去騰出內存空間來,可能就OOM,內存溢出。
(4)treeReduce與reduce的區別
1)treeReduce:是在reduce的時候,先在自己的本地節點分區進行本地聚合一下,然后在進行全局聚合,相當於預處理.
2)reduce:是在reduce的時候,沒有本地聚合,直接返回給driver端。
(5)coalesce與repartition的區別
1)coalesce 與 repartition 都是對RDD進行重新划分,repartition只是coalesce接口中參數shuffle為true的實現。
2)若coalesce中shuffle為false時,傳入的參數大於現有的分區數目,RDD的分區數不變,也就是說不經過shuffle,是無法將RDD的分區數變多的。
3)若存在過多的小任務的時候,可以通過coalesce方法,收縮合並分區,減少分區的個數,減小任務調度成本,盡量避免shuffle,這樣會比repartition效率高。
(6)reduceByKey與groupByKey的區別:
pairRdd.reduceByKey(_+_).collect.foreach(println)等價於pairRdd.groupByKey().map(t => (t._1,t._2.sum)).collect.foreach(println)
reduceByKey的結果:(hello,2)(world,3) groupByKey的結果:(hello,(1,1))(world,(1,1,1))
使用reduceByKey()的時候,會對同一個Key所對應的value進行本地聚合,然后再傳輸到不同節點的節點。而使用groupByKey()的時候,並不進行本地的本地聚合,而是將全部數據傳輸到不同節點再進行合並,groupByKey()傳輸速度明顯慢於reduceByKey()。雖然groupByKey().map(func)也能實現reduceByKey(func)功能,但是,優先使用reduceByKey(func).
(7)spark的cache和persist的區別:
1)計算流程DAG特別長,服務器需要將整個DAG計算完成得出結果,若計算流程中突然中間算出的數據丟失了,spark又會根據RDD的依賴關系重新計算,這樣會浪費時間,為避免浪費時間可以將中間的計算結果通過cache或者persist放到內存或者磁盤中
2)cache最終調用了persist方法,默認的存儲級別僅是存儲內存中的;persist是最根本的底層函數,有多個存儲級別,executor執行時,60%用來緩存RDD,40%用來存放數據.
三、spark的小知識點。
1、DAG叫做有向無環圖
原始的RDD通過依賴關系形成了DAG,根據RDD之間依賴類型不同可以將DAG划分成不同的Stage(調度階段)。對於窄依賴,partition的轉換處理在一個Stage中完成計算。對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計算,因此寬依賴是划分Stage的依據。
2、spark如何從HDFS中讀取數據(參數MR的分片)
Spark從HDFS讀入文件的分區數默認等於HDFS文件的塊數(blocks),HDFS中的block是分布式存儲的最小單元。如果我們上傳一個30GB的非壓縮的文件到HDFS,HDFS默認的塊容量大小128MB,因此該文件在HDFS上會被分為235塊(30GB/128MB);Spark讀取SparkContext.textFile()讀取該文件,默認分區數等於塊數即235。
(1)讀取文件生成RDD時
1)從本地文件讀取生成RDD:rdd的分區數 = max(本地file的分片數, sc.defaultMinPartitions)
2)從HDFS上讀取文件生成RDD:rdd的分區數 = max(hdfs文件的block數目, sc.defaultMinPartitions)
(2)通過RDD生成時:
1)分區的默認個數等於spark.default.parallelism的指定值
2)根據父rdd的reduceTask數量
3、spark的checkpoint操作
checkpoint的意思就是建立檢查點,類似於快照,若DAG計算流程特別長,則需要將整個DAG計算完成得出結果,但是如果中間計算出的數據出錯,spark又會根據RDD的依賴關系重新計算,這樣子很費性能;當然我們可以將中間的計算結果通過cache或者persist放到內存或者磁盤中,但是這樣也不能保證數據完全不會丟失,存儲的這個內存出問題了或者磁盤壞了,也會導致spark從頭再根據RDD計算一遍,所以就有了checkpoint,其中checkpoint的作用就是將DAG中比較重要的中間數據做一個檢查點將結果存儲到一個高可用的地方(通常這個地方就是HDFS里面)
4、spark廣播變量和累加器
(1)廣播變量:廣播變量只能在Driver定義,且在Exector端不可改變。當在Executor端用到了Driver變量而不使用廣播變量,那么在每個Executor中有多少task就有多少Driver端變量副本。如果使用廣播變量,則在每個Executor端中只有一份Driver端的變量副本,減少了executor端的備份,節省了executor的內存,同時減少了網絡傳輸.
1、廣播變量的創建:廣播變量的創建發生在Driver端,當調用b=sc.broadcast(URI)來創建廣播變量時,會把該變量的數據切分成多個數據塊,保存到driver端的BlockManger中,使用的存儲級別是:MEMORY_AND_DISK_SER。廣播變量的值必須是本地的可序列化的值,不能是RDD。廣播變量一旦創建就不應該再修改,這樣可以保證所有的worker節點上的值是一致的。
2、廣播變量的讀取:b.value(),廣播變量的讀取也是懶加載的,此時廣播變量的數據只在Driver端存在,只有在Executor端需要廣播變量時才會去加載。加載后,首先從Executor本地的BlockManager中讀取廣播變量的數據,若存在就直接獲取。只要有一個worker節點的Executor從Driver端獲取到了廣播變量的數據,則其他的Executor就不需要從Driver端獲取了。
(2)累加器:Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。Accumulator是存在於Driver端的,集群上運行的task進行Accumulator的累加,隨后把值發到Driver端,在Driver端匯總。Accumulator只提供了累加的功能,但是卻給我們提供了多個task對於同一個變量並行操作的功能,但是task只能對Accumulator進行累加操作,不能讀取它的值,只有Driver端可以讀取Accumulator的值。
注意:比較經典的應用場景是用來在Spark Streaming應用中記錄某些事件的數量。
5、task之間的內存分配:
為了更好地使用使用內存,Executor 內運行的 Task 之間共享着 Execution 內存。
(1)Spark 內部維護了一個 HashMap 用於記錄每個 Task 占用的內存。當 Task 需要在 Executor 中申請內存時,先判斷 HashMap 里面是否維護着這個 Task 的內存使用情況,如果沒有,則將 TaskId 為 key,內存使用量 value為0 加入到 HashMap 里面。
(2)之后為這個 Task 申請 numBytes 內存,如果 Executor 內存區域正好有大於 numBytes 的空閑內存,則在 HashMap 里面將當前 Task 使用的內存加上 numBytes,然后返回;如果當前 Executor 內存區域無法申請到每個 Task 最小可申請的內存,則當前 Task 被阻塞,直到有其他任務釋放了足夠的執行內存,該任務才可以被喚醒。
(3)每個 Task 可以使用 Execution 內存大小范圍為 1/2N ~ 1/N,其中 N 為當前 Executor 內正在運行的 Task 個數。一個 Task 能夠運行必須申請到最小內存為 (1/2N * Execution 內存);當 N = 1 的時候,Task 可以使用全部的 Execution 內存。比如如果 Execution 內存大小為 10GB,當前 Executor 內正在運行的 Task 個數為5,則該 Task 可以申請的內存范圍為 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范圍。
6、spark與MapReduce的shuffle的區別:
(1)相同點:都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark里reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask)
(2)不同點:
1)MapReduce默認是排序的,spark默認不排序,除非使用sortByKey算子。
2)MapReduce可以划分成split,map()、spill、merge、shuffle、sort、reduce()等階段,spark沒有明顯的階段划分,只有不同的stage和算子操作。
3)MR落盤,Spark不落盤,spark可以解決mr落盤導致效率低下的問題。
四、spark的運行模式
1、基於yarn運行的基本流程
(1)首先通過spark-submit向yarn提交Application應用,ResouceManager選擇一個NodeManager為Application啟動ApplicationMaster。
(2)ApplicationMaster向ResouceManager注冊和申請Container,ResouceManager收到ApplicationMaster的請求后,使用自己的資源調度算法為applicationMaster分配多個Container。
(3)ApplicationMaster在不同的Container中啟動executor,executor啟動之后會反向注冊到ApplicationMaster;
(4)隨后初始化Sparkcontext,Sparkcontext是用戶通向spark集群的入口,在初始化sparkContext的同時,會初始化DAGScheduler、TaskScheduler對象。
(5)初始化后的sparkContext對RDD的所有操作形成一個DAG有向無循環圖,每執行到action操作就會創建一個job到DAGScheduler中,而job又根據RDD的依賴關系划分成多個stage,每個stage根據最后一個RDD的分區數目來創建相應數量的task,這些task形成一個taskset。
(6)DAGScheduler將taskset送到taskscheduler中,然后taskscheduler對task進行序列化,封裝到launchTask中,最后將launchTask發送到指定的executor中。
(7)executor接收到了TaskScheduler發送過來的launchTask 時,會對launchTask 進行反序列化,封裝到一個TaskRunner 中,然后從executor線程池中獲取一個線程來執行指定的任務.
(8)最終當所有的task任務完成之后,整個application執行完成,關閉sparkContext對象。
2、spark運行模式的類型
(1)本地模式:master和worker分別運行在一台機器的不同進程上,不會啟動executor,由SparkSubmit進程生成指定數量的線程數來執行任務,啟動多少個線程取決於local的參數:local/只啟動一個線程,local[k]啟動k個線程,local[*]啟動跟CPU數目相同的線程。
(2)standalone模式:standalone模式既獨立模式,自帶完整服務,可單獨部署到一個集群中,無需依賴其他任何資源管理系統,只支持FIFO調度器。在standalone模式中,沒有AM和NM的概念,也沒有RM的概念,用戶節點直接與master打交道,由driver負責向master申請資源,並由driver進行資源的分配和調度等等。
(3)基於yarn模式:yarn-cluster和yarn-client模式,區別在於driver端啟動在本地(client),還是在Yarn集群內部的AM中(cluster)
1)yarn-client:Driver是運行在本地客戶端,它的AM只是作為一個Executor啟動器。負責調度Application,會與yarn集群產生大量的網絡傳輸。好處是,執行時可以在本地看到所有的log,便於調試。所以一般用於測試環境。
2)yarn-cluster:driver運行在NodeManager,每次運行都是隨機分配到NM機器上去,不會產生大量的網絡傳輸。缺點就是本地提交后看不到log,只能通過yarn application-logs application id命令來查看,比較麻煩。
五、spark的數據傾斜
1、數據傾斜的現象
(1)大部分的task執行的特別快,剩下的幾個task執行的特別慢.
(2)運行一段時間后,其他task都已經執行完成,但是有的task可能會出現OOM異常。
2、數據傾斜的原因及其后果:
(1)根本原因是某個Key所對應的數據特別多,同一個key所對應的數據進入同一個reduce中,而其他的reduce中數據特別少。
(2)后果:某些任務執行特別慢,有的task可能會出現OOM異常,因為task的所分配的數據量太大,而且task每處理一條數據還要創建大量的對象,內存存儲不下.
3、如何定位數據傾斜
就是看哪些地方用了會產生shuffle的算子,distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition。
4、解決數據傾斜的方法
(1)若是數據分區太少,導致部分分區中數據量相對較大,產生輕度的數據傾斜,此時增加分區數即可解決。
(2)某個Key特別多,增大分區也無效。
1)數據傾斜的類型。
a、map端的數據傾斜:map端的主要功能是從磁盤中將數據讀入內存。在map端讀數據時,由於讀入數據的文件大小分布不均勻,因此會導致有些map讀取和處理的數據特別多,而有些map處理的數據特別少,造成map端長尾。
1.上游表文件的大小特別不均勻,並且小文件特別多(讀取的記錄數少),導致當前表map端讀取的數據分布不均勻,引起長尾。
解決方案:可以合並上游小文件,同時調節本節點的小文件的參數來進行優化。
2.Map端做聚合時,由於某些map讀取文件的某個值特別多(某些文件記錄數特別多)而引起長尾。
解決方案:來打亂數據分布,使數據盡可能分布均勻。
2)reduce端解決數據傾斜的方法:
a、聚合源數據:在數據的源頭將數據聚合成一個key對應多個value值.這樣在進行操作時就可能不會出現shuffle過程.
b、將導致數據傾斜的key提取出來,若是key對應的null或者無效數據,就將其刪除,若是正常的數據,就將其單獨處理,再與正常處理的數據進行union操作.
c、對key添加隨機值,操作后去掉隨機值,再操作一次。將原始的 key 轉化為 key + 隨機值(例如Random.nextInt),對數據進行操作后將 key + 隨機值 轉成 key.
六、Spark中的OOM問題:
1、map類型的算子執行中內存溢出如flatMap,mapPatitions
(1)原因:map端過程產生大量對象導致內存溢出:這種溢出的原因是在單個map中產生了大量的對象導致的針對這種問題。
(2)解決方案:
1)增加堆內內存。
2)在不增加內存的情況下,可以減少每個Task處理數據量,使每個Task產生大量的對象時,Executor的內存也能夠裝得下。具體做法可以在會產生大量對象的map操作之前調用repartition方法,分區成更小的塊傳入map。
2、shuffle后內存溢出如join,reduceByKey,repartition。
shuffle內存溢出的情況可以說都是shuffle后,單個文件過大導致的。在shuffle的使用,需要傳入一個partitioner,大部分Spark中的shuffle操作,默認的partitioner都是HashPatitioner,默認值是父RDD中最大的分區數.這個參數spark.default.parallelism只對HashPartitioner有效.如果是別的partitioner導致的shuffle內存溢出就需要重寫partitioner代碼了.
3、driver內存溢出
(1)用戶在Dirver端口生成大對象,比如創建了一個大的集合數據結構。解決方案:將大對象轉換成Executor端加載,比如調用sc.textfile或者評估大對象占用的內存,增加dirver端的內存
(2)從Executor端收集數據(collect)回Dirver端,建議將driver端對collect回來的數據所作的操作,轉換成executor端rdd操作。
七、spark的性能優化
1、參數優化
(1)計算資源的優化:調整--executor-memory和--executor-cores的大小;core表示executor同時計算的task數,memory表示執行的內存,這兩個參數過大過小都不合適,內存調大會出現內存瓶頸,內存過小會出現作業失敗;core太小導致並行計算度小,計算慢,太大會引起磁盤IO瓶頸。
(2)shuffle並行度優化:shuffleReadTask並行度增大,可以設置spark.sql.shuffle.partitions值來設置並行度。數據能分配到更多的分區,減少數據傾斜默認為200。
(3)設置spark.default.parallelism=600 每個stage的默認task數量。
(4)大小表join:對於兩表join,若一張表是另外一張表的2個數量級倍數大,可以考慮將小表broadcast到每一個executor,來達到降低網絡傳輸開銷優化目標;進而完全規避掉shuffle類的操作。
2、代碼優化:
(1)RDD的優化:避免重復創建RDD即避免創建多個從文件讀取而成的RDD,盡量復用RDD,對於多次使用的RDD需要cache或者persist;
3、算子的優化:
(1)盡量避免使用shuffle算子
1)能避免則盡量避免使用reduceByKey,join,distinct,repartition等會進行shuffle的算子
2)Broadcast小數據在map端進行join,避免shuffle
(2)使用高性能算子
1)使用reduceByKey代替groupByKey(reduceByKey在map端聚合數據)
2)使用mappartitions代替map(減少函數重復調用的計算開銷)
3)使用treeReduce代替reduce(treeReduce的計算會在executor中進行本地聚合)
4)使用foreachPartitions代替foreach(原理同mapPartitions)
5)使用filter之后使用coalesce操作(目的減少分區數,減少task啟動開銷)
6)使用Broadcast廣播變量
Executor中有一個BlockManager存儲模塊,會將內存和磁盤共同作為存儲設備,當需要多輪迭代計算時,可以將中間結果存儲到這個存儲模塊里,下次需要時,就可以直接讀該存儲模塊里的數據,而不需要讀寫到HDFS等文件系統里,因而有效減少了IO開銷;或者在交互式查詢場景下,預先將表緩存到該存儲系統上,從而可以提高讀寫IO性能。
八、spark的內存管理機制:
作為一個 JVM 進程,Executor 的內存管理建立在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間做了詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,進一步優化了內存的使用。
1、堆內內存:堆內內存的大小,由 Spark 應用程序啟動時的 –executor-memory參數配置,分別是execution內存,storage內存,other內存。
(1)execution內存是執行內存,文檔中說join,map,aggregate都在這部分內存中執行,shuffle的數據也會先緩存在這個內存中,滿了再寫入磁盤,能夠減少磁盤IO。
(2)storage內存是存儲broadcast,cache,persist數據的地方。
(3)other內存是程序執行時預留給自己的內存。
2、堆外內存:
Off-heap memory不在 JVM 內申請內存,而是調用 Java 的 unsafe 相關 API (類似於malloc()函數)直接向操作系統申請內存。堆外內存只區分 Execution 內存和 Storage 內存。
(1)優點與缺點:因為堆外內存不進過 JVM 內存管理,所以可以避免頻繁的 GC,這種內存申請的缺點是必須自己編寫內存申請和釋放的邏輯。
(2)作用:為了進一步優化內存的使用以及提高Shuffle時排序的效率,存儲經過序列化的二進制數據。
注意:無論堆內和堆外內存目前 Execution 內存和 Storage 內存可以互相共享的。也就是說,如果 Execution 內存不足,而 Storage 內存有空閑,那么 Execution 可以從 Storage 中申請空間;反之亦然.
九、spark如何分區:
分區是RDD內部並行計算的一個計算單元,RDD的數據集在邏輯上被划分為多個分片,每一個分片稱為分區,分區的個數決定了並行計算的粒度,而每個分區的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(准確來說是作業最后一個RDD)的分區數決定。spark默認分區方式是HashPartitioner.只有Key-Value類型的RDD才有分區的,非Key-Value類型的RDD分區的值是None,每個RDD的分區ID范圍:0~numPartitions-1,決定這個值是屬於那個分區的。
1、HashPartitioner分區:
partition = key.hashCode () % numPartitions,如果余數小於0,則用余數+分區的個數,最后返回的值就是這個key所屬的分區ID。
缺點:可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據
2、RangePartitioner分區(范圍分區):
通過抽樣確定各個Partition的Key范圍。首先會對采樣的key進行排序,然后計算每個Partition平均包含的Key權重,最后采用平均分配原則來確定各個Partition包含的Key范圍。盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的。(計算每個Key所在Partition:當分區范圍長度在128以內,使用順序搜索來確定Key所在的Partition,否則使用二分查找算法來確定Key所在的Partition。)
3、CustomPartitioner自定義分區:
需要繼承org.apache.spark.Partitioner類,sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))
十、sparkSQL
1、sparkSQL執行的流程
SQL語句首先通過Parser模塊被解析為語法樹,此棵樹稱為Unresolved Logical Plan;Unresolved Logical Plan通過Analyzer模塊借助於Catalog中的表信息解析為Logical Plan;此時,Optimizer再通過各種基於規則的優化策略進行深入優化,得到Optimized Logical Plan;優化后的邏輯執行計划依然是邏輯的,並不能被Spark系統理解,此時需要將此邏輯執行計划轉換為Physical Plan。
2、sparkSQL是如何讀寫hive表的
(1)寫到hive表
1)方式一:是利用spark Rdd的API將數據寫入hdfs形成hdfs文件,之后再將hdfs文件和hive表做加載映射。
2)方式二:利用sparkSQL將獲取的數據Rdd轉換成dataFrame,再將dataFrame寫成緩存表,最后利用sparkSQL直接插入hive表中。而對於利用sparkSQL寫hive表官方有兩種常見的API,第一種是利用JavaBean做映射,第二種是利用StructType創建Schema做映射
3、RDDJoin中寬依賴與窄依賴的判斷
如果Join之前被調用的RDD是寬依賴(存在shuffle), 而且兩個join的RDD的分區數量一致,join結果的rdd分區數量也一樣,這個時候join是窄依賴,除此之外的,rdd 的join是寬依賴
十一、SparkStreaming
1、基本概念
(1)處理方式:SparkStreaming實際上處理並不是像Flink一樣來一條處理一條數據,而是對接的外部數據流之后,按照一定時間間隔切分,按批處理一個個切分后的文件,與Spark處理離線數據的邏輯是相同的。
(2)Dstream:SparkStreaming提供表示連續數據流和離散流的DStream,假如外部數據不斷涌入,按照一分鍾切片,每個一分鍾內部的數據是連續的(連續數據流),而一分鍾與一分鍾的切片卻是相互獨立的(離散流)。Spark的RDD可以理解為空間維度,Dstream的RDD理解為在空間維度上又加了個時間維度。
1)Dstream特點
a、持久化:接收到的數據暫存。目的做容錯的,當數據流出錯,把數據從源頭進行回溯,暫存的數據可以進行恢復。
b、離散化:按時間分片,形成處理單元。
c、分片處理:分批處理。
(3)SparkStreaming 窗口操作:
任何基於窗口操作需要指定兩個參數:窗口總長度(window length):你想計算多長時間的數據,滑動時間間隔(slide interval):你每多長時間去更新一次
(3)SparkStreaming的兩種處理方式
(1)receiver方式:將數據拉取到executor中做操作,若數據量大,內存存儲不下,可以通過WAL,設置了本地存儲,保證數據不丟失,然后使用Kafka高級API通過zk來維護偏移量,保證消費數據。receiver消費的數據偏移量是在zk獲取的,此方式效率低,容易出現數據丟失。
1)receiver方式的容錯性:在默認的配置下,這種方式可能會因為底層的失敗而丟失數據。如果要啟用高可靠機制,讓數據零丟失,就必須啟用Spark Streaming的預寫日志機制(Write Ahead Log,WAL)。該機制會同步地將接收到的Kafka數據寫入分布式文件系統(比如HDFS)上的預寫日志中。所以,即使底層節點出現了失敗,也可以使用預寫日志中的數據進行恢復。
2)Kafka中的topic的partition,與Spark中的RDD的partition是沒有關系的。在1、KafkaUtils.createStream()中,提高partition的數量,只會增加Receiver方式中讀取partition的線程的數量。不會增加Spark處理數據的並行度。 可以創建多個Kafka輸入DStream,使用不同的consumer group和topic,來通過多個receiver並行接收數據。
(2)基於Direct方式:使用Kafka底層Api,其消費者直接連接kafka的分區上,因為createDirectStream創建的DirectKafkaInputDStream每個batch所對應的RDD的分區與kafka分區一一對應,但是需要自己維護偏移量,即用即取,不會給內存造成太大的壓力,效率高。
1)優點:簡化並行讀取:如果要讀取多個partition,不需要創建多個輸入DStream然后對它們進行union操作。Spark會創建跟Kafka partition一樣多的RDD partition,並且會並行從Kafka中讀取數據。所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。
2)高性能:如果要保證零數據丟失,在基於receiver的方式中,需要開啟WAL機制。這種方式其實效率低下,因為數據實際上被復制了兩份,Kafka自己本身就有高可靠的機制,會對數據復制一份,而這里又會復制一份到WAL中。而基於direct的方式,不依賴Receiver,不需要開啟WAL機制,只要Kafka中作了數據的復制,那么就可以通過Kafka的副本進行恢復。
(3)receiver與和direct的比較:
1)基於receiver的方式,是使用Kafka的高階API來在ZooKeeper中保存消費過的offset的。這是消費Kafka數據的傳統方式。這種方式配合着WAL機制可以保證數據零丟失的高可靠性,但是卻無法保證數據被處理一次且僅一次,可能會處理兩次。因為Spark和ZooKeeper之間可能是不同步的。
2)基於direct的方式,使用kafka的簡單api,Spark Streaming自己就負責追蹤消費的offset,並保存在checkpoint中。Spark自己一定是同步的,因此可以保證數據是消費一次且僅消費一次。
3)Receiver方式是通過zookeeper來連接kafka隊列,Direct方式是直接連接到kafka的節點上獲取數據
(4)sparkStreaming與Kafka的應用
1)kafka到spark streaming如何保證數據完整性?
a、spark RDD內部機制可以保證數據at-least語義。
b、Receiver方式開啟WAL(預寫日志),將從kafka中接受到的數據寫入到日志文件中,所有數據從失敗中可恢復。
c、Direct方式 依靠checkpoint機制來保證。 保證數據不重復使用Exactly once語義。
2)kafka到spark streaming如何保證數據不重復消費?
a、冪等操作:重復執行不會產生問題,不需要做額外的工作即可保證數據不重復。
b、業務代碼添加事務操作:針對每個partition的數據,產生一個uniqueId,若此partition的所有數據被完全消費,則成功,否則算失效,要回滾。下次重復執行這個uniqueId時,如果已經被執行成功,則skip掉。
(5)sparkStreaming出現數據堆積如何處理
1)spark.streaming.concurrentJobs=10:提高Job並發數,從源碼中可以察覺到,這個參數其實是指定了一個線程池的核心線程數而已,沒有指定時,默認為1。
2)spark.streaming.kafka.maxRatePerPartition=2000:設置每秒每個分區最大獲取日志數,控制處理數據量,保證數據均勻處理。
3)spark.streaming.kafka.maxRetries=50:獲取topic分區leaders及其最新offsets時,調大重試次數。