福利 => 每天都推送
不多說,直接上干貨!
從博客分為Spark編程模型(上)、Spark編程模型(中)和Spark編程模型(下)。
一、Spark編程模型(上)
從Hadoop MR到Spark
回顧hadoop—mapreduce計算過程
MR VS Spark
Spark編程模型
核心概念
注意:對比mr里的概念來學習。
這些概念,大家一定要好好理解!!!
Spark Application的組成
其中,Driver Program是運行main函數並且新建SparkContext的程序
Cluster Manager是在集群上獲取資源的外部服務(例如: standalonde、Mesos、Yarn)
Worker Node是集群中任何運行應用代碼的節點
Executor是在一個worker node上為某應用啟動的一個進程,該進程負責運行任務,並且負責將數據存在內存或磁盤上。每個應用都有各自獨立的executors。
Task是被送到某個executor上的工作單元
Spark應用程序的組成
(1)Driver
(2)Executor
注意:對照helloworld來思考
Spark Application基本概念
Spark Application編程模型
Spark 應用程序編程模型
– Driver Program ( SparkContext )
– Executor ( RDD 操作)
(1)輸入Base-> RDD
(2)Transformation RDD->RDD
(3)Action RDD->driver or Base
(4)緩存 Persist or cache()
– 共享變量
(1)broadcast variables(廣播變量)
(2)accumulators(累加器)
回顧Spark Hello World
初識RDD
什么是RDD
定義:Resilient distributed datasets (RDD), an efficient, general-purpose and fault-tolerant abstraction for sharing data in cluster applications.
RDD 是只讀的。 RDD 是分區記錄的集合。 RDD 是容錯的。--- lineage RDD 是高效的。 RDD 不需要物化。---物化:進行實際的變換並最終寫入穩定的存儲器上 RDD 可以緩存的。---課指定緩存級別
RDD是spark的核心,也是整個spark的架構基礎,RDD是彈性分布式集合(Resilient Distributed Datasets)的簡稱,是分布式只讀且已分區集合對象。這些集合是彈性的,如果數據集一部分丟失,則可以對它們進行重建。
RDD接口
RDD的本質特征
RDD--partitions
Spark中將1~100的數組轉換為rdd
通過第15行的size獲得rdd的partition的個數,此處創建rdd顯式指定定分區個數2,默認數值是這個程序所分配到的資源的cpu核的個數
RDD-preferredLocations
返回此RDD的一個partition的數據塊信息,如果一個數據塊(block)有多個備份在返回所有備份的location地址信息
主機ip或域名
作用:spark在進行任務調度室盡可能根據block的地址做到本地計算
RDD-dependencies
RDD之間的依賴關系分為兩類:
(1)窄依賴
每個父RDD的分區都至多被一個子RDD的分區使用,即為OneToOneDependecies;
(2)寬依賴
多個子RDD的分區依賴一個父RDD的分區,即為ShuffleDependency 。例如,map操作是一種窄依賴,而join操作是一種寬依賴(除非父RDD已經基於Hash策略被划分過了,co-partitioned)
(1)窄依賴相比寬依賴更高效資源消耗更少
(2)允許在單個集群節點上流水線式執行,這個節點可以計算所有父級分區。例如,可以逐個元素地依次執行filter操作和map操作。
(3)相反,寬依賴需要所有的父RDD數據可用並且數據已經通過類MapReduce的操作shuffle完成。
(4) 在窄依賴中,節點失敗后的恢復更加高效。因為只有丟失的父級分區需要重新計算,並且這些丟失的父級分區可以並行地在不同節點上重新計算。
(5)與此相反,在寬依賴的繼承關系中,單個失敗的節點可能導致一個RDD的所有先祖RDD中的一些分區丟失,導致計算的重新執行。
RDD-compute
分區計算
Spark對RDD的計算是以partition為最小單位的,並且都是對迭代器進行復合,不需要保存每次的計算結果
RDD- partitioner
分區函數:目前spark中提供兩種分區函數:
(1)HashPatitioner(哈希分區)
(2)RangePatitioner(區域分區)
且partitioner只存在於(K,V)類型的RDD中,rdd本身決定了分區的數量。
RDD- lineage
val lines = sc.textFile("hdfs://...") // transformed RDDs val errors = lines.filter(_.startsWith("ERROR")) val messages = errors.map(_.split("\t")).map(r => r(1)) messages.cache() // action 1 messages.filter(_.contains("mysql")).count() // action 2 messages.filter(_.contains("php")).count()
RDD經過trans或action后產生一個新的RDD,RDD之間的通過lineage來表達依賴關系,lineage是rdd容錯的重要機制,rdd轉換后的分區可能在轉換前分區的節點內存中
典型RDD的特征
不同角度看RDD
Scheduler Optimizations
Spark編程模型(中)
創建RDD
方式一:從集合創建RDD
(1)makeRDD
(2)Parallelize
注意:makeRDD可以指定每個分區perferredLocations參數,而parallelize則沒有。
方式二:讀取外部存儲創建RDD
Spark與Hadoop完全兼容,所以對Hadoop所支持的文件類型或者數據庫類型,Spark同樣支持。
(1)多文件格式支持:
(2)多文件系統支持:
1)本地文件系統
2)S3
3)HDFS
(3)數據庫
1)JdbcRDD
2)spark-cassandra-connector(datastax/spark-cassandra-connector)
3)org.apache.hadoop.hbase.mapreduce.TableInputFormat(SparkContext.newAPIHadoopRDD)
4)Elasticsearch-Hadoop
transformation操作
惰性求值
(1)RDD 的轉化操作都是惰性求值的。這意味着在被調用行動操作之前Spark 不會開始計算
(2)讀取數據到RDD的操作也是惰性的
(3)惰性求值的好處:
a.Spark 使用惰性求值可以把一些操作合並到一起來減少計算數據的步驟。在類似 Hadoop MapReduce 的系統中,開發者常常花費大量時間考慮如何把操作組合到一起,以減少MapReduce 的周期數。
b.而在Spark 中,寫出一個非常復雜的映射並不見得能比使用很多簡單的連續操作獲得好很多的性能。因此,用戶可以用更小的操作來組織他們的程序,這樣也使這些操作更容易管理。
轉換操作
RDD 的轉化操作是返回新RDD 的操作
我們不應該把RDD 看作存放着特定數據的數據集,而最好把每個RDD 當作我們通過轉化操作構建出來的、記錄如何計算數據的指令列表。
基本轉換操作1
基本轉換操作2
控制操作
(1)persist操作,可以將RDD持久化到不同層次的存儲介質,以便后續操作重復使用。
1)cache:RDD[T]
2)persist:RDD[T]
3)Persist(level:StorageLevel):RDD[T]
(2)checkpoint
將RDD持久化到HDFS中,與persist操作不同的是checkpoint會切斷此RDD之前的依賴關系,而persist依然保留RDD的依賴關系。
注意:控制操作的細節會在后續博客專門講解
action操作
Spark編程模型(下)
什么是Pair RDD
(1)包含鍵值對類型的RDD被稱作Pair RDD
(2)Pair RDD通常用來進行聚合計算
(3)Pair RDD通常由普通RDD做ETL轉換而來
創建Pair RDD
Python語言
pairs = lines.map(lambda x: (x.split(" ")[0], x))
scala語言
val pairs = lines.map(x => (x.split(" ")(0), x))
Java語言
PairFunction keyData =
new PairFunction() {
public Tuple2 call(String x) { return new Tuple2(x.split(" ")[0], x); } }; JavaPairRDD pairs = lines.mapToPair(keyData);
Pair RDD的transformation操作
Pair RDD轉換操作1
Pair RDD 可以使用所有標准RDD 上轉化操作,還提供了特有的轉換操作。
Pair RDD轉換操作2
Pair RDD的action操作
Pair RDD轉換操作1
所有基礎RDD 支持的行動操作也都在pair RDD 上可用
Pair RDD的分區控制
Pair RDD的分區控制
(1) Spark 中所有的鍵值對RDD 都可以進行分區控制---自定義分區
(2)自定義分區的好處:
1)避免數據傾斜
2)控制task並行度
自定義分區方式
class DomainNamePartitioner(numParts: Int) extends Partitioner { override def numPartitions: Int = numParts override def getPartition(key: Any): Int = { val domain = new Java.net.URL(key.toString).getHost() val code = (domain.hashCode % numPartitions) if(code < 0) { code + numPartitions // 使其非負 }else{ code } } // 用來讓Spark區分分區函數對象的Java equals方法 override def equals(other: Any): Boolean = other match { case dnp: DomainNamePartitioner => dnp.numPartitions == numPartitions case _ => false }
同時,大家可以關注我的個人博客:
http://www.cnblogs.com/zlslch/ 和 http://www.cnblogs.com/lchzls/ http://www.cnblogs.com/sunnyDream/
詳情請見:http://www.cnblogs.com/zlslch/p/7473861.html
人生苦短,我願分享。本公眾號將秉持活到老學到老學習無休止的交流分享開源精神,匯聚於互聯網和個人學習工作的精華干貨知識,一切來於互聯網,反饋回互聯網。
目前研究領域:大數據、機器學習、深度學習、人工智能、數據挖掘、數據分析。 語言涉及:Java、Scala、Python、Shell、Linux等 。同時還涉及平常所使用的手機、電腦和互聯網上的使用技巧、問題和實用軟件。 只要你一直關注和呆在群里,每天必須有收獲
對應本平台的討論和答疑QQ群:大數據和人工智能躺過的坑(總群)(161156071)
打開百度App,掃碼,精彩文章每天更新!歡迎關注我的百家號: 九月哥快訊