Apache Spark是用 Scala編程語言 編寫的。為了用Spark支持Python,Apache Spark社區發布了一個工具PySpark。使用PySpark,您也可以使用Python編程語言中的 RDD 。正是由於一個名為 Py4j 的庫,他們才能實現這一目標。
它將創建一個目錄 spark-2.1.0-bin-hadoop2.7 。在啟動PySpark之前,需要設置以下環境來設置Spark路徑和 Py4j路徑 。
export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7 export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH export PATH = $SPARK_HOME/python:$PATH
或者,要全局設置上述環境,請將它們放在 .bashrc文件中 。然后運行以下命令以使環境正常工作。
# source .bashrc
SparkContext是任何spark功能的入口點。當我們運行任何Spark應用程序時,啟動一個驅動程序,它具有main函數,並在此處啟動SparkContext。然后,驅動程序在工作節點上的執行程序內運行操作。
SparkContext使用Py4J啟動 JVM 並創建 JavaSparkContext。默認情況下,PySpark將SparkContext作為 'sc'提供 ,因此創建新的SparkContext將不起作用。

以下代碼塊包含PySpark類的詳細信息以及SparkContext可以采用的參數。
class pyspark.SparkContext ( master = None, appName = None, sparkHome = None, pyFiles = None, environment = None, batchSize = 0, serializer = PickleSerializer(), conf = None, gateway = None, jsc = None, profiler_cls = <class 'pyspark.profiler.BasicProfiler'> )
參數
以下是SparkContext的參數。
-
Master - 它是連接到的集群的URL。
-
appName - 您的工作名稱。
-
sparkHome - Spark安裝目錄。
-
pyFiles - 要發送到集群並添加到PYTHONPATH的.zip或.py文件。
-
environment - 工作節點環境變量。
-
batchSize - 表示為單個Java對象的Python對象的數量。 設置1以禁用批處理,設置0以根據對象大小自動選擇批處理大小,或設置為-1以使用無限批處理大小。
-
serializer - RDD序列化器。
-
Conf - L {SparkConf}的一個對象,用於設置所有Spark屬性。
-
gateway - 使用現有網關和JVM,否則初始化新JVM。
-
JSC - JavaSparkContext實例。
-
profiler_cls - 用於進行性能分析的一類自定義Profiler(默認為pyspark.profiler.BasicProfiler)。
在上述參數中,主要使用 master 和 appname 。任何PySpark程序的前兩行如下所示
from pyspark import SparkContext sc = SparkContext("local", "First App")
SparkContext示例 - PySpark Shell
現在你已經對SparkContext有了足夠的了解,讓我們在PySpark shell上運行一個簡單的例子。在此示例中,我們將計算 README.md 文件中帶有字符“a”或“b”的行 數 。那么,讓我們說一個文件中有5行,3行有'a'字符,那么輸出將是→ Line with a:3 。字符'b'也是如此。
注 - 我們不會在以下示例中創建任何SparkContext對象,因為默認情況下,當PySpark shell啟動時,Spark會自動創建名為sc的SparkContext對象。 如果您嘗試創建另一個SparkContext對象,您將收到以下錯誤 “ValueError:無法一次運行多個SparkContexts”。

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md" <<< logData = sc.textFile(logFile).cache() <<< numAs = logData.filter(lambda s: 'a' in s).count() <<< numBs = logData.filter(lambda s: 'b' in s).count() <<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs) Lines with a: 62, lines with b: 30
SparkContext示例 - Python程序
讓我們使用Python程序運行相同的示例。創建一個名為 firstapp.py 的Python文件,並在該文件中輸入以下代碼。
----------------------------------------firstapp.py--------------------------------------- from pyspark import SparkContext logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md" sc = SparkContext("local", "first app") logData = sc.textFile(logFile).cache() numAs = logData.filter(lambda s: 'a' in s).count() numBs = logData.filter(lambda s: 'b' in s).count() print "Lines with a: %i, lines with b: %i" % (numAs, numBs) ----------------------------------------firstapp.py---------------------------------------
然后我們將在終端中執行以下命令來運行此Python文件。我們將得到與上面相同的輸出。
$SPARK_HOME/bin/spark-submit firstapp.py Output: Lines with a: 62, lines with b: 30
現在我們已經在我們的系統上安裝並配置了PySpark,我們可以在Apache Spark上用Python編程。但在此之前,讓我們了解Spark - RDD中的一個基本概念。
RDD代表 Resilient Distributed Dataset,它們是在多個節點上運行和操作以在集群上進行並行處理的元素。RDD是不可變元素,這意味着一旦創建了RDD,就無法對其進行更改。RDD也具有容錯能力,因此在發生任何故障時,它們會自動恢復。您可以在這些RDD上應用多個操作來完成某項任務。
要對這些RDD進行操作,有兩種方法
- Transformation
- Action
讓我們詳細了解這兩種方式。
轉換 - 這些操作應用於RDD以創建新的RDD。 Filter,groupBy和map是轉換的例子。
操作 - 這些是應用於RDD的操作,它指示Spark執行計算並將結果發送回驅動程序。
要在PySpark中應用任何操作,我們首先需要創建一個 PySpark RDD 。以下代碼塊具有PySpark RDD類的詳細信息
class pyspark.RDD ( jrdd, ctx, jrdd_deserializer = AutoBatchedSerializer(PickleSerializer()) )
讓我們看看如何使用PySpark運行一些基本操作。Python文件中的以下代碼創建RDD單詞,其中存儲了一組提到的單詞。
words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] )
我們現在將對單詞進行一些操作。
count()
返回RDD中的元素數。
----------------------------------------count.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "count app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) counts = words.count() print "Number of elements in RDD -> %i" % (counts) ----------------------------------------count.py---------------------------------------
命令 - count()的命令是
$SPARK_HOME/bin/spark-submit count.py
輸出 - 上述命令的輸出是
Number of elements in RDD → 8
搜集()
返回RDD中的所有元素。
----------------------------------------collect.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Collect app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) coll = words.collect() print "Elements in RDD -> %s" % (coll) ----------------------------------------collect.py---------------------------------------
命令 - collect()的命令是
$SPARK_HOME/bin/spark-submit collect.py
輸出 - 上述命令的輸出是
Elements in RDD -> [ 'scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
foreach(F)
僅返回滿足foreach內函數條件的元素。在下面的示例中,我們在foreach中調用print函數,它打印RDD中的所有元素。
----------------------------------------foreach.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "ForEach app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) def f(x): print(x) fore = words.foreach(f) ----------------------------------------foreach.py---------------------------------------
命令 - foreach(f)的命令是
$SPARK_HOME/bin/spark-submit foreach.py
輸出 - 上述命令的輸出是
scala
java hadoop spark akka spark vs hadoop pyspark pyspark and spark
filter(f)
返回一個包含元素的新RDD,它滿足過濾器內部的功能。在下面的示例中,我們過濾掉包含''spark'的字符串。
----------------------------------------filter.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Filter app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_filter = words.filter(lambda x: 'spark' in x) filtered = words_filter.collect() print "Fitered RDD -> %s" % (filtered) ----------------------------------------filter.py----------------------------------------
命令 - 過濾器(f)的命令是
$SPARK_HOME/bin/spark-submit filter.py
輸出 - 上述命令的輸出是
Fitered RDD -> [ 'spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark' ]
map(f,preservesPartitioning = False)
通過將函數應用於RDD中的每個元素來返回新的RDD。在下面的示例中,我們形成一個鍵值對,並將每個字符串映射為值1。
----------------------------------------map.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Map app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words_map = words.map(lambda x: (x, 1)) mapping = words_map.collect() print "Key value pair -> %s" % (mapping) ----------------------------------------map.py---------------------------------------
命令 - map命令(f,preservesPartitioning = False)是
$SPARK_HOME/bin/spark-submit map.py
輸出 - 上述命令的輸出是
Key value pair -> [ ('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1) ]
reduce(F)
執行指定的可交換和關聯二進制操作后,將返回RDD中的元素。在下面的示例中,我們從運算符導入add包並將其應用於'num'以執行簡單的加法運算。
----------------------------------------reduce.py--------------------------------------- from pyspark import SparkContext from operator import add sc = SparkContext("local", "Reduce app") nums = sc.parallelize([1, 2, 3, 4, 5]) adding = nums.reduce(add) print "Adding all the elements -> %i" % (adding) ----------------------------------------reduce.py---------------------------------------
命令 - reduce(f)的命令是
$SPARK_HOME/bin/spark-submit reduce.py
輸出 - 上述命令的輸出是
Adding all the elements -> 15
join(other,numPartitions = None)
它返回RDD,其中包含一對帶有匹配鍵的元素以及該特定鍵的所有值。在以下示例中,兩個不同的RDD中有兩對元素。在連接這兩個RDD之后,我們得到一個RDD,其元素具有匹配的鍵及其值。
----------------------------------------join.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Join app") x = sc.parallelize([("spark", 1), ("hadoop", 4)]) y = sc.parallelize([("spark", 2), ("hadoop", 5)]) joined = x.join(y) final = joined.collect() print "Join RDD -> %s" % (final) ----------------------------------------join.py---------------------------------------
命令 - 連接命令(其他,numPartitions =無)是
$SPARK_HOME/bin/spark-submit join.py
輸出 - 上述命令的輸出是
Join RDD -> [ ('spark', (1, 2)), ('hadoop', (4, 5)) ]
cache()
使用默認存儲級別(MEMORY_ONLY)保留此RDD。您還可以檢查RDD是否被緩存。
----------------------------------------cache.py--------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Cache app") words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] ) words.cache() caching = words.persist().is_cached print "Words got chached > %s" % (caching) ----------------------------------------cache.py---------------------------------------
命令 - cache()的命令是
$SPARK_HOME/bin/spark-submit cache.py
輸出 - 上述程序的輸出是
Words got cached -> True
這些是在PySpark RDD上完成的一些最重要的操作。
對於並行處理,Apache Spark使用共享變量。當驅動程序將任務發送到集群上的執行程序時,共享變量的副本將在集群的每個節點上運行,以便可以將其用於執行任務。
Apache Spark支持兩種類型的共享變量
- Broadcast
- Accumulator
讓我們詳細了解它們。
廣播
廣播變量用於跨所有節點保存數據副本。此變量緩存在所有計算機上,而不是在具有任務的計算機上發送。以下代碼塊包含PySpark的Broadcast類的詳細信息。
class pyspark.Broadcast ( sc = None, value = None, pickle_registry = None, path = None )
以下示例顯示如何使用Broadcast變量。Broadcast變量有一個名為value的屬性,它存儲數據並用於返回廣播值。
----------------------------------------broadcast.py-------------------------------------- from pyspark import SparkContext sc = SparkContext("local", "Broadcast app") words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) data = words_new.value print "Stored data -> %s" % (data) elem = words_new.value[2] print "Printing a particular element in RDD -> %s" % (elem) ----------------------------------------broadcast.py--------------------------------------
命令 - 廣播變量的命令如下
$SPARK_HOME/bin/spark-submit broadcast.py
輸出 - 以下命令的輸出如下。
Stored data -> [ 'scala', 'java', 'hadoop', 'spark', 'akka' ] Printing a particular element in RDD -> hadoop
累加器
累加器變量用於通過關聯和交換操作聚合信息。例如,您可以使用累加器進行求和操作或計數器(在MapReduce中)。以下代碼塊包含PySpark的Accumulator類的詳細信息。
class pyspark.Accumulator(aid, value, accum_param)
以下示例顯示如何使用Accumulator變量。Accumulator變量有一個名為value的屬性,類似於廣播變量。它存儲數據並用於返回累加器的值,但僅在驅動程序中可用。
在此示例中,累加器變量由多個工作程序使用並返回累計值。
----------------------------------------accumulator.py------------------------------------ from pyspark import SparkContext sc = SparkContext("local", "Accumulator app") num = sc.accumulator(10) def f(x): global num num+=x rdd = sc.parallelize([20,30,40,50]) rdd.foreach(f) final = num.value print "Accumulated value is -> %i" % (final) ----------------------------------------accumulator.py------------------------------------
命令 - 累加器變量的命令如下
$SPARK_HOME/bin/spark-submit accumulator.py
輸出 - 上面命令的輸出如下。
Accumulated value is -> 150
要在本地/集群上運行Spark應用程序,您需要設置一些配置和參數,這是SparkConf幫助的。它提供運行Spark應用程序的配置。以下代碼塊包含PySpark的SparkConf類的詳細信息。
class pyspark.SparkConf ( loadDefaults = True, _jvm = None, _jconf = None )
最初,我們將使用SparkConf()創建一個SparkConf對象,它將從 spark。* Java系統屬性加載值。現在,您可以使用SparkConf對象設置不同的參數,它們的參數將優先於系統屬性。
在SparkConf類中,有一些setter方法,它們支持鏈接。例如,您可以編寫 conf.setAppName(“PySparkApp”)。setMaster(“local”) 。一旦我們將SparkConf對象傳遞給Apache Spark,任何用戶都無法修改它。
以下是SparkConf最常用的一些屬性
-
set(key,value) - 設置配置屬性。
-
setMaster(value) - 設置主URL。
-
setAppName(value) - 設置應用程序名稱。
-
get(key,defaultValue = None) - 獲取密鑰的配置值。
-
setSparkHome(value) - 在工作節點上設置Spark安裝路徑。
讓我們考慮以下在PySpark程序中使用SparkConf的示例。在此示例中,我們將spark應用程序名稱設置為 PySpark App,並將spark應用程序的主URL設置為→ spark:// master:7077 。
以下代碼塊包含這些行,當它們添加到Python文件中時,它會設置運行PySpark應用程序的基本配置。
---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077") sc = SparkContext(conf=conf) ---------------------------------------------------------------------------------------
在Apache Spark中,您可以使用 sc.addFile 上傳文件(sc是您的默認SparkContext),並使用 SparkFiles.get 獲取工作者的路徑。因此,SparkFiles解析通過 SparkContext.addFile() 添加的文件的路徑。
SparkFiles包含以下類方法
- get(filename)
- getrootdirectory()
讓我們詳細了解它們。
get(filename)
它指定通過SparkContext.addFile()添加的文件的路徑。
getrootdirectory()
它指定根目錄的路徑,該目錄包含通過SparkContext.addFile()添加的文件。
----------------------------------------sparkfile.py------------------------------------ from pyspark import SparkContext from pyspark import SparkFiles finddistance = "/home/hadoop/examples_pyspark/finddistance.R" finddistancename = "finddistance.R" sc = SparkContext("local", "SparkFile App") sc.addFile(finddistance) print "Absolute Path -> %s" % SparkFiles.get(finddistancename) ----------------------------------------sparkfile.py------------------------------------
命令 - 命令如下
$SPARK_HOME/bin/spark-submit sparkfiles.py
輸出 - 上述命令的輸出是
Absolute Path -> /tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R
StorageLevel決定如何存儲RDD。在Apache Spark中,StorageLevel決定RDD是應該存儲在內存中還是存儲在磁盤上,或兩者都存儲。它還決定是否序列化RDD以及是否復制RDD分區。
以下代碼塊具有StorageLevel的類定義
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
現在,為了確定RDD的存儲,有不同的存儲級別,如下所示 -
-
DISK_ONLY = StorageLevel(True,False,False,False,1)
-
DISK_ONLY_2 = StorageLevel(True,False,False,False,2)
-
MEMORY_AND_DISK = StorageLevel(True,True,False,False,1)
-
MEMORY_AND_DISK_2 = StorageLevel(True,True,False,False,2)
-
MEMORY_AND_DISK_SER = StorageLevel(True,True,False,False,1)
-
MEMORY_AND_DISK_SER_2 = StorageLevel(True,True,False,False,2)
-
MEMORY_ONLY = StorageLevel(False,True,False,False,1)
-
MEMORY_ONLY_2 = StorageLevel(False,True,False,False,2)
-
MEMORY_ONLY_SER = StorageLevel(False,True,False,False,1)
-
MEMORY_ONLY_SER_2 = StorageLevel(False,True,False,False,2)
-
OFF_HEAP = StorageLevel(True,True,True,False,1)
讓我們考慮以下StorageLevel示例,其中我們使用存儲級別 MEMORY_AND_DISK_2, 這意味着RDD分區將具有2的復制。
------------------------------------storagelevel.py------------------------------------- from pyspark import SparkContext import pyspark sc = SparkContext ( "local", "storagelevel app" ) rdd1 = sc.parallelize([1,2]) rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 ) rdd1.getStorageLevel() print(rdd1.getStorageLevel()) ------------------------------------storagelevel.py-------------------------------------
命令 - 命令如下
$SPARK_HOME/bin/spark-submit storagelevel.py
輸出 - 上述命令的輸出如下
Disk Memory Serialized 2x Replicated
Apache Spark提供了一個名為 MLlib 的機器學習API。PySpark也在Python中使用這個機器學習API。它支持不同類型的算法,如下所述
-
mllib.classification - spark.mllib 包支持二進制分類,多類分類和回歸分析的各種方法。分類中一些最流行的算法是 隨機森林,朴素貝葉斯,決策樹 等。
-
mllib.clustering - 聚類是一種無監督的學習問題,您可以根據某些相似概念將實體的子集彼此分組。
-
mllib.fpm - 頻繁模式匹配是挖掘頻繁項,項集,子序列或其他子結構,這些通常是分析大規模數據集的第一步。 多年來,這一直是數據挖掘領域的一個活躍的研究課題。
-
mllib.linalg - 線性代數的MLlib實用程序。
-
mllib.recommendation - 協同過濾通常用於推薦系統。 這些技術旨在填寫用戶項關聯矩陣的缺失條目。
-
spark.mllib - 它目前支持基於模型的協同過濾,其中用戶和產品由一小組可用於預測缺失條目的潛在因素描述。 spark.mllib使用交替最小二乘(ALS)算法來學習這些潛在因素。
-
mllib.regression - 線性回歸屬於回歸算法族。 回歸的目標是找到變量之間的關系和依賴關系。使用線性回歸模型和模型摘要的界面類似於邏輯回歸案例。
還有其他算法,類和函數也作為mllib包的一部分。截至目前,讓我們了解一下 pyspark.mllib 的演示。
以下示例是使用ALS算法進行協同過濾以構建推薦模型並在訓練數據上進行評估。
使用數據集 - test.data
1,1,5.0 1,2,1.0 1,3,5.0 1,4,1.0 2,1,5.0 2,2,1.0 2,3,5.0 2,4,1.0 3,1,1.0 3,2,5.0 3,3,1.0 3,4,5.0 4,1,1.0 4,2,5.0 4,3,1.0 4,4,5.0
--------------------------------------recommend.py---------------------------------------- from __future__ import print_function from pyspark import SparkContext from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating if __name__ == "__main__": sc = SparkContext(appName="Pspark mllib Example") data = sc.textFile("test.data") ratings = data.map(lambda l: l.split(','))\ .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))) # Build the recommendation model using Alternating Least Squares rank = 10 numIterations = 10 model = ALS.train(ratings, rank, numIterations) # Evaluate the model on training data testdata = ratings.map(lambda p: (p[0], p[1])) predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2])) ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions) MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean() print("Mean Squared Error = " + str(MSE)) # Save and load model model.save(sc, "target/tmp/myCollaborativeFilter") sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter") --------------------------------------recommend.py----------------------------------------
命令 - 命令如下
$SPARK_HOME/bin/spark-submit recommend.py
輸出 - 上述命令的輸出將為
Mean Squared Error = 1.20536041839e-05
序列化用於Apache Spark的性能調優。通過網絡發送或寫入磁盤或持久存儲在內存中的所有數據都應序列化。序列化在昂貴的操作中起着重要作用。
PySpark支持用於性能調優的自定義序列化程序。PySpark支持以下兩個序列化程序
MarshalSerializer
使用Python的Marshal Serializer序列化對象。此序列化程序比PickleSerializer更快,但支持更少的數據類型。
class pyspark.MarshalSerializer
PickleSerializer
使用Python的Pickle Serializer序列化對象。此序列化程序幾乎支持任何Python對象,但可能不如更專業的序列化程序快。
class pyspark.PickleSerializer
讓我們看一下PySpark序列化的例子。在這里,我們使用MarshalSerializer序列化數據。
--------------------------------------serializing.py------------------------------------- from pyspark.context import SparkContext from pyspark.serializers import MarshalSerializer sc = SparkContext("local", "serialization app", serializer = MarshalSerializer()) print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)) sc.stop() --------------------------------------serializing.py-------------------------------------
Command - 命令如下
$SPARK_HOME/bin/spark-submit serializing.py
輸出 - 上述命令的輸出是
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
