一、spark streaming和storm有何區別?
一個實時毫秒,一個准實時亞秒,不過storm的吞吐率比較低。
二、spark有哪些組件?
Master:管理集群和節點,不參與計算。
Worker:計算節點,進程本身不參與計算,和master匯報。
Driver:運行程序的main方法,創建sparkcontext對象。
Spark context:控制整個application的生命周期,包括DAGSchedular和TaskSchedular等組件。
Client:用戶提交程序的入口。
三、spark的工作機制
用戶在client端提交作業后,會由Driver運行main方法並創建spark context。
執行RDD算子,形成DAG圖輸入DAGSchedular,按照RDD之間的依賴關系划分stage輸入TaskSchedular。
TaskSchedular會將stage划分為task set分發到各個節點的executor中執行。
四、spark中的寬窄依賴
RDD和他依賴的父RDD的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
寬依賴:指的是多個子RDD的partition會依賴同一個父RDD的partition。
窄依賴:指的是每一個父RDD的partition最多被子RDD的一個Partition使用。
父RDD中,每個分區內的數據,都只會被子RDD中特定的分區所消費,為窄依賴:
父RDD中,分區內的數據,會被子RDD內多個分區消費,則為寬依賴:
五、spark中如何划分stage?
1.spark application中可以因為不同的action觸發眾多的job,一個Application中可以有很多job,每個job是有一個或多個stage構成的,后面的stage依賴於前面的stage,也就是說只有前面的stage計算完畢后,后面的stage才會運行。
2.stage划分的依據是寬依賴,何時產生寬依賴,例如ReduceBykey,GroupByKey的算子,會導致寬依賴的產生。
3.由Action算子(例如collect)導致了SparkContext.RunJob的執行,最終導致了DAGSchedular的submitJob的執行,其核心是通過發送一個case class Jobsubmitted對象給eventProcessLoop。
EventProcessLoop是DAGSchedularEventProcessLoop的具體事例,而DAGSchedularEventProcessLoop是eventLoop的子類,具體實現EventLoop的onReceiver方法,onReceiver方法轉過來回調doOnReceive。
4.在handleJobSubmitted中首先創建finalStage,創建finalStage時候會建立父Stage的依賴鏈條。
總結:依賴是從代碼的邏輯層面上來展開說的,可以簡單點說:寫介紹什么是RDD中的寬窄依賴,然后再根據DAG有向無環圖進行划分,從當前job的最后一個算子往前推,遇到寬依賴,那么當前在這個批次中的所有算子操作都划分成一個stage,然后繼續按照這種方式再繼續往前推,如再遇到寬依賴,又划分成一個stage,一直到最前面的一個算子。最后整個job會被划分成多個stage,而stage之間又存在依賴關系,后面的stage依賴於前面的stage。
六、spark-submit的時候如何引入外部jar包
在通過spark-submit提交任務時,可以通過添加配置參數來指定:
--driver-class-path 外部jar包
--jars 外部jar包
七、spark中cache和persist的區別?
Cache:緩存數據,默認是緩存在內存中,,其本質還是調用persist
Persist:緩存數據,有豐富的緩存策略。數據可以保存在內存也可以保存在磁盤中,使用的時候指定對應的緩存級別。
八、flume整合Spark Streaming問題。
(1)如何實現Spark Streaming讀取flume中的數據
可以這樣說:
前期經過技術調研,在查看官網資料,發現Spark Streaming整合flume有兩種方式:拉模式,推模式。
拉模式:Flume把數據push到Spark Streaming
推模式:Spark Streaming從flume中poll數據
(2)在實際開發的時候是如何保證數據不丟失的
可以這樣說:
flume那邊采用的channel是將數據落地到磁盤中,保證數據源端安全性(可以在補充一下,flume在這里的channel可以設置為memory內存中,提高數據接收處理的效率,但是由於數據在內存中,安全機制保證不了,故選擇channel為磁盤存儲。整個流程運行有一點的延遲性)
Spark Streaming通過拉模式整合的時候,使用了FlumeUtils這樣一個類,該類是需要依賴一個額外的jar包(spark-streaming-flume_2.10)
要想保證數據不丟失,數據的准確性,可以在構建StreamingConext的時候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)來創建一個StreamingContext,使用StreamingContext.getOrCreate來創建StreamingContext對象,傳入的第一個參數是checkpoint的存放目錄,第二參數是生成StreamingContext對象的用戶自定義函數。如果checkpoint的存放目錄存在,則從這個目錄中生成StreamingContext對象;如果不存在,才會調用第二個函數來生成新的StreamingContext對象。在creatingFunc函數中,除了生成一個新的StreamingContext操作,還需要完成各種操作,然后調用ssc.checkpoint(checkpointDirectory)來初始化checkpoint功能,最后再返回StreamingContext對象。
這樣,在StreamingContext.getOrCreate之后,就可以直接調用start()函數來啟動(或者是從中斷點繼續運行)流式應用了。如果有其他在啟動或繼續運行都要做的工作,可以在start()調用前執行。
流失計算中使用checkpoint的作用:
保存元數據,包括流式應用的配置、流式沒崩潰之前定義的各種操作、未完成所有操作的batch。元數據被存儲到容忍失敗的存儲系統上,如HDFS。這種ckeckpoint主要針對driver失敗后的修復。
保存流式數據,也是存儲到容忍失敗的存儲系統上,如HDFS。這種ckeckpoint主要針對window operation、有狀態的操作。無論是driver失敗了,還是worker失敗了,這種checkpoint都夠快速恢復,而不需要將很長的歷史數據都重新計算一遍(以便得到當前的狀態)。
設置流式數據checkpoint的周期
對於一個需要做checkpoint的DStream結構,可以通過調用DStream.checkpoint(checkpointInterval)來設置ckeckpoint的周期,經驗上一般將這個checkpoint周期設置成batch周期的5至10倍。
使用write ahead logs功能
這是一個可選功能,建議加上。這個功能將使得輸入數據寫入之前配置的checkpoint目錄。這樣有狀態的數據可以從上一個checkpoint開始計算。開啟的方法是把spark.streaming.receiver.writeAheadLogs.enable這個property設置為true。另外,由於輸入RDD的默認StorageLevel是MEMORY_AND_DISK_2,即數據會在兩台worker上做replication。實際上,Spark Streaming模式下,任何從網絡輸入數據的Receiver(如kafka、flume、socket)都會在兩台機器上做數據備份。如果開啟了write ahead logs的功能,建議把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在創建RDD時由參數傳入。
使用以上的checkpoint機制,確實可以保證數據0丟失。但是一個前提條件是,數據發送端必須要有緩存功能,這樣才能保證在spark應用重啟期間,數據發送端不會因為spark streaming服務不可用而把數據丟棄。而flume具備這種特性,同樣kafka也具備。
九、hadoop和spark的shuffle相同和差異?
(1)從high-level的角度來看,兩者並沒有大的差別。都是將mapper(Spark中是ShuffleMapTask)的輸出進行partition,不同的partition送到不同的reducer(Spark里的reducer可能是下一個stage的ShuffleMapTask,也可能是ResultTask)。Reducer以內存做緩沖區,邊shuffle邊aggregate數據,等數據aggregate好之后再進行reduce()(Spark里可能是后續的一系列操作)
(2)從low-level的角度來看,兩者差距不小。Hadoop MapReduce是sort-based,進入combiner()和reduce()的records必須先sort。這樣的好處在於combiner()/reduce()可以處理大規模的數據,因為其輸入數據可以通過外排得到(mapper對每段數據先做排序,reducer的shuffle對排好序的每段數據做歸並)。目前spark選擇的是hash-based,通常使用HashMap對shuffle來的數據進行aggregate,不會對數據進行提前排序。如果用戶需要進行排序的數據,那么要自己調用類似SortByKey()的操作。
(3)從現實角度來看,兩者也有不小差距。Hadoop MapReduce將處理流程划分出明顯的幾個階段:map(),spill,merge,shuffle,sort,reduce()等。每個階段各司機制,可以按照過程式的編程思想來逐一實現每個階段的功能。在Spark中,沒有這樣功能明確的階段,只有不同的stage和一系列的transformation(),所以spill、sort、aggregate等操作需要蘊含在transformation()中。如果我們將map()端划分數據、持久化數據的過程稱為shuffle write,而將reducer讀入數據、aggregate數據的過程稱為shuffle read。那么在spark中,問題就變成怎么在job的邏輯或者物理執行圖中加入shuffle write、shuffle read的處理邏輯,以及兩個處理邏輯怎么高效實現。Shuffle write由於不要求數據有序,shuffle write的任務很簡單:將數據partition好,並持久化。之所以要持久化,一方面是要減少內存存儲空間壓力,另一方面也是為了fault-tolerance。
十、RDD的五大特性
(1) A list of partition
一個RDD有一系列的分區/分片
A function for computing each split/partition
對RDD的每一個分區/分片都作用同一個函數
A list of dependencies on others RDDs
有一些依賴,在其他的RDD上
Optionally,a Partitioner for key-value RDDs(e.g to say that the RDD is hash-partitioned)
可選的,對於key-value的RDD的分區策略。
Optionally,a list of preferred locations to compute each split on(e.g. block locations for an HDFS file)
可選的,數據在哪兒優先把作業調度到數據所在節點進行計算:移動數據不如移動計算
十一、spark的優勢和劣勢
優勢:
1.速度快
2.其次,Spark是一個靈活的運算框架,適合做批次處理、工作流、交互式分析、流量處理等不同類型的應用,因此spark也可以成為一個用途廣泛的運算引擎,並在未來取代MapReduce的地位
3.最后,Spark可以與Hadoop生態系統的很多組件互相操作。Spark可以運行在新一代資源管理框架YARN上,它還可以讀取已有並存放在Hadoop上的數據,這是個非常大的優勢
劣勢:
1.穩定性方面
2.不能處理大數據
3.不能支持復雜的SQL統計
十二、spark的shuffle過程
1.Spark的shuffle總體而言就包括兩個基本的過程:Shuffle write和Shuffle read。ShuffleMapTask的整個執行過程就是Shuffle write。將數據根據hash的結果,將各個Reduce分區的數據寫到各自的磁盤中,寫數據時不做排序操作。
2.首先是將map的輸出結果送到對應的緩沖區bucket中,每個bucket里的文件都會被寫入本地磁盤文件ShuffleBlockFile中,形成一個FileSegment文件。
3.Shuffle Read指的是reducer對屬於自己的FileSegment文件進行fetch操作,這里采用的netty框架,fetch操作會等到所有的Shuffle write過程結束后再進行,.reducer通過fetch得到的FileSegment先放在緩沖區softBuffer中,默認大小45MB。
十三、spark sql為什么比hive快?
1.消除了冗余的HDFS讀寫
2.消除了冗余的MapReduce階段
3.JVM的優化
十四、Spark工作的一個流程
1.構造Spark Application的運行環境(啟動SparkContext),SparkContext向資源管理器(可以是standalone、Mesos或Yarn)注冊並申請運行Executor資源;
2.資源管理器分配Executor資源,Executor運行情況將隨着心跳發送到資源管理器上;
3.SparkContext構建DAG圖,將DAG圖分解成Stage,並將Taskset發送給TaskSchedular。Executor向SparkContext申請Task,TaskSchedular將Task發送給Executor運行同時SparkContext將應用程序代碼發送給Executor。
4.Task在Executor上運行,運行完畢釋放所有資源。
十五、對Spark streaming進行性能優化?
1.降低批次處理時間:
①數據接收並行度。
(1)增加DStream:接收網絡數據(如Kafka,flume,Socket等)時會對數據進行反序列化再存儲在Spark,由於一個DStream只有Receiver對象,如果成為瓶頸可考慮增加DStream。
(2)設置”spark.streaming.blockInterval”參數:接受的數據被存儲在Spark內存前,會被合並成block,而block數量決定了task數量;舉例,當批次時間間隔為2秒且block時間間隔為200毫秒時,Task數量約為10;如果Task數量過低,則浪費了cpu資源;推薦的最小block時間間隔為50ms。
(3)顯式對Input DStream重新分區:再進行更深層次處理前,先對輸入數據進行重新分區。
②數據處理並行度:reduceByKey,reduceByKeyAndWindow等operation可通過設置”spark.default.parallelism”參數或顯式設置並行度方法參數控制。
③數據序列化:可配置更高效的kryo序列化。
2.設置合理批次時間間隔:
①原則:處理數據的速度應大於或等於數據輸入的速度,即批次處理時間大於或等於批次時間間隔。
②方法:
(1)先設置批次時間間隔為5~10秒數據輸入速度;
(2)再通過查看log4j日志中的”Total delay”,逐步調整批次時間間隔,保證”Total delay”小於批次時間間隔。
3.內存調優:
①持久化級別:開啟壓縮,設置參數”spark.rdd.compress”;
②GC策略:在Driver和Executor上開啟CMS(Content Management System 內容管理系統)
十六、Spark on Yarn VS standalone
Yarn:你只需要一個節點,然后提交作業即可。這個是不需要spark集群的(不需要啟動master和worker)
Standalone:你的spark集群上每個節點上都要部署spark,然后需要啟動spark集群。
十七、Spark on Yarn的兩種模式
Spark on Yarn支持client和cluster模式:driver運行在哪里
Client:driver運行在本地,提交作業的進程是不能停止的,否則作業就掛了。
Cluster:提交完作業,那么提交作業端就可以斷開了,因為driver運行在am(application master)端。
十八、Spark和Hadoop重要概念區分
十九、spark優化之內存管理。
Spark中的內存管理主要分為兩個方面:執行和存儲。
執行端的內存主要是涉及到shuffle,join,sorts和aggregatations時的計算,存儲端的內存主要涉及到cache。在spark中,執行和存儲都是共享一個統一的region。當執行端沒有使用內存時,存儲端就能獲得所有的內存信息,反之一樣。在必要的時候,執行可以剔除存儲,但是存儲的時候可以設置一個閾值。
還可以看一個RDD消耗多少內存,在webUI或者使用SizeEstimator’s estimate方法。
內存使用的百分比是(堆內存-300MB)*0.6,執行和存儲各占50%
二十、spark優化之廣播變量。
使用廣播變量在sparkContext中,可以大幅降低每一個序列化task這個對象的大小,集群中啟動一個job的成本也會降低。如果你的task中使用了一個大對象(large object),考慮把他優化成一個廣播變量。通常來說,一個task大於20KB就值得優化。
Spark兩種共享變量:廣播變量(broadcast variable)與累加器accumulator
累加器用來對信息進行聚合,而廣播變量用來高效分發較大的對象。
共享變量出現的原因:
通常在向 Spark 傳遞函數時,比如使用 map() 函數或者用 filter() 傳條件時,可以使用驅動器程序中定義的變量,但是集群中運行的每個任務都會得到這些變量的一份新的副本,更新這些副本的值也不會影響驅動器中的對應變量。Spark 的兩個共享變量,累加器與廣播變量,分別為結果聚合與廣播這兩種常見的通信模式突破了這一限制。
使用廣播變量的過程很簡單:
(1) 通過對一個類型 T 的對象調用 SparkContext.broadcast 創建出一個 Broadcast[T] 對象。任何可序列化的類型都可以這么實現。
(2) 通過 value 屬性訪問該對象的值(在 Java 中為 value() 方法)。
(3) 變量只會被發到各個節點一次,應作為只讀值處理(修改這個值不會影響到別的節點)
累加器的用法如下所示:
案例如下:
object BroadcastTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("broadcast")
val sc = new SparkContext(conf)
val list = List("hello java")
val broadcast = sc.broadcast(list)
val linesRDD = sc.textFile("./word")
linesRDD.filter(line => {
broadcast.value.contains(line)
}).foreach(println)
sc.stop()
}
}
注意事項:
能不能將一個RDD使用廣播變量廣播出去?
不能,因為RDD是不存儲數據的。可以將RDD的結果廣播出去。
廣播變量只能在Driver端定義,不能在Executor端定義。
(1)通過在driver中調用 SparkContext.accumulator(initialValue) 方法,創建出存有初始值的累加器。返回值為 org.apache.spark.Accumulator[T] 對象,其中 T 是初始值initialValue 的類型。
(2)Spark閉包(函數序列化)里的excutor代碼可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。
(3)driver程序可以調用累加器的 value 屬性(在 Java 中使用 value() 或 setValue() )來訪問累加器的值。
案例如下:
object AccumulatorTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("accumulator")
val sc = new SparkContext(conf)
val accumulator = sc.accumulator(0); //創建accumulator並初始化為0
val linesRDD = sc.textFile("./word")
val result = linesRDD.map(s => {
accumulator.add(1) //有一條數據就增加1
s
})
result.collect();
println("words lines is :" + accumulator.value)
sc.stop()
}
}
累加器在Driver端定義賦初始值,累加器只能在Driver端讀取,在Excutor端更新
二十一、spark優化之數據本地性。
數據本地性是有很大的影響在Spark job的程序中。如果數據和代碼在一起,計算速度就會非常快。但是如果數據和代碼是分開的,一個必須要移動到另外一個上去。通常情況下是把序列化后的代碼移動到數據所在的節點上,因為代碼的大小比數據小很多(移動計算,而不是移動數據)。Spark構建的調度就是基於數據本地性。
數據本地性指的是數據和代碼有多近(close)。由近及遠有下面locality level:
1.PROCESS_LOCAL:數據在一個相同的正在運行的代碼的JVM中。
2.NODE_LOCAL:數據在同一個節點。
3.NO_PREF:數據不管在哪里都可以快速的訪問到。(無本地性)
4.RACK_LOCAL:數據在相同的機架上。但是數據在同一個機架的不同server上,需要通過網絡傳輸。
5.ANY:數據在網絡的其他地方,不在一個機架上。
Spark會優先安排作業在最佳的locality level上,但是不太可能。
二十二、Spark on Yarn 模式有哪些優點?
1)與其他計算框架共享集群資源(eg.Spark框架與MapReduce框架同時運行,如果不用Yarn進行資源分配,MapReduce分到的內存資源會很少,效率低下);資源按需分配,進而提高集群資源利用等。
2)相較於Spark自帶的Standalone模式,Yarn的資源分配更加細致
3)Application部署簡化,例如Spark,Storm等多種框架的應用由客戶端提交后,由Yarn負責資源的管理和調度,利用Container作為資源隔離的單位,以它為單位去使用內存,cpu等。
4)Yarn通過隊列的方式,管理同時運行在Yarn集群中的多個服務,可根據不同類型的應用程序負載情況,調整對應的資源使用量,實現資源彈性管理。
二十三、spark中task有幾種類型?
2種類型:1)result task類型,最后一個task,
2)是shuffleMapTask類型,除了最后一個task都是。
二十四、spark中map和mapPartition的區別?
rdd的mapPartitions是map的一個變種,它們都可進行分區的並行處理。
兩者的主要區別是調用的粒度不一樣:map的輸入變換函數是應用於RDD中每個元素,而mapPartitions的輸入函數是應用於每個分區。
假設一個rdd有10個元素,分成3個分區。如果使用map方法,map中的輸入函數會被調用10次;而使用mapPartitions方法的話,其輸入函數會只會被調用3次,每個分區調用1次。
這兩個方法的另一個區別是在大數據集情況下的資源初始化開銷和批處理處理,如果在map和mapPartition中都要初始化一個耗時的資源,然后使用,比如數據庫連接。在上面的例子中,mapPartition只需初始化3個資源(3個分區每個1次),而map要初始化10次(10個元素每個1次),顯然在大數據集情況下(數據集中元素個數遠大於分區數),mapPartitons的開銷要小很多,也便於進行批處理操作。
mapPartitionsWithIndex和mapPartitons類似,只是其參數多了個分區索引號。
二十四、python開發spark如何在提交作業的時候添加python的第三方模塊?
可以使用--py--files參數,但是應放在運行腳本的前面。所有的import操作必須在context完成之后。
二十五、什么是Spark Executor?
當SparkContext連接到集群管理器時,它會在集群中的節點上獲取Executor。 executor是Spark進程,它運行計算並將數據存儲在工作節點上。 SparkContext的最終任務被轉移到executors以執行它們。