Spark簡介
spark 可以很容易和yarn結合,直接調用HDFS、Hbase上面的數據,和hadoop結合。配置很容易。
spark發展迅猛,框架比hadoop更加靈活實用。減少了延時處理,提高性能效率實用靈活性。也可以與hadoop切實相互結合。
spark核心部分分為RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心組件解決了很多的大數據問題,其完美的框架日受歡迎。其相應的生態環境包括zepplin等可視化方面,正日益壯大。大型公司爭相實用spark來代替原有hadoop上相應的功能模塊。Spark讀寫過程不像hadoop溢出寫入磁盤,都是基於內存,因此速度很快。另外DAG作業調度系統的寬窄依賴讓Spark速度提高。
Spark核心組成
1、RDD
是彈性分布式數據集,完全彈性的,如果數據丟失一部分還可以重建。有自動容錯、位置感知調度和可伸縮性,通過數據檢查點和記錄數據更新金象容錯性檢查。通過SparkContext.textFile()加載文件變成RDD,然后通過transformation構建新的RDD,通過action將RDD存儲到外部系統。
RDD使用延遲加載,也就是懶加載,只有當用到的時候才加載數據。如果加載存儲所有的中間過程會浪費空間。因此要延遲加載。一旦spark看到整個變換鏈,他可以計算僅需的結果數據,如果下面的函數不需要數據那么數據也不會再加載。轉換RDD是惰性的,只有在動作中才可以使用它們。
Spark分為driver和executor,driver提交作業,executor是application早worknode上的進程,運行task,driver對應為sparkcontext。Spark的RDD操作有transformation、action。Transformation對RDD進行依賴包裝,RDD所對應的依賴都進行DAG的構建並保存,在worknode掛掉之后除了通過備份恢復還可以通過元數據對其保存的依賴再計算一次得到。當作業提交也就是調用runJob時,spark會根據RDD構建DAG圖,提交給DAGScheduler,這個DAGScheduler是在SparkContext創建時一同初始化的,他會對作業進行調度處理。當依賴圖構建好以后,從action開始進行解析,每一個操作作為一個task,每遇到shuffle就切割成為一個taskSet,並把數據輸出到磁盤,如果不是shuffle數據還在內存中存儲。就這樣再往前推進,直到沒有算子,然后運行從前面開始,如果沒有action的算子在這里不會執行,直到遇到action為止才開始運行,這就形成了spark的懶加載,taskset提交給TaskSheduler生成TaskSetManager並且提交給Executor運行,運行結束后反饋給DAGScheduler完成一個taskSet,之后再提交下一個,當TaskSet運行失敗時就返回DAGScheduler並重新再次創建。一個job里面可能有多個TaskSet,一個application可能包含多個job。
2、Spark Streaming
通過對kafka數據讀取,將Stream數據分成小的時間片段(幾秒),以類似batch批處理的方式來處理這一部分小數據,每個時間片生成一個RDD,有高效的容錯性,對小批量數據可以兼容批量實時數據處理的邏輯算法,用一些歷史數據和實時數據聯合進行分析,比如分類算法等。也可以對小批量的stream進行mapreduce、join等操作,而保證其實時性。針對數據流時間要求不到毫秒級的工程性問題都可以。
Spark Streaming也有一個StreamingContext,其核心是DStream,是通過以組時間序列上的連續RDD來組成的,包含一個有Time作為key、RDD作為value的結構體,每一個RDD都包含特定時間間隔的數據流,可以通過persist將其持久化。在接受不斷的數據流后,在blockGenerator中維護一個隊列,將流數據放到隊列中,等處理時間間隔到來后將其中的所有數據合並成為一個RDD(這一間隔中的數據)。其作業提交和spark相似,只不過在提交時拿到DStream內部的RDD並產生Job提交,RDD在action觸發之后,將job提交給jobManager中的JobQueue,又jobScheduler調度,JobScheduler將job提交到spark的job調度器,然后將job轉換成為大量的任務分發給spark集群執行。Job從outputStream中生成的,然后觸發反向回溯執行DStreamDAG。在流數據處理的過程中,一般節點失效的處理比離線數據要復雜。Spark streamin在1.3之后可以周期性的將DStream寫入HDFS,同時將offset也進行存儲,避免寫到zk。一旦主節點失效,會通過checkpoint的方式讀取之前的數據。當worknode節點失效,如果HDFS或文件作為輸入源那Spark會根據依賴關系重新計算數據,如果是基於Kafka、Flume等網絡數據源spark會將手機的數據源在集群中的不同節點進行備份,一旦有一個工作節點失效,系統能夠根據另一份還存在的數據重新計算,但是如果接受節點失效會丟失一部分數據,同時接受線程會在其他的節點上重新啟動並接受數據。
3、Graphx
主要用於圖的計算。核心算法有PageRank、SVD奇異矩陣、TriangleConut等。
4、Spark SQL
是Spark新推出的交互式大數據SQL技術。把sql語句翻譯成Spark上的RDD操作可以支持Hive、Json等類型的數據。
5、Spark R
通過R語言調用spark,目前不會擁有像Scala或者java那樣廣泛的API,Spark通過RDD類提供Spark API,並且允許用戶使用R交互式方式在集群中運行任務。同時集成了MLlib機器學習類庫。
6、MLBase
從上到下包括了MLOptimizer(給使用者)、MLI(給算法使用者)、MLlib(給算法開發者)、Spark。也可以直接使用MLlib。ML Optimizer,一個優化機器學習選擇更合適的算法和相關參數的模塊,還有MLI進行特征抽取和高級ML編程 抽象算法實現API平台,MLlib分布式機器學習庫,可以不斷擴充算法。MLRuntime基於spark計算框架,將Spark的分布式計算應用到機器學習領域。MLBase提供了一個簡單的聲明方法指定機器學習任務,並且動態地選擇最優的學習算法。
7、Tachyon
高容錯的分布式文件系統。宣稱其性能是HDFS的3000多倍。有類似java的接口,也實現了HDFS接口,所以Spark和MR程序不需要任何的修改就可以運行。目前支持HDFS、S3等。
8、Spark算子
1、Map。對原數據進行處理,類似於遍歷操作,轉換成MappedRDD,原分區不變。
2、flatMap。將原來的RDD中的每一個元素通過函數轉換成新的元素,將RDD的每個集合中的元素合並成一個集合。比如一個元素里面多個list,通過這個函數都合並成一個大的list,最經典的就是wordcount中將每一行元素進行分詞以后成為,通過flapMap變成一個個的單詞,line.flapMap(_.split(“ ”)).map((_,1))如果通過map就會將一行的單詞變成一個list。
3、mapPartitions。對每個分區進行迭代,生成MapPartitionsRDD。
4、Union。是將兩個RDD合並成一個。使用這個函數要保證兩個RDD元素的數據類型相同,返回的RDD的數據類型和被合並的RDD數據類型相同。
5、Filter。其功能是對元素進行過濾,對每個元素調用f函數,返回值為true的元素就保留在RDD中。
6、Distinct。對RDD中元素進行去重操作。
7、Subtract。對RDD1中取出RDD1與RDD2交集中的所有元素。
8、Sample。對RDD中的集合內元素進行采樣,第一個參數withReplacement是true表示有放回取樣,false表示無放回。第二個參數表示比例,第三個參數是隨機種子。如data.sample(true, 0.3,new Random().nextInt())。
9、takeSample。和sample用法相同,只不第二個參數換成了個數。返回也不是RDD,而是collect。
10、Cache。將RDD緩存到內存中。相當於persist(MEMORY_ONLY)。可以通過參數設置緩存和運行內存之間的比例,如果數據量大於cache內存則會丟失。
11、Persist。里面參數可以選擇DISK_ONLY/MEMORY_ONLY/MEMORY_AND_DISK等,其中的MEMORY_AND_DISK當緩存空間滿了后自動溢出到磁盤。
12、MapValues。針對KV數據,對數據中的value進行map操作,而不對key進行處理。
13、reduceByKey。針對KV數據將相同key的value聚合到一起。與groupByKey不同,會進行一個類似mapreduce中的combine操作,減少相應的數據IO操作,加快效率。如果想進行一些非疊加操作,我們可以將value組合成字符串或其他格式將相同key的value組合在一起,再通過迭代,組合的數據拆開操作。
14、partitionBy。可以將RDD進行分區,重新生成一個ShuffleRDD,進行一個shuffle操作,對后面進行頻繁的shuffle操作可以加快效率。
15、randomSplit。對RDD進行隨機切分。如data.randomSplit(new double[]{0.7, 0.3})返回一個RDD的數組。
16、Cogroup。對兩個RDD中的KV元素,每個RDD中相同key中的元素分別聚合成一個集合。與reduceByKey不同的是針對兩個RDD中相同的key的元素進行合並。
17、Join。相當於inner join。對兩個需要連接的RDD進行cogroup,然后對每個key下面的list進行笛卡爾積的操作,輸出兩兩相交的兩個集合作為value。 相當於sql中where a.key=b.key。
18、leftOutJoin,rightOutJoin。在數據庫中左連接以左表為坐標將表中所有的數據列出來,右面不存在的用null填充。在這里面對join的基礎上判斷左側的RDD元素是否是空,如果是空則填充。右連接則相反。
19、saveAsTestFile。將數據輸出到HDFS的指定目錄。
20、saveAsObjectFile。寫入HDFS為SequenceFile格式。
21、Collect、collectAsMap。將RDD轉換成list或者Map。結果以List或者HashMap的方式輸出。
22、Count。對RDD的元素進行統計,返回個數。
23、Top(k)。返回最大的k個元素,返回List的形式。
24、Take返回數據的前k個元素。
25、takeOrdered。返回數據的最小的k個元素,並在返回中保持元素的順序。
9、Tips
1、RDD.repartition(n)可以在最初對RDD進行分區操作,這個操作實際上是一個shuffle,可能比較耗時,但是如果之后的action比較多的話,可以減少下面操作的時間。其中的n值看cpu的個數,一般大於2倍cpu,小於1000。
2、Action不能夠太多,每一次的action都會將以上的taskset划分一個job,這樣當job增多,而其中task並不釋放,會占用更多的內存,使得gc拉低效率。
3、在shuffle前面進行一個過濾,減少shuffle數據,並且過濾掉null值,以及空值。
4、groupBy盡量通過reduceBy替代。reduceBy會在work節點做一次reduce,在整體進行reduce,相當於做了一次hadoop中的combine操作,而combine操作和reduceBy邏輯一致,這個groupBy不能保證。
5、做join的時候,盡量用小RDD去join大RDD,用大RDD去join超大的RDD。
6、避免collect的使用。因為collect如果數據集超大的時候,會通過各個work進行收集,io增多,拉低性能,因此當數據集很大時要save到HDFS。
7、RDD如果后面使用迭代,建議cache,但是一定要估計好數據的大小,避免比cache設定的內存還要大,如果大過內存就會刪除之前存儲的cache,可能導致計算錯誤,如果想要完全的存儲可以使用persist(MEMORY_AND_DISK),因為cache就是persist(MEMORY_ONLY)。
8、設置spark.cleaner.ttl,定時清除task,因為job的原因可能會緩存很多執行過去的task,所以定時回收可能避免集中gc操作拉低性能。
9、適當pre-partition,通過partitionBy()設定,每次partiti
轉:http://www.cnblogs.com/hellochennan/p/5372946.html