Apache Spark是一個開源分布式運算框架,最初是由加州大學柏克萊分校AMPLab所開發。
Hadoop MapReduce的每一步完成必須將數據序列化寫到分布式文件系統導致效率大幅降低。Spark盡可能地在內存上存儲中間結果, 極大地提高了計算速度。
MapReduce是一路計算的優秀解決方案, 但對於多路計算的問題必須將所有作業都轉換為MapReduce模式並串行執行。
Spark擴展了MapReduce模型,允許開發者使用有向無環圖(DAG)開發復雜的多步數據管道。並且支持跨有向無環圖的內存數據共享,以便不同的作業可以共同處理同一個數據
Spark不是Hadoop的替代方案而是其計算框架Hadoop MapReduce的替代方案。Hadoop更多地作為集群管理系統為Spark提供底層支持。
Spark可以使用本地Spark, Hadoop YARN或Apache Mesos作為集群管理系統。Spark支持HDFS,Cassandra, OpenStack Swift作為分布式存儲解決方案。
Spark采用Scala語言開發運行於JVM上,並提供了Scala,Python, Java和R語言API,可以使用其中的Scala和Python進行交互式操作。
本文測試環境為Spark 2.1.0, Python API.
初識Spark
彈性分布式數據集(Resilient Distributed Dataset, RDD)是Saprk的基本數據結構, 代表可以跨機器進行分割的只讀對象集合。
RDD可以由Hadoop InputFormats創建(比如HDFS上的文件)或者由其它RDD轉換而來, RDD一旦創建便不可改變。RDD操作分為變換和行動兩種:
-
變換(Transformation): 接受一個RDD作為參數,返回一個新的RDD, 原RDD不變。
包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe以及coalesce -
行動(Action): 接受一個RDD作為參數, 進行查詢並返回一個值。
包括: reduce,collect,count,first,take,countByKey以及foreach
Spark的核心組件包括:
-
Spark Core: 核心功能, 提供RDD及其API和操作。
-
Spark SQL: 提供通過Apache Hive的SQL變體HiveQL與Spark進行交互的API。每個數據表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。
-
Spark Streaming:允許對實時數據流進行處理和控制,park Streaming允許程序能夠像普通RDD一樣處理實時數據。
-
MLlib:一個常用機器學習算法庫,算法被實現為對RDD的Spark操作。這個庫包含可擴展的學習算法,比如分類、回歸等需要對大量數據集進行迭代的操作
-
GraphX: 圖計算框架, GraphX擴展了RDD API,包含控制圖、創建子圖、訪問路徑上所有頂點的操作。
體驗Spark
對於Linux和Mac用戶只需要在本地安裝java運行環境並在官網中下載Pre-built版本的壓縮包, 解壓縮之后即可以單機模式使用Spark。
進入解壓后的spark目錄, 其中包含一些腳本和二進制程序:
sbin
: 管理員命令目錄spark-config.sh
將spark運行配置寫入環境變量spark-daemon.sh
在本地啟動守護進程spark-daemons.sh
在所有slave主機上啟動守護進程start-master.sh
啟動master進程start-slave.sh
在本地上啟動slave進程start-slaves.sh
根據conf/slaves配置文件在slave主機上啟動slave進程start-all.sh
啟動所有守護進程,啟動本地master進程, 根據conf/slaves啟動slave進程stop-all.sh
停止所有守護進程及其下的master/slave進程stop-master.sh
停止master進程stop-slave.sh
停止本地的slave進程stop-slaves.sh
停止所有slave進程- 其它服務控制腳本
bin
普通用戶工具目錄pyspark
: python交互環境spark-shell
scala交互環境sparkR
R交互環境spark-submit
將Spark應用提交到集群上運行spark-sql
spark-sql交互環境run-example
運行示例
使用pyspark交互界面
使用sbin/start-all.sh
啟動spark然后調用bin/pyspark
進入Python交互界面:
SparkSession和SparkContext初始化成功后, 可以確認交互界面已正確啟動。
>>> txt = sc.textFile("README.md")
>>> txt.count()
104
上述代碼中,sc是SparkContext的別名, 我們根據"README.md"的內容創建了一個RDD並用count()
方法取出RDD中項目的數量。
使用spark-submit提交python作業
bin/spark-submit
可以將使用python編寫的Spark應用提交到集群上運行。
我們將上文中的示例寫成腳本, 與交互模式不同的是腳本需要手動進行一些配置:
from pyspark import SparkConf, SparkContext
APP_NAME = "My Spark Application"
MASTER_URL = "local[*]"
conf = SparkConf().setAppName(APP_NAME)
conf = conf.setMaster(MASTER_URL)
sc = SparkContext(conf=conf)
def main(sc):
txt = sc.textFile("README.md")
print(txt.count())
if __name__ == '__main__':
main(sc)
保存為my_test.py
, 使用spark-submit提交作業:
$ bin/spark-submit my_test.py
104
現在對上述代碼做一些說明。
APP_NAME
是應用的名稱由程序員自定義,MASTER_URL
用於指定集群Master的位置:
URL | 含義 |
---|---|
local | 用一個worker線程本地運行Spark |
local[K] | 用k個worker線程本地運行Spark(通常設置為機器核心數) |
local[*] | 用盡可能多的worker線程本地運行Spark |
spark://HOST:PORT | 連接到給定的Spark獨立部署集群master, 默認端口7077 |
mesos://HOST:PORT | 連接到給定的mesos集群 |
yarn-client | 以client模式連接到Yarn集群。集群位置將基於通過HADOOP_CONF_DIR變量找到 |
yarn-cluster | 以cluster模式連接到Yarn集群。群集位置將基於通過HADOOP_CONF_DIR變量找到 |
RDD基本操作
創建RDD
並行集合(Parallelized collections)基於python可迭代對象(iterable)創建:
>>> data = [1,2,3,4]
>>> para_data = sc.parallelize(data)
>>> para_data
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475
>>> para_data.reduce(lambda x, y: x+y)
10
RDD一旦創建即可以並行模式運算.
除了使用內部的iterable對象創建RDD外, 也可以使用外部數據源創建RDD.
Spark 可以從任何一個 Hadoop 支持的存儲源創建分布式數據集,包括你的本地文件系統,HDFS,Cassandra,HBase等.
>>> src_uri = "README.md"
>>> txt = sc.textFile(src_uri)
>>> txt.count()
104
Spark支持textFile, SequenceFile和其它Hadoop InputFormat作為外部數據源。
src_uri支持的協議包括hdfs://
, s3n://
和file://
等。直接填寫路徑則默認采用file://
即本地文件系統路徑.
如果src_uri使用本地文件系統路徑,文件必須能在 worker 節點上用相同的路徑訪問到。要么復制文件到所有的 workers,要么使用網絡共享文件系統.
操作RDD
Spark采用惰性求值的機制進行運算, 我們用一個簡單的例子說明Spark的運算過程:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行從外部數據集創建了一個名為lines的RDD, lines只是一個指針文件內容沒有真的被讀入內存。
第二行執行了map操作, 同樣的lineLength並沒有被立即求值。
第三行執行了reduce操作, Spark 把計算分成多個任務(task),並且讓它們運行在多個機器上。每台機器都運行自己的 map 部分和本地 reduce 部分, 並把結果返回給Master。
transformation
前文已經說明transformation是根據RDD創建新的RDD的操作,這里將說明一些常用的操作,更多內容請參見官方文檔.
-
rdd.map(func): 將數據源的每個元素傳遞給func函數, 得到func的返回值組成新RDD
在示例lines.map(lambda s: len(s))
中lines的元素類型為str, map函數將其映射為長度元素長度的集合。 -
rdd.flatMap(func): func接受一個元素為參數,返回一個可迭代對象。對rdd中所有元素應用func函數, 將返回的列表合並為新的RDD。
>>> r = sc.parallelize([1,2,3,4]).flatMap(lambda x: [x, x+1])
>>> r.collect() # show all elements
>>> [1, 2, 2, 3, 3, 4, 4, 5]
-
rdd.filter(func): 將數據源的每個元素傳遞給func函數, 使func返回True的元素加入到結果RDD中
-
rdd1.union(rdd2): 求rdd1與rdd2的並集
-
rdd1.intersection(rdd2): 求rdd1和rdd2的交集
-
rdd.distinct(): 返回去除重復元素后的rdd
action
action是對RDD進行查詢並返回單個元素的操作, 這里將說明一些常用的操作,更多內容請參見官方文檔。
-
rdd.reduce(func): func是接受兩個參數並返回一個值的函數, reduce使用func對集合進行聚合。
這個過程可以理解為從集合中任取兩個元素傳給func, 然后將返回值加入集合中並刪除兩個參數, 反復迭代直至集合只有一個元素, 該元素即為最后的返回值。
示例lineLengths.reduce(lambda a, b: a + b)
中, reduce函數對RDD內所有函數進行了求和。 -
rdd.collect(): 以python list的形式返回集合中所有元素
-
rdd.first(): 返回集合中第一個元素, 對集合不產生影響
-
rdd.take(n): 返回集合中前n個元素組成的list, 下標從1開始
-
rdd.count(): 返回集合中元素的數目
-
rdd.foreach(func): func是接受一個參數的函數, 對集合中每個元素調用func函數, foreach返回None
使用鍵值對
上文中的RDD對元素的類型是基本沒有限制的, 類似於python內置的list(其實更類似於ORM的查詢集)。RDD在使用二元組作為元素時, Spark會將二元組作為一個鍵值對處理, 二元組的第一個元素被認為是鍵, 第二個元素認為是值。
元素為二元組的RDD仍然可以使用普通RDD的操作,Spark也為這類RDD定義了一些基於鍵值對的操作:
-
groupByKey():將key相同的鍵值對合並為一個鍵值對:
(key,[val, val, ...])
-
reduceByKey(func): 對key相同的鍵值對應用func進行聚合:
(key,RDD<val>.reduce(func))
-
rdd.sortByKey(ascending=True): 按key對鍵值對進行排序,默認為升序
-
rdd1.join(rdd2): 對兩個鍵值對形式的rdd進行合並,(k, v)和(k,w)將被合並為(k, (v,w))
-
countByKey(): 返回每個鍵對應鍵值對的個數(key, count), 返回值為dict而非RDD.
RDD持久化
RDD持久化是Spark的一個重要功能, 上文已經提及Spark提供了持久化到內存的功能極大的提高了運算速度, 也是Spark比Hadoop MapReduce更先進的原因之一。
rdd.persist([level])可以根據指定等級執行持久化:
>>> from pyspark import StorageLevel
>>> r.persist(StorageLevel.MEMORY_ONLY)
PythonRDD[16]
Spark支持的持久化級別包括:
-
MEMORY_ONLY: 將RDD作為java對象存儲在JVM中,若RDD的某部分無法作為java對象存儲,則不對該部分進行緩存。默認緩存級別。
-
MEMORY_AND_DISK: 將RDD作為java對象存儲在JVM中,若RDD的某部分無法作為java對象存儲,則將該部分用pickle序列化后緩存到磁盤上。
-
MEMORY_ONLY_SER: 將RDD序列化后作為java byte[]存儲在內存中,不合適的分區不緩存。 比較節省內存但是消耗時間
-
MEMORY_AND_DISK_SER: 將RDD序列化后作為java byte[]存儲在內存中,不合適的分區序列化后存儲到磁盤上
-
DISK_ONLY: 序列化后僅存儲在磁盤上
-
MEMORY_ONLY_2, MEMORY_AND_DISK_2等: 與上述存儲級別類似, 不過是存儲到兩個節點上
-
OFF_HEAP: 將RDD序列化后緩存到分布式內存存儲Tachyon上
Spark官方文檔給出了一些關於選擇存儲級別的建議:
-
如果你的RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。因為這是cpu利用率最高的選項,會使RDD上的操作盡可能的快。
-
如果不適合用默認的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提高對象的空間使用率,但是仍能夠相當快的訪問。
-
除非函數計算RDD的花費較大或者它們需要過濾大量的數據,不要將RDD存儲到磁盤上,否則,重復計算一個分區就會和重磁盤上讀取數據一樣慢。
-
如果你希望更快的錯誤恢復,可以利用重復(replicated)存儲級別。所有的存儲級別都可以通過重復計算丟失的數據來支持完整的容錯,但是重復的數據能夠使你在RDD上繼續運行任務,而不需要重復計算丟失的數據。
-
在擁有大量內存的環境中或者多應用程序的環境中,OFF_HEAP具有如下優勢:
- 它運行多個執行者共享Tachyon中相同的內存池
- 它顯著地減少垃圾回收的花費
- 如果單個的執行者崩潰,緩存的數據不會丟失
Spark提供了rdd.cache()
方法, 它與rdd.persist(StorageLevel.MEMORY_ONLY)
功能相同。
Spark自動的監控每個節點緩存的使用情況,利用最近最少使用原則刪除老舊的數據。rdd.unpersist()
可以手動刪除緩存。
使用共享變量
一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。
這些變量被復制到每台機器上,並且這些變量在遠程機器上 的所有更新都不會傳遞回驅動程序,通常這種跨任務的讀寫變量是低效的。
Spark提供了兩個共享變量: 廣播變量(broadcast variable)和累加器(accumulator)進行跨任務共享。
廣播變量
廣播變量在每台機器上面緩存一個只讀變量,而不是每個任務保存一個副本。Spark也嘗試着利用有效的廣播算法去分配廣播變量,以減少通信的成本。
>>> broadcastVar = sc.broadcast([1, 2, 3])
>>> broadcastVar.value
[1, 2, 3]
廣播變量創建后我們可以使用它代替原變量,其操作與RDD基本相同。
累加器
累加器特性與廣播變量類似, 另外定義了add方法用於累加。
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
>>> accum.value
10
累加器默認使用python內置int類型計數, 我們可以自定義計數類型。通常自定義類型為多維向量,用來進行復雜計數。