面試中的問題(重點)**
1. RDD的特性(RDD的解釋)
1.RDD可以看做是一些列partition所組成的
2.RDD之間的依賴關系
3.算子是作用在partition之上的
4.分區器是作用在kv形式的RDD上
5.partition提供的最佳計算位置,利於數據處理的本地化即計算向數據移動而不是移動數據
ps:RDD本身是不存儲數據,可以看做RDD本身是一個引用數據
RDD彈性
1) 自動進行內存和磁盤數據存儲的切換
Spark優先把數據放到內存中,如果內存放不下,就會放到磁盤里面,程序進行自動的存儲切換
2) 基於血統的高效容錯機制
在RDD進行轉換和動作的時候,會形成RDD的Lineage依賴鏈,當某一個RDD失效的時候,可以通過重新計算上游的RDD來重新生成丟失的RDD數據。
3) Task如果失敗會自動進行特定次數的重試
RDD的計算任務如果運行失敗,會自動進行任務的重新計算,默認次數是4次。
4) Stage如果失敗會自動進行特定次數的重試
如果Job的某個Stage階段計算失敗,框架也會自動進行任務的重新計算,默認次數也是4次。
5) Checkpoint和Persist可主動或被動觸發
RDD可以通過Persist持久化將RDD緩存到內存或者磁盤,當再次用到該RDD時直接讀取就行。也可以將RDD進行檢查點,檢查點會將數據存儲在HDFS中,該RDD的所有父RDD依賴都會被移除。
6) 數據調度彈性
Spark把這個JOB執行模型抽象為通用的有向無環圖DAG,可以將多Stage的任務串聯或並行執行,調度引擎自動處理Stage的失敗以及Task的失敗。
7) 數據分片的高度彈性
可以根據業務的特征,動態調整數據分片的個數,提升整體的應用執行效率。
2. RDD的兩類算子
RDD編程API
RDD支持兩種操作:轉化操作和行動操作。RDD 的轉化操作是返回一個新的 RDD的操作,比如 map()和 filter(),而行動操作則是向驅動器程序返回結果或把結果寫入外部系統的操作。比如 count() 和 first()。
Spark采用惰性計算模式,RDD只有第一次在一個行動操作中用到時,才會真正計算。Spark可以優化整個計算過程。默認情況下,Spark 的 RDD 會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個 RDD,可以使用 RDD.persist() 讓 Spark 把這個 RDD 緩存下來。
3.25.17 Transformation算子(重要)
RDD中的所有轉換都是延遲加載的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎數據集(例如一個文件)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正運行。這種設計讓Spark更加有效率地運行。
轉換 含義
map(func) 返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成
filter(func) 返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成
flatMap(func) 類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
mapPartitions(func) 類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Iterator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根據fraction指定的比例對數據進行采樣,可以選擇是否使用隨機數進行替換,seed用於指定隨機數生成器種子
union(otherDataset) 對源RDD和參數RDD求並集后返回一個新的RDD
intersection(otherDataset) 對源RDD和參數RDD求交集后返回一個新的RDD
distinct([numTasks])) 對源RDD進行去重后返回一個新的RDD
groupByKey([numTasks]) 在一個(K,V)的RDD上調用,返回一個(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數可以通過第二個可選的參數來設置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 相同的Key值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值zeroValue:中立值,定義返回value的類型,並參與運算seqOp:用來在同一個partition中合並值combOp:用來在不同partiton中合並值
sortByKey([ascending], [numTasks]) 在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活
join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
cartesian(otherDataset) 笛卡爾積
pipe(command, [envVars]) 將一些shell命令用於Spark中生成新的RDD
coalesce(numPartitions) 重新分區
repartition(numPartitions) 重新分區
repartitionAndSortWithinPartitions(partitioner) 重新分區和排序
3.25.18 Action算子(重要)
在RDD上運行計算,並返回結果給Driver或寫入文件系統
動作 含義
reduce(func) 通過func函數聚集RDD中的所有元素,這個功能必須是可交換且可並聯的
collect() 在驅動程序中,以數組的形式返回數據集的所有元素
count() 返回RDD的元素個數
first() 返回RDD的第一個元素(類似於take(1))
take(n) 返回一個由數據集的前n個元素組成的數組
takeSample(withReplacement,num, [seed]) 返回一個數組,該數組由從數據集中隨機采樣的num個元素組成,可以選擇是否用隨機數替換不足的部分,seed用於指定隨機數生成器種子
takeOrdered(n, [ordering]) takeOrdered和top類似,只不過以和top相反的順序返回元素
saveAsTextFile(path) 將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本
saveAsSequenceFile(path) 將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統。
saveAsObjectFile(path)
countByKey() 針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數。
foreach(func) 在數據集的每一個元素上,運行函數func進行更新。
3. 算子原理(shuffle算子原理,引出Shuffle原理)
4. Shuffle原理(和Hadoop的shuffle區別)
shuffle是發生在map和refuce之間,是map和reduce之間的橋梁,他是將一些無規律的數據轉變為有規律的數據的過程,map從數據端拉去數據,經過排序和合成后 存入系統磁盤,reduc端從map端磁盤讀取處理好的數據進行排序分組的過程
spark shuffle是spark所使用的hashshuffle
沒有優化前的hashbuffle是每個一個executor存在多個maptask,每個maptask都會產生多個
bucket文件,reduce將這些bucket進行分組,這樣存在大量的bucket,讀寫麻煩,分組慢,影響系統效率
優化一次后的shuffle(1.6-2.0版本)是將maptask后的bucket進行分組,然后在通過reduce再次處理,雖然簡化了bucket的數目,但是仍存在很多的小文件
三、Sort-Based Shuffle
為了緩解Shuffle過程產生文件數過多和Writer緩存開銷過大的問題,spark引入了類似於hadoop Map-Reduce的shuffle機制。該機制每一個ShuffleMapTask不會為后續的任務創建單獨的文件,而是會將所有的Task結果寫入同一個文件,並且對應生成一個索引文件。以前的數據是放在內存緩存中,等到緩存數據讀取完了再刷到磁盤,現在為了減少內存的使用,在內存不夠用的時候,可以將輸出溢寫到磁盤。結束的時候,再將這些不同的文件聯合內存的數據一起進行歸並,從而減少內存的使用量。一方面文件數量顯著減少,另一方面減少Writer緩存所占用的內存大小,而且同時避免GC的風險和頻率。
Sort-Based Shuffle有幾種不同的策略:BypassMergeSortShuffleWriter(Bypass機制)、SortShuffleWriter(普通機制)和UnsafeShuffleWriter。
對於BypassMergeSortShuffleWriter,使用這個模式的特點為:
# 主要用於處理不需要排序和聚合的Shuffle操作,所以數據是直接寫入文件,數據量較大的時候,網絡I/O和內存負擔較重。
# 主要適合處理Reducer任務數量比較少的情況。
# 將每一個分區寫入一個單獨的文件,最后將這些文件合並,減少文件數量。但是這種方式需要並發打開多個文件,對內存消耗比較大。
因為BypassMergeSortShuffleWriter這種方式比SortShuffleWriter更快,所以如果在Reducer數量不大,又不需要在map端聚合和排序,而且Reducer的數目小於spark.shuffle.sort.bypassMergeThreshold指定的閥值(默認200)時,就是用的是這種方式(即啟用條件)。
對於SortShuffleWriter,使用這個模式的特點為:
# 比較適合數據量很大的場景或者集群規模很大。
# 引入了外部排序器,可以支持在Map端進行本地聚合或者不聚合。
# 如果外部排序器enable了spill功能,如果內存不夠,可以先將輸出溢寫到本地磁盤,最后將內存結果和本地磁盤的溢寫文件進行合並。
另外,這個Sort-Based Shuffle跟Executor核數沒有關系,即跟並發度沒有關系,它是每一個ShuffleMapTask都會產生一個data文件和index文件,所謂合並也只是將該ShuffleMapTask的各個partition對應的分區文件合並到data文件而已。所以這個就需要和Hash-BasedShuffle的consolidation機制區別開來。
對於UnsafeShuffleWriter由於需要謹慎使用,我們暫不做分析。
shuffle調優
1. 合並map輸出文件,開啟輸出文件合並機制 spark.shuffle.consolidateFiles 2. 調節map內存緩沖 spark.shuffle.file.buffer 和reduce內存占比 spark.shuffle.memoryFraction,0.2
3. shufflemananger選擇
5. 檢查點、持計划、共享變量
https://blog.csdn.net/wjn19921104/article/details/80268661
6. 分區(自定義分區、默認分區,區別,作用)
7. 並行度(跑任務的並行度)參考調優文檔1.2.3
8. Spark的任務運行原理(重點中的重點)
9. Task原理(本地化級別)
1.概念:task在執行前都會獲取數據的分區信息進行分配,總是會優先將其分配到它要計算的數據所在節點,盡可能的減少網絡傳輸
2.過程:一般會默認3s,重試5次的去分配,一旦超時失敗,將會選擇一個比上一個本地級別差的級別再一次分配,如果發生了數據傳輸,那么task首先通過blockmanager獲取數據,如果本地沒有數據,則通過getRemote方法從數據所在節點的blockmanager獲取數據並返回至task所在節點
3.級別
PROCESS_LOCAL:進程本地化,性能最好。指代碼和數據在同一個進程中,也就是同一個executor 中;計算數據的task由executor執行,此時數據在executor的blockmanager里
NODE_LOCAL:節點本地化。代碼和數據在同一個節點中,數據存儲為節點的hdfs block數據塊,
task在節點的某個executror執行;或者數據和task在同一個節點不同的executor中,數據需要跨進程傳輸
NO_PREF:數據從哪里獲取都一樣,比如從數據庫中獲取數據,對於task而言沒有區別
RACK_LOCAL:數據和task在一個機架的兩個節點上,數據需要通過網絡在節點之間進行傳輸
ANY:數據和task可能在集群中的任何地方,而且不在一個機架中,性能最差
4.調節:spark.locality.wait參數默認是3s,默認情況下,以下幾個參數都是以 spark.locality.wait為默認值, spark.locality.wait.process spark.locality.wait.node spark.locality.wait.rack
實際情況中通過調節不同值達到最優的計算分配效果
10. DAG的原理(源碼級別)
那么在划分Stage的時候,會通過createResultStage方法創建一個ResultStage,然后根據方法 getShuffleDependencies遞歸推導,其原理是將最后一個rdd放入站中,判斷他和其父rdd的關系,如果是shuffle,那么就划分ShuffleMapStage,如果沒有就將其父節點入站繼續往前推,直至切分出所有的依賴,生成一個寬依賴集合,然后交給getOrCreateShuffleMapStage方法生成stage,實現所有的Stage的划分,最后通過submitStage方法中的getMissingParentStages方法遞歸尋找所有stage(類似rdd划分),通過submitStage方法中的submitMissingTasks方法將stage封裝為 task
11. SparkSQL和Hive區別
Ps: https://www.cnblogs.com/lixiaochun/p/9446350.html
12. DF和DS的之間的關系(從它的類型出發)
DataFrame是弱類型,是抽象數據集,Rdd和Schema的集合,可以像二維表一樣操作
DataSet:屬於DataFrame的父類,DataFrame=Dataset[row],強類型
13. 窗口函數(排名函數)
rank()跳躍排序,有兩個第二名時后邊跟着的是第四名 dense_rank() 連續排序,有兩個第二名時仍然跟着第三名 over()開窗函數:
在使用聚合函數后,會將多行變成一行,而開窗函數是將一行變成多行;
並且在使用聚合函數后,如果要顯示其他的列必須將列加入到group by中, 而使用開窗函數后,可以不使用group by,直接將所有信息顯示出來。
開窗函數適用於在每一行的最后一列添加聚合函數的結果。
常用開窗函數:
1.為每條數據顯示聚合信息.(聚合函數() over())
2.為每條數據提供分組的聚合函數結果(聚合函數() over(partition by 字段) as 別名)
--按照字段分組,分組后進行計算
3.與排名函數一起使用(row number() over(order by 字段) as 別名) 常用分析函數:(最常用的應該是1.2.3 的排序)
1、row_number() over(partition by ... order by ...) 2、rank() over(partition by ... order by ...)
3、 dense_rank() over(partition by ... order by ...)
4、 count() over(partition by ... order by ...)
5、 max() over(partition by ... order by ...)
6、 min() over(partition by ... order by ...)
7、 sum() over(partition by ... order by ...)
8、 avg() over(partition by ... order by ...)
9、 first_value() over(partition by ... order by ...)
10、 last_value() over(partition by ... order by ...)
11、 lag() over(partition by ... order by ...) 12、lead() over(partition by ... order by ...)
lag 和lead 可以 獲取結果集中,按一定排序所排列的當前行的上下相鄰若干offset 的某個行的某個列(不用結果集的自關聯); lag ,lead 分別是向前,向后;
lag 和lead 有三個參數,第一個參數是列名,第二個參數是偏移的offset,第三個參數是 超出記錄窗口時的默認值
14. SparkSQL-UDF(自定義函數)
15. SparkStreaming對接Kafka的兩種方式(重點)
16. 如何解決數據積壓問題(反壓機制或者增加分區和消費者)
反壓機制:
原因:Spark Streaming在處理不斷流入的數據是通過每間隔一段時間(batch interval),將這段時間內的流入的數據積累為一個batch,然后以這個batch內的數據作為job DAG的輸入RDD提交新的job運行。當一個batch的處理時間大於batch interval時,意味着數據處理速度跟不上數據接收速度,此時在數據接收端(即Receiver 一般數據接收端都運行在executor上)就會積累數據,而數據是通過BlockManager管理的,如果數據存儲采用MEMORY_ONLY模式就會導致OOM,采用 MEMORY_AND_DISK多余的數據保存到磁盤上反而會增加數據讀取時間。
參數:
spark.streaming.backpressure.enabled 設置為 true 開啟反壓 spark.streaming.kafka.maxRatePerPartition 每個partition每秒最多消費條數 spark.streaming.backpressure.rateEstimator 速率估算器類,默認值為 pid ,目前 Spark 只支持這個。
17. 如何保證數據一致性問題(生產者和消費者)
18. Kafka數據傳輸機制(參考圖)
19. Kafka如何保證數據不丟失(消費者)
20. redis數據類型·
21. redis的持久化
RDB方式的持久化是通過快照(snapshotting)完成的,當符合一定條件時Redis會自動將內存中的數據進行快照並持久化到硬盤。
RDB是Redis默認采用的持久化方式,在redis.conf配置文件中默認有此下配置: save 900 1
通過RDB方式實現持久化,一旦Redis異常退出,就會丟失最后一次快照以后更改的所有數據。這就需要開發者根據具體的應用場合,通過組合設置自動快照條件的方式來將可能發生的數據損失控制在能夠接受的范圍。如果數據很重要以至於無法承受任何損失,則可以考慮使用AOF方式進行持久化。
22. redis的擊穿和雪崩(參考面試寶典,最下面)
23. redis的集群原理(集群實現需要注意什么)
24. SparkStreaming累加操作(參考UpdateStateBykey和MapwithState)
https://blog.csdn.net/zhanglh046/article/details/78505124
https://www.cnblogs.com/yinpin2011/p/5539708.html
25. ForeachRDD和Transform的區別
26. 算子的區別(MapPartitions和Map或者foreach和foreachPartition區別)
27. 什么是DStream
Spark Streaming概述
Spark Streaming類似於Apache Storm,用於流式數據的處理。根據其官方文檔介紹,Spark Streaming有高吞吐量和容錯能力強等特點。Spark Streaming支持的數據輸入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和簡單的TCP套接字等等。數據輸入后可以用Spark的高度抽象原語如:map、reduce、join、window等進行運算。而結果也能保存在很多地方,如HDFS,數據庫等。另外Spark Streaming也能和MLlib(機器學習)以及Graphx完美融合。
DStream的概念
Discretized Stream是Spark Streaming的基礎抽象,代表持續性的數據流和經過各種Spark原語操作后的結果數據流。在內部實現上,DStream是一系列連續的RDD來表示。每個RDD含有一段時間間隔內的數據,如下圖:
DStream原語類型介紹(重要)
DStream上的原語與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的原語,如:updateStateByKey()、transform()以及各種Window相關的原語。
詳解:見Spark Streaming課件
28. 調優文檔(參考Spark調優)
29. 數據傾斜解決方案(參考Spark內核解析和調優指南)
30. JVM調優(參考Spark調優文檔)
31. GC垃圾回收機制(算法原理)
32. 手寫快排或者其他算法(基礎算法)
33. Spark的內存模型
34. 手寫單例模式 hadoop
1. HDFS文件存儲機制(讀寫流程)
2. MR的原理(Map和Reduce) 3. Shuffle原理
4. Hive的內部和外部表
5. Hive的動態分區
6. hive分區分桶
7. Hive和Mysql區別 8. Hive和Hbase區別
9. Hive的調優(參考面試寶典)
10. HBASE的熱點問題(什么時候會觸發,如何避免)
一、 出現熱點問題原因
1、 hbase的中的數據是按照字典序排序的,當大量連續的rowkey集中寫在個別的region,各個region之間數據分布不均衡;
2、 創建表時沒有提前預分區,創建的表默認只有一個region,大量的數據寫入當前 region;
3、 創建表已經提前預分區,但是設計的rowkey沒有規律可循,設計的rowkey應該由 regionNo+messageId組成
二、 解決方案加鹽
這里所說的加鹽不是密碼學中的加鹽,而是在rowkey的前面增加隨機數,具體就是給rowkey分配一個隨機前綴以使得它和之前的rowkey的開頭不同。分配的前綴種類數量應該和你想使用數據分散到不同的 region的數量一致。加鹽之后的rowkey就會根據隨機生成的前綴分散到各個region上,以避免熱點。
哈希
哈希會使同一行永遠用一個前綴加鹽。哈希也可以使負載分散到整個集群,但是讀卻是可以預測的。使用確定的哈希可以讓客戶端重構完整的rowkey,可以使用get操作准確獲取某一個行數據反轉
第三種防止熱點的方法時反轉固定長度或者數字格式的rowkey。這樣可以使得rowkey中經常改變的部分(最沒有意義的部分)放在前面。這樣可以有效的隨機rowkey,但是犧牲了rowkey的有序性。
反轉rowkey的例子以手機號為rowkey,可以將手機號反轉后的字符串作為rowkey,這樣的就避免了以手機號那樣比較固定開頭導致熱點問題時間戳反轉
一個常見的數據處理問題是快速獲取數據的最近版本,使用反轉的時間戳作為rowkey的一部分對這個問題十分有用,可以用 Long.Max_Value - timestamp 追加到key的末尾,例如 [key]
[reverse_timestamp] , [key] 的最新值可以通過scan [key]獲得[key]的第一
11. Hbase的原理(讀取和存儲)
12. HbaseRowKey設計原理
1.RowKey長度原則,最大長度為64KB,一般設計成定長。建議是越短越好,不要超過16個字節,過長占用HFile和內存空間
2.RowKey散列原則:如果RowKey是按時間戳的方式遞增,不要將時間放在二進制碼的前面,建議將
RowKey的高位作為散列字段,由程序循環生成,低位放時間字段,
3.RowKey唯一原則:必須在設計上保證其唯一性。RowKey是按照字典排序存儲的,要充分利用這個排序特點,將經常一起讀取的數據存儲到一塊,將最近可能會被訪問的數據放在一塊。
13. Hbase和其他數據庫相比的優勢(特點)
14. Flume的Source源有哪些
15. Flume高可用(怎么實現高可用)
16. Linux命令、HDFS命令(基礎命令)
17. Zookeeper的選舉機制(內部如何實現)
18. Oozie和Azkaban區別(主要是配置)
19. 布隆過濾器(原理)
掌握一到兩種算法(原理)
需要掌握的事情
1. 項目架構:所有的
2. 項目流程:數據走向,數據處理時候的指標
3. 項目人員搭配
4. 項目的數據量
5. 項目的問題 kafka問題 居多(數據一致性,數據完整性,分區動態擴容,數據積壓,kafka吞吐量) sparkStreaming問題(批次數據量,批次間隔 ,job等待)
6. 集群規模 (實現高可用)
7. 項目描述