http://blog.51cto.com/13943588/2165946
3、hadoop和spark的都是並行計算,那么他們有什么相同和區別?
兩者都是用mr模型來進行並行計算,hadoop的一個作業稱為job,job里面分為map task和reduce task,每個task都是在自己的進程中運行的,當task結束時,進程也會結束。
spark用戶提交的任務成為application,一個application對應一個sparkcontext,app中存在多個job,每觸發一次action操作就會產生一個job。
這些job可以並行或串行執行,每個job中有多個stage,stage是shuffle過程中DAGSchaduler通過RDD之間的依賴關系划分job而來的,每個stage里面有多個task,組成taskset有TaskSchaduler分發到各個executor中執行,executor的生命周期是和app一樣的,即使沒有job運行也是存在的,所以task可以快速啟動讀取內存進行計算。
hadoop的job只有map和reduce操作,表達能力比較欠缺而且在mr過程中會重復的讀寫hdfs,造成大量的io操作,多個job需要自己管理關系。
spark的迭代計算都是在內存中進行的,API中提供了大量的RDD操作如join,groupby等,而且通過DAG圖可以實現良好的容錯。
4、hadoop的TextInputFormat作用是什么,如何自定義實現
InputFormat會在map操作之前對數據進行兩方面的預處理
1是getSplits,返回的是InputSplit數組,對數據進行split分片,每片交給map操作一次
2是getRecordReader,返回的是RecordReader對象,對每個split分片進行轉換為key-value鍵值對格式傳遞給map
常用的InputFormat是TextInputFormat,使用的是LineRecordReader對每個分片進行鍵值對的轉換,以行偏移量作為鍵,行內容作為值
自定義類繼承InputFormat接口,重寫createRecordReader和isSplitable方法
在createRecordReader中可以自定義分隔符
5、Spark RDD寬依賴和窄依賴?
RDD和它依賴的parent RDD(s)的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
1)窄依賴指的是每一個parent RDD的Partition最多被子RDD的一個Partition使用
2)寬依賴指的是多個子RDD的Partition會依賴同一個parent RDD的Partition
6、spark cache和pesist的區別
1)cache和persist都是用於將一個RDD進行緩存的,這樣在之后使用的過程中就不需要重新計算了,可以大大節省程序運行時間;
2) cache只有一個默認的緩存級別MEMORY_ONLY ,cache調用了persist,而persist可以根據情況設置其它的緩存級別;
3)executor執行的時候,默認60%做cache,40%做task操作,persist最根本的函數,最底層的函數
可以看到緩存級別類StorageLevel類的主構造器包含了5個參數:
useDisk:使用硬盤(外存)
useMemory:使用內存
useOffHeap:使用堆外內存,這是Java虛擬機里面的概念,堆外內存意味着把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。
deserialized:反序列化,其逆過程序列化(Serialization)是java提供的一種機制,將對象表示成一連串的字節;而反序列化就表示將字節恢復為對象的過程。序列化是對象永久化的一種機制,可以將對象及其屬性保存起來,並能在反序列化后直接恢復這個對象
replication:備份數(在多個節點上備份)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用這種緩存級別的RDD將存儲在硬盤以及內存中,使用序列化(在硬盤中),並且在多個節點上備份2份(正常的RDD只有一份)
7、常規的容錯方式有哪幾種類型?RDD通過Linage(記錄數據更新)的方式為何很高效?
1).數據檢查點,會發生拷貝,浪費資源
2).記錄數據的更新,每次更新都會記錄下來,比較復雜且比較消耗性能
——————
1) lazy記錄了數據的來源,RDD是不可變的,且是lazy級別的,且rDD之間構成了鏈條,lazy是彈性的基石。由於RDD不可變,所以每次操作就產生新的rdd,不存在全局修改的問題,控制難度下降,所有有計算鏈條將復雜計算鏈條存儲下來,計算的時候從后往前回溯900步是上一個stage的結束,要么就checkpoint
2) 記錄原數據,是每次修改都記錄,代價很大如果修改一個集合,代價就很小,官方說rdd是粗粒度的操作,是為了效率,為了簡化,每次都是操作數據集合,寫或者修改操作,都是基於集合的rdd的寫操作是粗粒度的,rdd的讀操作既可以是粗粒度的也可以是細粒度,讀可以讀其中的一條條的記錄。
3) 簡化復雜度,是高效率的一方面,寫的粗粒度限制了使用場景如網絡爬蟲,現實世界中,大多數寫是粗粒度的場景
8、RDD有哪些缺陷?
1)不支持細粒度的寫和更新操作(如網絡爬蟲),spark寫數據是粗粒度的所謂粗粒度,就是批量寫入數據,為了提高效率。但是讀數據是細粒度的也就是說可以一條條的讀
2)不支持增量迭代計算,Flink支持
9、Spark中數據的位置是被誰管理的?
每個數據分片都對應具體物理位置,數據的位置是被blockManager,無論數據是在磁盤,內存還是tacyan,都是由blockManager管理
10、Spark的數據本地性有哪幾種?
答:Spark中的數據本地性有三種:a.PROCESS_LOCAL是指讀取緩存在本地節點的數據b.NODE_LOCAL是指讀取本地節點硬盤數據c.ANY是指讀取非本地節點數據通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關,如果RDD經常用的話將該RDD cache到內存中,注意,由於cache是lazy的,所以必須通過一個action的觸發,才能真正的將該RDD cache到內存中
11、rdd有幾種操作類型?
1)transformation,rdd由一種轉為另一種rdd
2)action,
3)cronroller,crontroller是控制算子,cache,persist,對性能和效率的有很好的支持三種類型,不要回答只有2中操作
12、Spark程序執行,有時候默認為什么會產生很多task,怎么修改默認task執行個數?
1)因為輸入數據有很多task,尤其是有很多小文件的時候,有多少個輸入block就會有多少個task啟動;
2)spark中有partition的概念,每個partition都會對應一個task,task越多,在處理大規模數據的時候,就會越有效率。不過task並不是越多越好,如果平時測試,或者數據量沒有那么大,則沒有必要task數量太多。
3)參數可以通過spark_home/conf/spark-default.conf配置文件設置:spark.sql.shuffle.partitions 50 spark.default.parallelism 10第一個是針對spark sql的task數量第二個是非spark sql程序設置生效
13、為什么Spark Application在沒有獲得足夠的資源,job就開始執行了,可能會導致什么什么問題發生?
答:會導致執行該job時候集群資源不足,導致執行job結束也沒有分配足夠的資源,分配了部分Executor,該job就開始執行task,應該是task的調度線程和Executor資源申請是異步的;如果想等待申請完所有的資源再執行job的:需要將spark.scheduler.maxRegisteredResourcesWaitingTime設置的很大;spark.scheduler.minRegisteredResourcesRatio 設置為1,但是應該結合實際考慮否則很容易出現長時間分配不到資源,job一直不能運行的情況。
14、join操作優化經驗?
join其實常見的就分為兩類: map-side join 和 reduce-side join。當大表和小表join時,用map-side join能顯著提高效率。將多份數據進行關聯是數據處理過程中非常普遍的用法,不過在分布式計算系統中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發送到所有的 reduce 分區中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤IO消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。如果其中有張表較小的話,我們則可以自己實現在 map 端實現數據關聯,跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。
15、介紹一下cogroup rdd實現原理,你在什么場景下用過這個rdd?
答:cogroup的函數實現:這個實現根據兩個要進行合並的兩個RDD操作,生成一個CoGroupedRDD的實例,這個RDD的返回結果是把相同的key中兩個RDD分別進行合並操作,最后返回的RDD的value是一個Pair的實例,這個實例包含兩個Iterable的值,第一個值表示的是RDD1中相同KEY的值,第二個值表示的是RDD2中相同key的值.由於做cogroup的操作,需要通過partitioner進行重新分區的操作,因此,執行這個流程時,需要執行一次shuffle的操作(如果要進行合並的兩個RDD的都已經是shuffle后的rdd,同時他們對應的partitioner相同時,就不需要執行shuffle
RDD的彈性表現在哪幾點?
1)自動的進行內存和磁盤的存儲切換;
2)基於Lingage的高效容錯;
3)task如果失敗會自動進行特定次數的重試;
4)stage如果失敗會自動進行特定次數的重試,而且只會計算失敗的分片;
5)checkpoint和persist,數據計算之后持久化緩存
6)數據調度彈性,DAG TASK調度和資源無關
7)數據分片的高度彈性,a.分片很多碎片可以合並成大的,b.par
常規的容錯方式有哪幾種類型?
1)checkPoint ,會發生拷貝,浪費資源
2).記錄數據的更新,每次更新都會記錄下來,比較復雜且比較消耗性能
RDD通過Linage(記錄數據更新)的方式為何很高效?
1) lazy記錄了數據的來源,RDD是不可變的,且是lazy級別的,且RDD之間構成了鏈條,lazy是彈性的基石。由於RDD不可變,所以每次操作就產生新的rdd,不存在全局修改的問題,控制難度下降,所有有計算鏈條將復雜計算鏈條存儲下來,計算的時候從后往前回溯900步是上一個stage的結束,要么就checkpoint
2) 記錄原數據,是每次修改都記錄,代價很大如果修改一個集合,代價就很小,官方說rdd是粗粒度的操作,是為了效率,為了簡化,每次都是操作數據集合,寫或者修改操作,都是基於集合的rdd的寫操作是粗粒度的,rdd的讀操作既可以是粗粒度的也可以是細粒度,讀可以讀其中的一條條的記錄。
3) 簡化復雜度,是高效率的一方面,寫的粗粒度限制了使用場景如網絡爬蟲,現實世界中,大多數寫是粗粒度的場景
Spark有哪些聚合類的算子,我們應該盡量避免什么類型的算子?
答:在我們的開發過程中,能避免則盡可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的算子,盡量使用map類的非shuffle算子。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少性能開銷。
Spark的shuffle過程?
答:從下面三點去展開
1)shuffle過程的划分
2)shuffle的中間結果如何存儲
3)shuffle的數據如何拉取過來
RDD創建有哪幾種方式?
1).使用程序中的集合創建rdd
2).使用本地文件系統創建rdd
3).使用hdfs創建rdd,
4).基於數據庫db創建rdd
5).基於Nosql創建rdd,如hbase
6).基於s3創建rdd,
7).基於數據流,如socket創建rdd
如果只回答了前面三種,是不夠的,只能說明你的水平還是入門級的,實踐過程中有很多種創建方式。
Spark並行度怎么設置比較合適
答:spark並行度,每個core承載 2~4 個partition,如,32個core,那么64~128之間的並行度,也就是
設置 64~128 個partion,並行讀和數據規模無關,只和內存使用量和cpu使用
時間有關
Spark的數據本地性有哪幾種?
答:Spark中的數據本地性有三種:
a.PROCESS_LOCAL 是指讀取緩存在本地節點的數據
b.NODE_LOCAL 是指讀取本地節點硬盤數據
c.ANY 是指讀取非本地節點數據
通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關,如果RDD經常用的話將該RDD cache到內存中,注意,由於cache是lazy的,所以必須通過一個action的觸發,才能真正的將該RDD cache到內存中。
rdd有幾種操作類型?
1)transformation,rdd由一種轉為另一種rdd
2)action,
3)cronroller,crontroller是控制算子,cache,persist,對性能和效率的有很好的支持
三種類型,不要回答只有2中操作
Spark如何處理不能被序列化的對象?
將不能序列化的內容封裝成object
collect功能是什么,其底層是怎么實現的?
答:driver通過collect把集群中各個節點的內容收集過來匯總成結果,collect返回結果是Array類型的,collect把各個節點上的數據抓過來,抓過來數據是Array型,collect對Array抓過來的結果進行合並,合並后Array中只有一個元素,是tuple類型(KV類型的)的。
Spaek程序執行,有時候默認為什么會產生很多task,怎么修改默認task執行個數?
答:1)因為輸入數據有很多task,尤其是有很多小文件的時候,有多少個輸入
block就會有多少個task啟動;2)spark中有partition的概念,每個partition都會對應一個task,task越多,在處理大規模數據的時候,就會越有效率。不過task並不是越多越好,如果平時測試,或者數據量沒有那么大,則沒有必要task數量太多。3)參數可以通過spark_home/conf/spark-default.conf配置文件設置:
spark.sql.shuffle.partitions 50 spark.default.parallelism 10
第一個是針對spark sql的task數量
第二個是非spark sql程序設置生效
為什么Spark Application在沒有獲得足夠的資源,job就開始執行了,可能會導致什么什么問題發生?
答:會導致執行該job時候集群資源不足,導致執行job結束也沒有分配足夠的資源,分配了部分Executor,該job就開始執行task,應該是task的調度線程和Executor資源申請是異步的;如果想等待申請完所有的資源再執行job的:需要將spark.scheduler.maxRegisteredResourcesWaitingTime設置的很大;spark.scheduler.minRegisteredResourcesRatio 設置為1,但是應該結合實際考慮
否則很容易出現長時間分配不到資源,job一直不能運行的情況。
Spark為什么要持久化,一般什么場景下要進行persist操作?
為什么要進行持久化?
spark所有復雜一點的算法都會有persist身影,spark默認數據放在內存,spark很多內容都是放在內存的,非常適合高速迭代,1000個步驟
只有第一個輸入數據,中間不產生臨時數據,但分布式系統風險很高,所以容易出錯,就要容錯,rdd出錯或者分片可以根據血統算出來,如果沒有對父rdd進行persist 或者cache的化,就需要重頭做。
以下場景會使用persist
1)某個步驟計算非常耗時,需要進行persist持久化
2)計算鏈條非常長,重新恢復要算很多步驟,很好使,persist
3)checkpoint所在的rdd要持久化persist,
lazy級別,框架發現有checnkpoint,checkpoint時單獨觸發一個job,需要重算一遍,checkpoint前
要持久化,寫個rdd.cache或者rdd.persist,將結果保存起來,再寫checkpoint操作,這樣執行起來會非常快,不需要重新計算rdd鏈條了。checkpoint之前一定會進行persist。
4)shuffle之后為什么要persist,shuffle要進性網絡傳輸,風險很大,數據丟失重來,恢復代價很大
5)shuffle之前進行persist,框架默認將數據持久化到磁盤,這個是框架自動做的。
為什么要進行序列化
序列化可以減少數據的體積,減少存儲空間,高效存儲和傳輸數據,不好的是使用的時候要反序列化,非常消耗CPU
介紹一下join操作優化經驗?
答:join其實常見的就分為兩類: map-side join 和 reduce-side join。當大表和小表join時,用map-side join能顯著提高效率。將多份數據進行關聯是數據處理過程中非常普遍的用法,不過在分布式計算系統中,這個問題往往會變的非常麻煩,因為框架提供的 join 操作一般會將所有數據根據 key 發送到所有的 reduce 分區中去,也就是 shuffle 的過程。造成大量的網絡以及磁盤IO消耗,運行效率極其低下,這個過程一般被稱為 reduce-side-join。如果其中有張表較小的話,我們則可以自己實現在 map 端實現數據關聯,跳過大量數據進行 shuffle 的過程,運行時間得到大量縮短,根據不同數據可能會有幾倍到數十倍的性能提升。
備注:這個題目面試中非常非常大概率見到,務必搜索相關資料掌握,這里拋磚引玉。
介紹一下cogroup rdd實現原理,你在什么場景下用過這個rdd?
答:cogroup的函數實現:這個實現根據兩個要進行合並的兩個RDD操作,生成一個CoGroupedRDD的實例,這個RDD的返回結果是把相同的key中兩個RDD分別進行合並操作,最后返回的RDD的value是一個Pair的實例,這個實例包含兩個Iterable的值,第一個值表示的是RDD1中相同KEY的值,第二個值表示的是RDD2中相同key的值.由於做cogroup的操作,需要通過partitioner進行重新分區的操作,因此,執行這個流程時,需要執行一次shuffle的操作(如果要進行合並的兩個RDD的都已經是shuffle后的rdd,同時他們對應的partitioner相同時,就不需要執行shuffle,),
場景:表關聯查詢
kafka工作原理?
producer向broker發送事件,consumer從broker消費事件。
事件由topic區分開,每個consumer都會屬於一個group。
相同group中的consumer不能重復消費事件,而同一事件將會發送給每個不同group的consumer。