Spark運行架構:
Spark運行架構包括集群資源管理器(Cluster Manager)、運行作業任務的工作節點(Worker Node)、每個應用的任務控制節點(Driver)和每個工作節點上負責具體任務的執行進程(Executor)

與Hadoop MapReduce計算框架相比,Spark所采用的Executor有兩個優點:
一是利用多線程來執行具體的任務(Hadoop MapReduce采用的是進程模型),減少任務的啟動開銷;
二是Executor中有一個BlockManager存儲模塊,會將內存和磁盤共同作為存儲設備,當需要多輪迭代計算時,可以將中間結果存儲到這個存儲模塊里,下次需要時,就可以直接讀該存儲模塊里的數據,而不需要讀寫到HDFS等文件系統里,因而有效減少了IO開銷;或者在交互式查詢場景下,預先將表緩存到該存儲系統上,從而可以提高讀寫IO性能。
在Spark中,一個應用(Application)由一個任務控制節點(Driver)和若干個作業(Job)構成,一個作業由多個階段(Stage)構成,一個階段由多個任務(Task)組成。當執行一個應用時,任務控制節點會向集群管理器(Cluster Manager)申請資源,啟動Executor,並向Executor發送應用程序代碼和文件,然后在Executor上執行任務,運行結束后,執行結果會返回給任務控制節點,或者寫到HDFS或者其他數據庫中。

Spark的基本運行流程如下:
(1)當一個Spark應用被提交時,首先需要為這個應用構建起基本的運行環境,即由任務控制節點(Driver)創建一個SparkContext,由SparkContext負責和資源管理器(Cluster Manager)的通信以及進行資源的申請、任務的分配和監控等。SparkContext會向資源管理器注冊並申請運行Executor的資源;
(2)資源管理器為Executor分配資源,並啟動Executor進程,Executor運行情況將隨着“心跳”發送到資源管理器上;
(3)SparkContext根據RDD的依賴關系構建DAG圖,DAG圖提交給DAG調度器(DAGScheduler)進行解析,將DAG圖分解成多個“階段”(每個階段都是一個任務集),並且計算出各個階段之間的依賴關系,然后把一個個“任務集”提交給底層的任務調度器(TaskScheduler)進行處理;Executor向SparkContext申請任務,任務調度器將任務分發給Executor運行,同時,SparkContext將應用程序代碼發放給Executor;
(4)任務在Executor上運行,把執行結果反饋給任務調度器,然后反饋給DAG調度器,運行完畢后寫入數據並釋放所有資源。

Spark運行架構具有以下特點:
(1)每個應用都有自己專屬的Executor進程,並且該進程在應用運行期間一直駐留。Executor進程以多線程的方式運行任務,減少了多進程任務頻繁的啟動開銷,使得任務執行變得非常高效和可靠;
(2)Spark運行過程與資源管理器無關,只要能夠獲取Executor進程並保持通信即可;
(3)Executor上有一個BlockManager存儲模塊,類似於鍵值存儲系統(把內存和磁盤共同作為存儲設備),在處理迭代計算任務時,不需要把中間結果寫入到HDFS等文件系統,而是直接放在這個存儲系統上,后續有需要時就可以直接讀取;在交互式查詢場景下,也可以把表提前緩存到這個存儲系統上,提高讀寫IO性能;
(4)任務采用了數據本地性和推測執行等優化機制。數據本地性是盡量將計算移到數據所在的節點上進行,即“計算向數據靠攏”,因為移動計算比移動數據所占的網絡資源要少得多。而且,Spark采用了延時調度機制,可以在更大的程度上實現執行過程優化。比如,擁有數據的節點當前正被其他的任務占用,那么,在這種情況下是否需要將數據移動到其他的空閑節點呢?
答案是不一定。因為,如果經過預測發現當前節點結束當前任務的時間要比移動數據的時間還要少,那么,調度就會等待,直到當前節點可用。
Spark部署模式:
三種模式:
standalone,Spark on Mesos,Spark on YARN
S:不需要依賴其他系統
M:資源調度管理框架,Spark充分支持,官方推薦
Y:與Hadoop統一部署,資源管理和調度依賴YARN,分布式存儲依賴HDFS
用Spark架構同時滿足批處理和流處理:

Spark核心 -- RDD
RDD設計背景:
提供一個抽象數據架構,避免中間結果存儲
RDD概念:
分布式對象集合,本質上是只讀的分區記錄集合
RDD操作:行動(返回非RDD),轉換(返回RDD)
典型執行過程:
1.RDD讀入外部數據
2.進行一系列“轉換”操作,產生不同的RDD
3.最后一個RDD經過“行動”,輸出到外部數據源
“行動”才會真正發生計算,而“轉換”是記錄相互之間的依賴關系
當“行動”要進行輸出時,Spark根據RDD的依賴關系生成DAG,從起點開始計算
建立的“轉換”然后生成DAG圖稱為一個“血緣關系”
血緣關系連接起來的RDD操作實現管道化,這就保證了中途不需要保存數據,而是直接管道式流入下一步
具體例子
val sc= new SparkContext(“spark://localhost:7077”,”Hello World”, “YOUR_SPARK_HOME”,”YOUR_APP_JAR”)
// 創建SC對象
val fileRDD = sc.textFile(“hdfs://192.168.0.103:9000/examplefile”)
// 從HDFS讀取數據創建一個RDD
val filterRDD = fileRDD.filter(_.contains(“Hello World”))
// “轉換”得到一個新的RDD
filterRDD.cache()
// 采用cache接口把RDD保存在內存中,進行持久化
filterRDD.count()
// “行動”,計算包含元素個數
以上執行流程:
1.創建sc對象
2.從外部數據源(HDFS)讀取並創建fileRDD對象
3.構建fileRDD和filterRDD的依賴關系,形成DAG圖(轉換軌跡)
4.觸發計算,把結果持久化到內存
RDD特性:
1.高效的容錯性,只需要記錄粗粒度的轉換,不需細粒度的日志
2.中間結果持久化到內存
3.可以存放Java對象
RDD依賴關系:
分為窄依賴(一父對應一子分區,多子可對應一父)與寬依賴(一父對應多子)
窄依賴對應協同划分(key落在同一子分區),寬依賴對應非協同划分
這種依賴設計具有容錯性,加快了執行速度
窄依賴的恢復更高效,Spark還有數據檢查點和記錄日志,在恢復時不需從頭開始
階段划分:
在DAG反向解析,遇到窄依賴才把當前RDD加入當前階段
DAG划分為多個階段,每個階段時任務集合,任務調度器把任務集合分配到executor
整體執行框圖:

兩種RDD創建方法:
1.通過外部數據集。本地、HDFS文件系統、HBase等外部數據源
2.調用SparkContext的parallelize方法,在Driver的存在集合上創建
准備:
打開hadoop的hdfs和spark
./sbin/start-dfs.sh
./bin/spark-shell
1.textFile()方法:
把文件的URL作為參數,也就是地址
Val lines = sc.textFile(…)
生成的是一個String類型RDD,也就是RDD[String]
輸入參數可以是文件名、目錄和壓縮文件
可以輸入第二個參數來指定分區數,默認為每個block創建一個分區
2.通過並行集合創建RDD
parallelize方法
讀入數組/整數array,得到RDD[Int]
RDD操作具體解釋:
轉換、行動
轉換:惰性求值,只記錄轉換軌跡
* filter(func):篩選出滿足函數func的元素,並返回一個新的數據集
* map(func):將每個元素傳遞到函數func中,並將結果返回為一個新的數據集
* flatMap(func):與map()相似,但每個輸入元素都可以映射到0或多個輸出結果
* groupByKey():應用於(K,V)鍵值對的數據集時,返回一個新的(K, Iterable)形式的數據集
* reduceByKey(func):應用於(K,V)鍵值對的數據集時,返回一個新的(K, V)形式的數據集,其中的每個值是將每個key傳遞到函數func中進行聚合
行動:真正的計算
* count() 返回數據集中的元素個數
* collect() 以數組的形式返回數據集中的所有元素
* first() 返回數據集中的第一個元素
* take(n) 以數組的形式返回數據集中的前n個元素
* reduce(func) 通過函數func(輸入兩個參數並返回一個值)聚合數據集中的元素
* foreach(func) 將數據集中的每個元素傳遞到函數func中運行
惰性機制:通過map切分,接着才reduce處理
Filter 操作
RDD.filter()會遍歷RDD中每行文本,並執行括號內的匿名函數
RDD.filter().count()
括號內可填入line => line.contains(“”) 這種Lamda表達式
執行表達式前,把當前行賦值給line,再執行后面的邏輯,然后放入結果集
等所有行執行完,得到結果集,最后才執行count行動操作
map與reduce操作
RDD.map()把每行都傳遞到括號內函數
Lines.map(line => line.split(“ “).size) 對每一行line先進行切分,得到集合RDD[Array[String]],再求出集合中的個數,所以此步轉換成Int類型的RDD[Int]
Reduce.((a, b) => if (a > b) a else b)
計算出RDD[Int]中的最大值,每次比較中保留較大的數,並用到下一次的比較中,也就是后續每次比較只取一個新的數
持久化
由於spark的惰性,每次行動都會從頭開始算,所以避免計算重復,引入持久化(緩存)
persist() 方法可用來標記RDD持久化,到第一次行動時,就會使標記的RDD真正持久化,持久化后可重復使用
用cache()方法會調用persist(MEMORY_ONLY),只使用內存。而persist(MEMORY_AND_DISK)還會在內存不足時放在硬盤上
分區
RDD分區原則:分區的個數盡量等於集群中的CPU核心(core)數目
用spark.default.parallelism 這個參數來配置默認分區數目
從HDFS讀取文件,分區數為文件分片數
打印
rdd.foreach(println)或者rdd.map(println)
在集群中操作時,用collect()方法,把所有worker節點的RDD都抓到Driver Program中才能把所有元素打印出,但可能會導致內存溢出
可用take()方法打印部分元素
常用的鍵值對RDD轉換操作:
包括reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等
reduceByKey(func)
功能:使用func函數合並具有相同鍵的值
reduceByKey((a, b) => a + b) 就是把相同鍵的值都加起來,所以a,b都代表值
groupByKey()
功能:對具有相同鍵的值進行分組,若有(hey, 1),(hey, 2) 則會生成(hey, (1, 2))
Keys/values
功能:返回鍵值對的鍵/值
sortedByKey()
功能:返回根據鍵排序的RDD
mapValues(func)
功能:只對value進行函數處理
join
就是連接,包括內鏈接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等
把兩個pairRDD的相同鍵對應的值,連接在一起
比如第一個RDD有(“spark”, 1)、(“spark”, 2),第二個RDD有(“spark”, “fast”),那么join的結果是(spark, (1, fast))、(spark, (2, fast))
實例:
rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
求相同鍵對應值的平均數
1.第一步建立(x, 1),記錄鍵值對個數
2.把值和個數分別相加
3.把總值和總個數相除
Spark中的共享變量
Spark中的另一個抽象:共享變量
滿足多個任務之間、人物與控制節點之間共享變量
兩種類型:廣播變量(broadcast)、累加器(accumulators)
廣播變量:
在每台機器上緩存一個只讀變量
val broadcastV = SparkContext.broadcast(v)
從一個普通變量v來創建一個廣播變量,在集群中可免去重復分發v,一旦創建后,v的值不能修改
累加器:
用來實現計數器(counter)和求和(sum)
用SparkContext.longAccumulator(名稱)或SparkContext.doubleAccumulator()創建
用add方法把數值累加到累加器中,然后要通過控制節點使用value方法讀取
val accu = sc.longAccumulator
sc.parallelize(Array).foreach(x => accum.add(x))
accum.value
Spark SQL簡介 -- 核心為DataFrame
Spark SQL優化了Shark(Hive on Spark)
僅依賴了HiveQL解析和Hive元數據
由Catalyst(函數式關系查詢優化框架)負責執行計划
增加了SchemaRDD,后來變成了DataFrame
Spark SQL支持的數據格式和編程語言(上面為語言 下面為數據格式):

Spark能實現從MySQL到DataFrame的轉化,支持SQL查詢
DataFrame與RDD的區別:
RDD是分布式的JAVA對象的集合,對具體對象的結構不了解
DataFrame以RDD為基礎,是分布式的Row對象集合,提供了詳細的schema(結構信息),清楚每個列的信息
DataFrame也和RDD一樣有“惰性”機制,生成DAG圖到最后才計算
得到DataFrame的兩種方法:
直接創建:
在json文件中讀取數據並創建DataFrame(記得先開了Hadoop再開Spark-shell!!)
1.導入org.apache.spark.sql.SparkSession
2.創建對象,builder()、getOrCreate()
3.使支持RDD轉換為DataFrames及后續sql操作 import spark.implicits._
4.讀取文件 read,得到DataFrames
5.各種常用操作 show、printSchema(打印模式)、select(選擇某些列)、filter(過濾)、groupBy(分組)、sort(排序 desc倒序)、select as(重命名)后面幾個操作都要接show()
RDD轉換:
兩種方法:1.利用反射推斷RDD的schema 2.使用編程接口構造一個schema
一、利用反射推斷RDD,定義case class,隱式轉成DataFrame:
1.導包:org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
org.apache.spark.sql.sql.Encoder
spark.implicits._ 支持RDD隱式轉成DataFrame
2.創建case class
3.sc對象,textFile()導入文件,map()轉換文件並定義DF,toDF()創建了DataFrame
4.注冊臨時表,DF.createOrReplaceTempView(“”)
5.用spark.sql()引入sql語句可從臨時表生成另一DataFrame,最終通過Show()顯示
二、無法提前定義case class時,采用編程方式定義RDD模式:
1.導包:org.apache.spark.sql.types._
org.apache.spark.sql.Row
2.導入文件生成RDD(sc的textFile)
3.定義模式Schema字符串
4.根據字符串生成模式:split()、map()方法生成StructField,然后StructType()轉化成schema
5.生成rowRDD,然后通過rowRDD和schema用createDataFrame()方法建立關系,並生成DataFrame