Spark Overview(Spark概述)
·Apache Spark是一種快速通用的集群計算系統。
·它提供Java,Scala,Python和R中的高級API,以及支持通用執行圖的優化引擎。
·它還支持豐富的高級工具集,包括用於SQL和結構化數據處理的Spark SQL,用於機器學習的MLlib,用於圖形處理的GraphX和Spark Streaming
Security(安全性)
·Spark中的安全性默認為OFF。
·這可能意味着您很容易受到默認攻擊。
·在下載和運行Spark之前,請參閱Spark Security
Downloading
·從項目網站的下載頁面獲取Spark。
·本文檔適用於Spark版本2.4.2。
·Spark使用Hadoop的客戶端庫來實現HDFS和YARN。
·下載是針對少數流行的Hadoop版本預先打包的。
·用戶還可以通過增加Spark的類路徑下載“Hadoop免費”二進制文件並使用任何Hadoop版本運行Spark。
·Scala和Java用戶可以使用Maven坐標在他們的項目中包含Spark,並且將來Python用戶也可以從PyPI安裝Spark。
·如果您想從源代碼構建Spark,請訪問Building Spark。
·Spark在Windows和類UNIX系統(例如Linux,Mac OS)上運行。
·在一台機器上本地運行很容易 - 您只需要在系統PATH上安裝Java,或者指向Java安裝的JAVA_HOME環境變量。
·Spark運行在Java 8 +,Python 2.7 + / 3.4 +和R 3.1+上。
·對於Scala API,Spark 2.4.2使用Scala 2.12。
·您需要使用兼容的Scala版本(2.12.x)。
·請注意,自Spark 2.2.0起,對2.6.5之前的Java 7,Python 2.6和舊Hadoop版本的支持已被刪除。
·自2.3.0起,對Scala 2.10的支持被刪除。
·自Spark 2.4.1起,對Scala 2.11的支持已被棄用,將在Spark 3.0中刪除。
Running the Examples and Shell(運行示例和Shell)
·Spark附帶了幾個示例程序。
·Scala,Java,Python和R示例位於examples / src / main目錄中。
·要運行其中一個Java或Scala示例程序,請在頂級Spark目錄中使用bin / run-example [params]。
·(在幕后,這將調用更常用的spark-submit腳本來啟動應用程序)。
·例如
./bin/run-example SparkPi 10
·您還可以通過Scala shell的修改版本以交互方式運行Spark。
·這是學習框架的好方法。
./bin/spark-shell --master local[2]
·--master選項指定分布式集群的主URL,或本地在一個線程上本地運行,或本地[N]在本地運行N個線程。
·您應該首先使用local進行測試。
·有關選項的完整列表,請使用--help選項運行Spark shell。
·Spark還提供了一個Python API。
·要在Python解釋器中以交互方式運行Spark,請使用bin / pyspark:
./bin/pyspark --master local[2]
·Python中也提供了示例應用程序。
·例如:
./bin/spark-submit examples/src/main/python/pi.py 10
Quick Start(快速開始)
·本教程簡要介紹了如何使用Spark。
·我們將首先通過Spark的交互式shell(在Python或Scala中)介紹API,然后展示如何使用Java,Scala和Python編寫應用程序。
·要繼續本指南,首先,從Spark網站下載Spark的打包版本。
·由於我們不會使用HDFS,您可以下載任何版本的Hadoop的軟件包。
·請注意,在Spark 2.0之前,Spark的主要編程接口是Resilient Distributed Dataset(RDD)。
·在Spark 2.0之后,RDD被數據集取代,數據集像RDD一樣強類型,但在底層有更豐富的優化。
·仍然支持RDD接口,您可以在RDD編程指南中獲得更詳細的參考。
·但是,我們強烈建議您切換到使用數據集,它具有比RDD更好的性能。
·請參閱SQL編程指南以獲取有關數據集的更多信息
Interactive Analysis with the Spark Shell(使用Spark Shell進行交互式分析)
Basics(基本)
·Spark的shell提供了一種學習API的簡單方法,以及一種以交互方式分析數據的強大工具。
·它可以在Scala(在Java VM上運行,因此是使用現有Java庫的好方法)或Python中使用。
·通過在Spark目錄中運行以下命令來啟動它:
./bin/pyspark
或者如果在當前環境中使用pip安裝了PySpark:
pyspark
·Spark的主要抽象是一個名為Dataset的分布式項目集合。
·可以從Hadoop InputFormats(例如HDFS文件)或通過轉換其他數據集來創建數據集。
·由於Python的動態特性,我們不需要在Python中強類型數據集。
·因此,Python中的所有數據集都是Dataset [Row],我們稱之為DataFrame與Pandas和R中的數據框概念一致。讓我們從Spark源目錄中的README文件的文本中創建一個新的DataFrame:
>>> textFile = spark.read.text("README.md")
·您可以通過調用某些操作直接從DataFrame獲取值,也可以轉換DataFrame以獲取新值。
·有關更多詳細信息,請閱讀API文檔。
>>> textFile.count() # Number of rows in this DataFrame 126 >>> textFile.first() # First row in this DataFrame Row(value=u'# Apache Spark')
·現在讓我們將這個DataFrame轉換為一個新的DataFrame。
·我們調用filter來返回一個新的DataFrame,其中包含文件中的一行子集。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我們可以將轉換和行動聯系在一起:
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"? 15
More on Dataset Operations(有關數據集操作的更多信息)
·數據集操作和轉換可用於更復雜的計算。
·假設我們想要找到含有最多單詞的行:
>>> from pyspark.sql.functions import * >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]
·這首先將一行映射為整數值,並將其別名為“numWords”,從而創建一個新的DataFrame。
·在該DataFrame上調用agg以查找最大字數。
·select和agg的參數都是Column,我們可以使用df.colName從DataFrame中獲取一列。
·我們還可以導入pyspark.sql.functions,它提供了許多方便的功能來從舊的列構建一個新的列。
·一個常見的數據流模式是MapReduce,由Hadoop推廣。
·Spark可以輕松實現MapReduce流程:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
·在這里,我們使用select中的explode函數,將行數據集轉換為單詞數據集,然后將groupBy和count結合起來計算文件中的每個單詞計數,作為2列的DataFrame:“word”和“
·計數”。
·要在我們的shell中收集單詞count,我們可以調用collect:
>>> wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
Caching(高速緩存)
·Spark還支持將數據集提取到群集范圍的內存緩存中。
·這在重復訪問數據時非常有用,例如查詢小的“熱”數據集或運行像PageRank這樣的迭代算法時。
·舉個簡單的例子,讓我們標記要緩存的linesWithSpark數據集:
>>> linesWithSpark.cache() >>> linesWithSpark.count() 15 >>> linesWithSpark.count() 15
·使用Spark來探索和緩存100行文本文件似乎很愚蠢。
·有趣的是,這些相同的功能可用於非常大的數據集,即使它們跨越數十個或數百個節點進行條帶化。
·您也可以通過將bin / pyspark連接到群集來交互式地執行此操作,如RDD編程指南中所述。
Self-Contained Applications(自包含的應用程序)
·假設我們希望使用Spark API編寫一個自包含的應用程序。
·我們將在Scala(使用sbt),Java(使用Maven)和Python(pip)中使用簡單的應用程序。
·現在我們將展示如何使用Python API(PySpark)編寫應用程序。
·如果要構建打包的PySpark應用程序或庫,可以將其添加到setup.py文件中:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
作為示例,我們將創建一個簡單的Spark應用程序SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
·該程序只計算包含'a'的行數和包含文本文件中'b'的數字。
·請注意,您需要將YOUR_SPARK_HOME替換為安裝Spark的位置。
·與Scala和Java示例一樣,我們使用SparkSession來創建數據集。
·對於使用自定義類或第三方庫的應用程序,我們還可以通過將它們打包到.zip文件中來添加代碼依賴關系以通過其--py-files參數進行spark-submit(有關詳細信息,請參閱spark-submit --help)。
·SimpleApp非常簡單,我們不需要指定任何代碼依賴項。
我們可以使用bin / spark-submit腳本運行此應用程序:
# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23
如果您的環境中安裝了PySpark pip(例如,pip install pyspark),您可以使用常規Python解釋器運行您的應用程序,或者根據您的喜好使用提供的“spark-submit”。
# Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23
RDD Programming Guide(RDD編程指南)
Overview(概觀)
·在較高的層次上,每個Spark應用程序都包含一個驅動程序,該程序運行用戶的主要功能並在群集上執行各種並行操作。
·Spark提供的主要抽象是彈性分布式數據集(RDD),它是跨群集節點分區的元素的集合,可以並行操作。
·RDD是通過從Hadoop文件系統(或任何其他Hadoop支持的文件系統)中的文件或驅動程序中的現有Scala集合開始並對其進行轉換而創建的。
·用戶還可以要求Spark在內存中保留RDD,允許它在並行操作中有效地重用。
·最后,RDD會自動從節點故障中恢復。
·Spark中的第二個抽象是可以在並行操作中使用的共享變量。
·默認情況下,當Spark並行運行一個函數作為不同節點上的一組任務時,它會將函數中使用的每個變量的副本發送給每個任務。
·有時,變量需要跨任務共享,或者在任務和驅動程序之間共享。
·Spark支持兩種類型的共享變量:廣播變量,可用於緩存所有節點的內存中的值;累加器,它們是僅“添加”到的變量,例如計數器和總和。
·本指南以Spark支持的每種語言顯示了這些功能。
·如果你啟動Spark的交互式shell,最簡單的方法就是 - 用於Scala shell的bin / spark-shell或用於Python的bin / pyspark。
Linking with Spark(與Spark鏈接)
·Spark 2.4.2適用於Python 2.7+或Python 3.4+。
·它可以使用標准的CPython解釋器,因此可以使用像NumPy這樣的C庫。
·它也適用於PyPy 2.3+。
·Spark 2.2.0中刪除了Python 2.6支持。
·Python中的Spark應用程序可以使用bin / spark-submit腳本運行,該腳本在運行時包含Spark,也可以將其包含在setup.py中:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
·要在不使用pip安裝PySpark的情況下在Python中運行Spark應用程序,請使用位於Spark目錄中的bin / spark-submit腳本。
·此腳本將加載Spark的Java / Scala庫,並允許您將應用程序提交到群集。
·您還可以使用bin / pyspark來啟動交互式Python shell。
·如果您希望訪問HDFS數據,則需要使用PySpark構建鏈接到您的HDFS版本。
·Spark主頁上還提供了預構建的軟件包,可用於常見的HDFS版本。
·最后,您需要將一些Spark類導入到您的程序中。
·添加以下行:
from pyspark import SparkContext, SparkConf
·PySpark在驅動程序和工作程序中都需要相同的次要版本的Python。
·它使用PATH中的默認python版本,您可以指定PYSPARK_PYTHON要使用的Python版本,例如:
$ PYSPARK_PYTHON=python3.4 bin/pyspark $ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
Initializing Spark(初始化Spark)
·Spark程序必須做的第一件事是創建一個SparkContext對象,它告訴Spark如何訪問集群。
·要創建SparkContext,首先需要構建一個包含有關應用程序信息的SparkConf對象。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
·appName參數是應用程序在集群UI上顯示的名稱。
·master是Spark,Mesos或YARN群集URL,或者是以本地模式運行的特殊“本地”字符串。
·實際上,在群集上運行時,您不希望在程序中對master進行硬編碼,而是使用spark-submit啟動應用程序並在那里接收它。
·但是,對於本地測試和單元測試,您可以傳遞“local”以在進程中運行Spark。
Using the Shell(使用Shell)
·在PySpark shell中,已經為你創建了一個特殊的解釋器感知SparkContext,名為sc。
·制作自己的SparkContext將無法正常工作。
·您可以使用--master參數設置上下文連接到的主服務器,並且可以通過將逗號分隔的列表傳遞給--py-files將Python .zip,.egg或.py文件添加到運行時路徑。
·您還可以通過向--packages參數提供以逗號分隔的Maven坐標列表,將依賴項(例如Spark包)添加到shell會話中。
·任何可能存在依賴關系的其他存儲庫(例如Sonatype)都可以傳遞給--repositories參數。
·必要時,必須使用pip手動安裝Spark軟件包具有的任何Python依賴項(在該軟件包的requirements.txt中列出)。
·例如,要在四個核心上運行bin / pyspark,請使用:
$ ./bin/pyspark --master local[4]
或者,要將code.py添加到搜索路徑(以便以后能夠導入代碼),請使用:
$ ./bin/pyspark --master local[4] --py-files code.py
·有關選項的完整列表,請運行pyspark --help。
·在幕后,pyspark調用更一般的spark-submit腳本。
·也可以在增強的Python解釋器IPython中啟動PySpark shell。
·PySpark適用於IPython 1.0.0及更高版本。
·要使用IPython,請在運行bin / pyspark時將PYSPARK_DRIVER_PYTHON變量設置為ipython:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
要使用Jupyter notebook(以前稱為IPython notebook)
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
·您可以通過設置PYSPARK_DRIVER_PYTHON_OPTS來自定義ipython或jupyter命令。
·啟動Jupyter Notebook服務器后,您可以從“文件”選項卡創建一個新的“Python 2”筆記本。
·在筆記本內部,您可以在開始嘗試使用Jupyter notebook中的Spark之前輸入命令%pylab inline作為筆記本的一部分。
Resilient Distributed Datasets (彈性分布式數據集)(RDDs)
·Spark圍繞彈性分布式數據集(RDD)的概念展開,RDD是一個可以並行操作的容錯的容錯集合。
·創建RDD有兩種方法:並行化驅動程序中的現有集合,或引用外部存儲系統中的數據集,例如共享文件系統,HDFS,HBase或提供Hadoop InputFormat的任何數據源。
Parallelized Collections(並行化集合)
·通過在驅動程序中的現有可迭代或集合上調用SparkContext的parallelize方法來創建並行化集合。
·復制集合的元素以形成可以並行操作的分布式數據集。
·例如,以下是如何創建包含數字1到5的並行化集合:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
·一旦創建,分布式數據集(distData)可以並行操作。
·例如,我們可以調用distData.reduce(lambda a,b:a + b)來添加列表的元素。
·我們稍后將描述對分布式數據集的操作。
·並行集合的一個重要參數是將數據集切割為的分區數。
·Spark將為群集的每個分區運行一個任務。
·通常,您希望群集中的每個CPU有2-4個分區。
·通常,Spark會嘗試根據您的群集自動設置分區數。
·但是,您也可以通過將其作為第二個參數傳遞給並行化來手動設置它(例如sc.parallelize(data,10))。
·注意:代碼中的某些位置使用術語切片(分區的同義詞)來保持向后兼容性。
External Datasets(外部數據集)
·PySpark可以從Hadoop支持的任何存儲源創建分布式數據集,包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3等.Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。
·可以使用SparkContext的textFile方法創建文本文件RDD。
·此方法獲取文件的URI(計算機上的本地路徑,或hdfs://,s3a://等URI)並將其作為行集合讀取。
·這是一個示例調用:
>>> distFile = sc.textFile("data.txt")
·創建后,distFile可以由數據集操作執行。
·例如,我們可以使用map添加所有行的大小,並按如下方式減少操作:distFile.map(lambda s:len(s))。reduce(lambda a,b:a + b)。
·有關使用Spark讀取文件的一些注意事項
·如果在本地文件系統上使用路徑,則還必須可以在工作節點上的相同路徑上訪問該文件。
·將文件復制到所有工作者或使用網絡安裝的共享文件系統。
·Spark的所有基於文件的輸入方法(包括textFile)都支持在目錄,壓縮文件和通配符上運行。
·例如,您可以使用textFile(“/ my / directory”),textFile(“/ my / directory / * .txt”)和textFile(“/ my / directory / * .gz”)。
·textFile方法還采用可選的第二個參數來控制文件的分區數。
·默認情況下,Spark為文件的每個塊創建一個分區(HDFS中默認為128MB),但您也可以通過傳遞更大的值來請求更多的分區。
·請注意,您不能擁有比塊少的分區。
·除文本文件外,Spark的Python API還支持其他幾種數據格式:
·SparkContext.wholeTextFiles允許您讀取包含多個小文本文件的目錄,並將它們作為(文件名,內容)對返回。
·這與textFile形成對比,textFile將在每個文件中每行返回一條記錄。
·RDD.saveAsPickleFile和SparkContext.pickleFile支持以包含pickle Python對象的簡單格式保存RDD。
·批處理用於pickle序列化,默認批處理大小為10。
·SequenceFile和Hadoop輸入/輸出格式
·請注意,此功能目前標記為“實驗”,適用於高級用戶。
·將來可能會使用基於Spark SQL的讀/寫支持替換它,在這種情況下,Spark SQL是首選方法。
·可寫支持
·PySpark SequenceFile支持在Java中加載鍵值對的RDD,將Writable轉換為基本Java類型,並使用Pyrolite挖掘生成的Java對象。
·將鍵值對的RDD保存到SequenceFile時,PySpark會反過來。
·它將Python對象解開為Java對象,然后將它們轉換為Writable。
·以下Writable會自動轉換:
Writable Type(可寫類型) | Python Type |
---|---|
Text | unicode str |
IntWritable | int |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
·數組不是開箱即用的。
·用戶在讀取或寫入時需要指定自定義ArrayWritable子類型。
·編寫時,用戶還需要指定將數組轉換為自定義ArrayWritable子類型的自定義轉換器。
·在讀取時,默認轉換器將自定義ArrayWritable子類型轉換為Java Object [],然后將其pickle到Python元組。
·要為原始類型的數組獲取Python array.array,用戶需要指定自定義轉換器。
Saving and Loading SequenceFiles(保存和加載SequenceFiles)
·與文本文件類似,可以通過指定路徑來保存和加載SequenceFiles。
·可以指定鍵和值類,但對於標准Writable,這不是必需的。
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
·請注意,如果InputFormat僅依賴於Hadoop配置和/或輸入路徑,並且可以根據上表輕松轉換鍵和值類,則此方法應適用於此類情況。
·如果您有自定義序列化二進制數據(例如從Cassandra / HBase加載數據),那么您首先需要將Scala / Java端的數據轉換為可由Pyrolite的pickler處理的數據。
·為此提供了轉換器特性。
·只需擴展此特征並在convert方法中實現轉換代碼。
·請記住確保將此類以及訪問InputFormat所需的任何依賴項打包到Spark作業jar中並包含在PySpark類路徑中。
·有關使用Cassandra / HBase InputFormat和OutputFormat以及自定義轉換器的示例,請參閱Python示例和Converter示例。
RDD Operations(RDD操作)
·RDD支持兩種類型的操作:轉換(從現有數據集創建新數據集)和操作(在數據集上運行計算后將值返回到驅動程序)。
·例如,map是一個轉換,它通過一個函數傳遞每個數據集元素,並返回一個表示結果的新RDD。
·另一方面,reduce是一個使用某個函數聚合RDD的所有元素的操作,並將最終結果返回給驅動程序(盡管還有一個返回分布式數據集的並行reduceByKey)。
·Spark中的所有轉換都是惰性的,因為它們不會立即計算結果。
·相反,他們只記得應用於某些基礎數據集(例如文件)的轉換。
·僅當操作需要將結果返回到驅動程序時才會計算轉換。
·這種設計使Spark能夠更有效地運行。
·例如,我們可以意識到通過map創建的數據集將用於reduce,並且僅將reduce的結果返回給驅動程序,而不是更大的映射數據集。
·默認情況下,每次對其執行操作時,都可以重新計算每個轉換后的RDD。
·但是,您也可以使用持久化(或緩存)方法在內存中保留RDD,在這種情況下,Spark會在群集上保留元素,以便在下次查詢時更快地訪問。
·還支持在磁盤上保留RDD或在多個節點上復制。
Basics(基本)
為了說明RDD基礎知識,請考慮以下簡單程序:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
·第一行定義來自外部文件的基本RDD。
·此數據集未加載到內存中或以其他方式執行:行僅僅是指向文件的指針。
·第二行將lineLengths定義為地圖轉換的結果。
·同樣,由於懶惰,lineLengths不會立即計算。
·最后,我們運行reduce,這是一個動作。
·此時,Spark將計算分解為在不同機器上運行的任務,並且每台機器都運行其部分映射和本地縮減,僅返回其對驅動程序的答案。
·如果我們以后想再次使用lineLengths,我們可以添加:
lineLengths.persist()
在reduce之前,這將導致lineLengths在第一次計算之后保存在內存中。
Passing Functions to Spark(將函數傳遞給Spark)
·Spark的API在很大程度上依賴於在驅動程序中傳遞函數以在集群上運行。
·有三種建議的方法可以做到這一點:
·Lambda表達式,用於可以作為表達式編寫的簡單函數。
·(Lambdas不支持多語句函數或不返回值的語句。)
·調用Spark的函數內部的本地defs,用於更長的代碼。
·模塊中的頂級函數。
·例如,要傳遞比使用lambda支持的更長的函數,請考慮以下代碼:
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
·請注意,雖然也可以將引用傳遞給類實例中的方法(而不是單例對象),但這需要發送包含該類的對象以及方法。
·例如,考慮:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
·在這里,如果我們創建一個新的MyClass並在其上調用doStuff,那里的map會引用該MyClass實例的func方法,因此需要將整個對象發送到集群。
·以類似的方式,訪問外部對象的字段將引用整個對象:
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s)
要避免此問題,最簡單的方法是將字段復制到局部變量中,而不是從外部訪問它:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + s)
Understanding closures(理解閉包)
·Spark的一個難點是在跨集群執行代碼時理解變量和方法的范圍和生命周期。
·修改其范圍之外的變量的RDD操作可能經常引起混淆。
·在下面的示例中,我們將查看使用foreach()遞增計數器的代碼,但同樣的問題也可能發生在其他操作中。
Example
·考慮下面的天真RDD元素總和,根據執行是否在同一JVM中發生,它可能表現不同。
·一個常見的例子是在本地模式下運行Spark(--master = local [n])而不是將Spark應用程序部署到集群(例如通過spark-submit to YARN):
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter)
Local vs. cluster modes(本地與群集模式)
·上述代碼的行為未定義,可能無法按預期工作。
·為了執行作業,Spark將RDD操作的處理分解為任務,每個任務都由執行程序執行。
·在執行之前,Spark計算任務的閉包。
·閉包是那些變量和方法,它們必須是可見的,以便執行程序在RDD上執行其計算(在本例中為foreach())。
·該閉包被序列化並發送給每個執行者。
·發送給每個執行程序的閉包內的變量現在是副本,因此,當在foreach函數中引用計數器時,它不再是驅動程序節點上的計數器。
·驅動程序節點的內存中仍然有一個計數器,但執行程序不再可見!
·執行程序只能看到序列化閉包中的副本。
·因此,計數器的最終值仍然為零,因為計數器上的所有操作都引用了序列化閉包內的值。
·在本地模式下,在某些情況下,foreach函數實際上將在與驅動程序相同的JVM中執行,並將引用相同的原始計數器,並且可能實際更新它。
·為了確保在這些場景中定義良好的行為,應該使用累加器。
·Spark中的累加器專門用於提供一種機制,用於在跨集群中的工作節點拆分執行時安全地更新變量。
·本指南的“累加器”部分更詳細地討論了這些內容。
·通常,閉包 - 類似循環或本地定義的方法的構造不應該用於改變某些全局狀態。
·Spark沒有定義或保證從閉包外部引用的對象的突變行為。
·執行此操作的某些代碼可能在本地模式下工作,但這只是偶然的,並且此類代碼在分布式模式下不會按預期運行。
·如果需要某些全局聚合,請使用累加器。
Printing elements of an RDD(打印RDD的元素)
·另一個常見的習慣用法是嘗試使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。
·在一台機器上,這將生成預期的輸出並打印所有RDD的元素。
·但是,在集群模式下,執行程序調用的stdout輸出現在寫入執行程序的stdout,而不是驅動程序上的那個,因此驅動程序上的stdout不會顯示這些!
·要打印驅動程序上的所有元素,可以使用collect()方法首先將RDD帶到驅動程序節點:rdd.collect()。foreach(println)。
·但是,這會導致驅動程序內存不足,因為collect()會將整個RDD提取到一台機器上;
·如果你只需要打印RDD的一些元素,更安全的方法是使用take():rdd.take(100).foreach(println)。
Working with Key-Value Pairs(使用鍵值對)
·雖然大多數Spark操作都適用於包含任何類型對象的RDD,但一些特殊操作僅適用於鍵值對的RDD。
·最常見的是分布式“隨機”操作,例如通過密鑰對元素進行分組或聚合。
·在Python中,這些操作適用於包含內置Python元組的RDD,如(1,2)。
·只需創建這樣的元組,然后調用您想要的操作。
·例如,以下代碼對鍵值對使用reduceByKey操作來計算文件中每行文本出現的次數:
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)
例如,我們也可以使用counts.sortByKey()來按字母順序對這些對進行排序,最后使用counts.collect()將它們作為對象列表返回到驅動程序。
Transformations(轉換)
·下表列出了Spark支持的一些常見轉換。
·有關詳細信息,請參閱RDD API文檔(Scala,Java,Python,R)並配對RDD函數doc(Scala,Java)。
·轉型意義
·map(func)返回通過函數func傳遞源的每個元素形成的新分布式數據集。
·filter(func)返回通過選擇func返回true的源元素形成的新數據集。
·flatMap(func)與map類似,但每個輸入項可以映射到0個或更多輸出項(因此func應返回Seq而不是單個項)。
·mapPartitions(func)與map類似,但在RDD的每個分區(塊)上單獨運行,因此當在類型T的RDD上運行時,func必須是Iterator => Iterator
類型。
·mapPartitionsWithIndex(func)與mapPartitions類似,但也為func提供了一個表示分區索引的整數值,因此當在RDD類型上運行時,func必須是類型(Int,Iterator )=> Iterator
·T.
·sample(withReplacement,fraction,seed)使用給定的隨機數生成器種子,使用或不使用替換對數據的一小部分進行采樣。
·union(otherDataset)返回一個新數據集,其中包含源數據集和參數中元素的並集。
·intersection(otherDataset)返回包含源數據集和參數中元素交集的新RDD。
·distinct([numPartitions]))返回包含源數據集的不同元素的新數據集。
·groupByKey([numPartitions])在(K,V)對的數據集上調用時,返回(K,Iterable )對的數據集。
·注意:如果要對每個鍵執行聚合(例如總和或平均值)進行分組,則使用reduceByKey或aggregateByKey將產生更好的性能。
·注意:默認情況下,輸出中的並行級別取決於父RDD的分區數。
·您可以傳遞可選的numPartitions參數來設置不同數量的任務。
·reduceByKey(func,[numPartitions])當在(K,V)對的數據集上調用時,返回(K,V)對的數據集,其中使用給定的reduce函數func聚合每個鍵的值,該函數必須是
·type(V,V)=> V.與groupByKey類似,reduce任務的數量可通過可選的第二個參數進行配置。
·aggregateByKey(zeroValue)(seqOp,combOp,[numPartitions])在(K,V)對的數據集上調用時,返回(K,U)對的數據集,其中使用給定的組合函數聚合每個鍵的值,
·中性的“零”值。
·允許與輸入值類型不同的聚合值類型,同時避免不必要的分配。
·與groupByKey類似,reduce任務的數量可通過可選的第二個參數進行配置。
·sortByKey([ascending],[numPartitions])在K實現Ordered的(K,V)對的數據集上調用時,返回按鍵按升序或降序排序的(K,V)對數據集,如
·布爾升序參數。
·join(otherDataset,[numPartitions])當調用類型為(K,V)和(K,W)的數據集時,返回(K,(V,W))對的數據集以及每個鍵的所有元素對。
·通過leftOuterJoin,rightOuterJoin和fullOuterJoin支持外連接。
·cogroup(otherDataset,[numPartitions])當調用類型為(K,V)和(K,W)的數據集時,返回(K,(Iterable ,Iterable ))元組的數據集。
·此操作也稱為groupWith。
·cartesian(otherDataset)當調用類型為T和U的數據集時,返回(T,U)對的數據集(所有元素對)。
·pipe(command,[envVars])通過shell命令管道RDD的每個分區,例如:
·一個Perl或bash腳本。
·RDD元素被寫入進程的stdin,並且輸出到其stdout的行將作為字符串的RDD返回。
·coalesce(numPartitions)將RDD中的分區數減少為numPartitions。
·過濾大型數據集后,可以更有效地運行操作。
·repartition(numPartitions)隨機重新調整RDD中的數據以創建更多或更少的分區並在它們之間進行平衡。
·這總是隨機播放網絡上的所有數據。
·repartitionAndSortWithinPartitions(partitioner)根據給定的分區程序重新分區RDD,並在每個生成的分區中按鍵對記錄進行排序。
·這比調用重新分區然后在每個分區內排序更有效,因為它可以將排序推送到shuffle機器中。
Actions(動作)
·下表列出了Spark支持的一些常見操作。
·請參閱RDD API文檔(Scala,Java,Python,R)
·並配對RDD函數doc(Scala,Java)以獲取詳細信息。
·行動意義
·reduce(func)使用函數func(它接受兩個參數並返回一個)來聚合數據集的元素。
·該函數應該是可交換的和關聯的,以便可以並行正確計算。
·collect()在驅動程序中將數據集的所有元素作為數組返回。
·在過濾器或其他返回足夠小的數據子集的操作之后,這通常很有用。
·count()返回數據集中的元素數。
·first()返回數據集的第一個元素(類似於take(1))。
·take(n)返回包含數據集的前n個元素的數組。
·takeSample(withReplacement,num,[seed])返回一個數組,其中包含數據集的num個元素的隨機樣本,有或沒有替換,可選地預先指定隨機數生成器種子。
·takeOrdered(n,[ordering])使用自然順序或自定義比較器返回RDD的前n個元素。
·saveAsTextFile(path)將數據集的元素寫為本地文件系統,HDFS或任何其他Hadoop支持的文件系統中給定目錄中的文本文件(或文本文件集)。
·Spark將在每個元素上調用toString,將其轉換為文件中的一行文本。
·saveAsSequenceFile(路徑)
·(Java和Scala)將數據集的元素作為Hadoop SequenceFile寫入本地文件系統,HDFS或任何其他Hadoop支持的文件系統中的給定路徑中。
·這可以在實現Hadoop的Writable接口的鍵值對的RDD上使用。
·在Scala中,它也可以在可隱式轉換為Writable的類型上使用(Spark包括基本類型的轉換,如Int,Double,String等)。
·saveAsObjectFile(路徑)
·(Java和Scala)使用Java序列化以簡單格式編寫數據集的元素,然后可以使用SparkContext.objectFile()加載它。
·countByKey()僅適用於類型為(K,V)的RDD。
·返回(K,Int)對的散列映射,其中包含每個鍵的計數。
·foreach(func)對數據集的每個元素運行函數func。
·這通常用於副作用,例如更新累加器或與外部存儲系統交互。
·注意:在foreach()之外修改除累加器之外的變量可能會導致未定義的行為。
·有關詳細信息,請參閱了解閉包。
·Spark RDD API還公開了某些操作的異步版本,例如foreach的foreachAsync,它會立即將一個FutureAction返回給調用者,而不是在完成操作時阻塞。
·這可用於管理或等待操作的異步執行。
Shuffle operations(隨機操作)
·Spark中的某些操作會觸發稱為shuffle的事件。
·隨機播放是Spark的重新分配數據的機制,因此它可以跨分區進行不同的分組。
·這通常涉及跨執行程序和機器復制數據,使得混洗成為復雜且昂貴的操作。
Background(背景)
·為了理解在shuffle期間發生的事情,我們可以考慮reduceByKey操作的示例。
·reduceByKey操作生成一個新的RDD,其中單個鍵的所有值都組合成一個元組 - 鍵和對與該鍵關聯的所有值執行reduce函數的結果。
·挑戰在於,並非單個密鑰的所有值都必須位於同一個分區,甚至是同一個機器上,但它們必須位於同一位置才能計算結果。
·在Spark中,數據通常不跨分區分布,以便在特定操作的必要位置。
·在計算過程中,單個任務將在單個分區上運行 - 因此,要組織單個reduceByKey reduce任務執行的所有數據,Spark需要執行全部操作。
·它必須從所有分區讀取以查找所有鍵的所有值,然后將分區中的值匯總在一起以計算每個鍵的最終結果 - 這稱為shuffle。
·盡管新洗牌數據的每個分區中的元素集將是確定性的,並且分區本身的排序也是如此,但這些元素的排序不是。
·如果在隨機播放后需要可預測的有序數據,則可以使用:
·mapPartitions使用例如.sorted對每個分區進行排序
·repartitionAndSortWithinPartitions在同時重新分區的同時有效地對分區進行排序
·sortBy來創建一個全局排序的RDD
·可以導致混洗的操作包括重新分區操作,如重新分區和合並,“ByKey操作(計數除外),如groupByKey和reduceByKey,以及聯合操作,如cogroup和join。
Performance Impact(績效影響)
·Shuffle是一項昂貴的操作,因為它涉及磁盤I / O,數據序列化和網絡I / O.
·為了組織shuffle的數據,Spark生成了一系列任務 - 映射任務以組織數據,以及一組reduce任務來聚合它。
·這個術語來自MapReduce,並不直接與Spark的地圖和減少操作相關。
·在內部,各個地圖任務的結果會保留在內存中,直到它們無法適應。
·然后,這些基於目標分區進行排序並寫入單個文件。
·在reduce方面,任務讀取相關的排序塊。
·某些shuffle操作會消耗大量的堆內存,因為它們使用內存中的數據結構來在傳輸記錄之前或之后組織記錄。
·具體來說,reduceByKey和aggregateByKey在地圖側創建這些結構,並且'ByKey操作在reduce側生成這些結構。
·當數據不適合內存時,Spark會將這些表溢出到磁盤,從而導致磁盤I / O的額外開銷和垃圾收集增加。
·Shuffle還會在磁盤上生成大量中間文件。
·從Spark 1.3開始,這些文件將被保留,直到不再使用相應的RDD並進行垃圾回收。
·這樣做是為了在重新計算譜系時不需要重新創建shuffle文件。
·如果應用程序保留對這些RDD的引用或GC不經常啟動,則垃圾收集可能僅在很長一段時間后才會發生。
·這意味着長時間運行的Spark作業可能會占用大量磁盤空間。
·配置Spark上下文時,spark.local.dir配置參數指定臨時存儲目錄。
·可以通過調整各種配置參數來調整隨機行為。
·請參閱“Spark配置指南”中的“隨機行為”部分。
RDD Persistence(RDD持久性)
·Spark中最重要的功能之一是跨操作在內存中持久化(或緩存)數據集。
·當您持久保存RDD時,每個節點都會存儲它在內存中計算的任何分區,並在該數據集(或從中派生的數據集)的其他操作中重用它們。
·這使得未來的行動更快(通常超過10倍)。
·緩存是迭代算法和快速交互式使用的關鍵工具。
·您可以使用persist()或cache()方法標記要保留的RDD。
·第一次在動作中計算它,它將保留在節點的內存中。
·Spark的緩存是容錯的 - 如果丟失了RDD的任何分區,它將使用最初創建它的轉換自動重新計算。
·此外,每個持久化RDD可以使用不同的存儲級別進行存儲,例如,允許您將數據集保留在磁盤上,將其保留在內存中,但作為序列化Java對象(以節省空間),跨節點復制它。
·通過將StorageLevel對象(Scala,Java,Python)傳遞給persist()來設置這些級別。
·cache()方法是使用默認存儲級別的簡寫,即StorageLevel.MEMORY_ONLY(在內存中存儲反序列化的對象)。
·完整的存儲級別是:
·存儲級別含義
·MEMORY_ONLY將RDD存儲為JVM中的反序列化Java對象。
·如果RDD不適合內存,則某些分區將不會被緩存,並且每次需要時都會重新計算。
·這是默認級別。
·MEMORY_AND_DISK將RDD存儲為JVM中的反序列化Java對象。
·如果RDD不適合內存,請存儲不適合磁盤的分區,並在需要時從那里讀取它們。
·MEMORY_ONLY_SER
·(Java和Scala)將RDD存儲為序列化Java對象(每個分區一個字節數組)。
·這通常比反序列化對象更節省空間,特別是在使用快速序列化器時,但讀取CPU密集程度更高。
·MEMORY_AND_DISK_SER
·(Java和Scala)與MEMORY_ONLY_SER類似,但是將不適合內存的分區溢出到磁盤,而不是每次需要時動態重新計算它們。
·DISK_ONLY僅將RDD分區存儲在磁盤上。
·MEMORY_ONLY_2,MEMORY_AND_DISK_2等。與上面的級別相同,但復制兩個群集節點上的每個分區。
·OFF_HEAP(實驗)與MEMORY_ONLY_SER類似,但將數據存儲在堆外內存中。
·這需要啟用堆外內存。
·注意:在Python中,存儲的對象將始終使用Pickle庫進行序列化,因此您是否選擇序列化級別並不重要。
·Python中的可用存儲級別包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和DISK_ONLY_2。
·即使沒有用戶調用持久性,Spark也會在隨機操作(例如reduceByKey)中自動保留一些中間數據。
·這樣做是為了避免在shuffle期間節點發生故障時重新計算整個輸入。
·我們仍然建議用戶在生成的RDD上調用persist,如果他們計划重用它。
Which Storage Level to Choose?(選擇哪種存儲級別?)
·Spark的存儲級別旨在提供內存使用和CPU效率之間的不同折衷。
·我們建議您通過以下流程選擇一個:
·如果您的RDD與默認存儲級別(MEMORY_ONLY)很舒適,請保持這種狀態。
·這是CPU效率最高的選項,允許RDD上的操作盡可能快地運行。
·如果沒有,請嘗試使用MEMORY_ONLY_SER並選擇快速序列化庫,以使對象更節省空間,但仍然可以快速訪問。
·(Java和Scala)
·除非計算數據集的函數很昂貴,否則它們不會溢出到磁盤,或者它們會過濾大量數據。
·否則,重新計算分區可能與從磁盤讀取分區一樣快。
·如果要快速故障恢復,請使用復制的存儲級別(例如,如果使用Spark來處理來自Web應用程序的請求)。
·所有存儲級別通過重新計算丟失的數據提供完全容錯,但復制的存儲級別允許您繼續在RDD上運行任務,而無需等待重新計算丟失的分區。
Removing Data(刪除數據)
·Spark會自動監視每個節點上的緩存使用情況,並以最近最少使用(LRU)的方式刪除舊數據分區。
·如果您想手動刪除RDD而不是等待它退出緩存,請使用RDD.unpersist()方法。
Shared Variables(共享變量)
·通常,當在遠程集群節點上執行傳遞給Spark操作(例如map或reduce)的函數時,它將在函數中使用的所有變量的單獨副本上工作。
·這些變量將復制到每台計算機,並且遠程計算機上的變量的更新不會傳播回驅動程序。
·支持跨任務的通用,讀寫共享變量效率低下。
·但是,Spark確實為兩種常見的使用模式提供了兩種有限類型的共享變量:廣播變量和累加器。
Broadcast Variables(廣播變量)
·廣播變量允許程序員在每台機器上保留一個只讀變量,而不是隨副本一起發送它的副本。
·例如,它們可用於以有效的方式為每個節點提供大輸入數據集的副本。
·Spark還嘗試使用有效的廣播算法來分發廣播變量,以降低通信成本。
·Spark動作通過一組階段執行,由分布式“shuffle”操作分隔。
·Spark自動廣播每個階段中任務所需的公共數據。
·以這種方式廣播的數據以序列化形式緩存並在運行每個任務之前反序列化。
·這意味着顯式創建廣播變量僅在跨多個階段的任務需要相同數據或以反序列化形式緩存數據很重要時才有用。
·通過調用SparkContext.broadcast(v)從變量v創建廣播變量。
·廣播變量是v的包裝器,可以通過調用value方法訪問其值。
·下面的代碼顯示了這個:
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
·創建廣播變量后,應該在群集上運行的任何函數中使用它而不是值v,這樣v不會多次傳送到節點。
·另外,在廣播之后不應修改對象v,以便確保所有節點獲得廣播變量的相同值(例如,如果稍后將變量發送到新節點)。
Accumulators(累加器)
·累加器是僅通過關聯和交換操作“添加”的變量,因此可以並行有效地支持。
·它們可用於實現計數器(如MapReduce)或總和。
·Spark本身支持數值類型的累加器,程序員可以添加對新類型的支持。
·作為用戶,您可以創建命名或未命名的累加器。
·如下圖所示,命名累加器(在此實例計數器中)將顯示在Web UI中,用於修改該累加器的階段。
·Spark顯示“任務”表中任務修改的每個累加器的值。

跟蹤UI中的累加器對於理解運行階段的進度非常有用(注意:Python中尚不支持)。
·通過調用SparkContext.accumulator(v)從初始值v創建累加器。
·然后,可以使用add方法或+ =運算符將在群集上運行的任務添加到其中。
·但是,他們無法讀懂它的價值。
·只有驅動程序可以使用其value方法讀取累加器的值。
·下面的代碼顯示了一個累加器用於添加數組的元素:
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10
·雖然此代碼使用Int類型的累加器的內置支持,但程序員也可以通過繼承AccumulatorParam來創建自己的類型。
·AccumulatorParam接口有兩種方法:零用於為數據類型提供“零值”,addInPlace用於將兩個值一起添加。
·例如,假設我們有一個表示數學向量的Vector類,我們可以寫:
class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
·對於僅在操作內執行的累加器更新,Spark保證每個任務對累加器的更新僅應用一次,即重新啟動的任務不會更新該值。
·在轉換中,用戶應該知道,如果重新執行任務或作業階段,則可以多次應用每個任務的更新。
·累加器不會改變Spark的惰性評估模型。
·如果在RDD上的操作中更新它們,則只有在RDD作為操作的一部分計算時才更新它們的值。
·因此,在像map()這樣的惰性轉換中進行累積器更新時,不能保證執行累加器更新。
·以下代碼片段演示了此屬性:
accum = sc.accumulator(0) def g(x): accum.add(x) return f(x) data.map(g) # Here, accum is still 0 because no actions have caused the `map` to be computed.
Deploying to a Cluster(部署到群集)
·應用程序提交指南介紹了如何將應用程序提交到群集。
·簡而言之,一旦將應用程序打包到JAR(用於Java / Scala)或一組.py或.zip文件(用於Python),bin / spark-submit腳本允許您將其提交給任何支持的集群管理器。
Launching Spark jobs from Java / Scala(從Java / Scala啟動Spark作業)
org.apache.spark.launcher包提供了使用簡單Java API將Spark作業作為子進程啟動的類。
Unit Testing(單元測試)
·Spark對任何流行的單元測試框架進行單元測試都很友好。
·只需在測試中創建一個SparkContext,主URL設置為local,運行您的操作,然后調用SparkContext.stop()將其拆除。
·確保在finally塊或測試框架的tearDown方法中停止上下文,因為Spark不支持在同一程序中同時運行的兩個上下文。
Where to Go from Here(從這往哪兒走)
You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples
directory (Scala,Java, Python, R). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example
script; for instance:
./bin/run-example SparkPi
對於Python示例,請使用spark-submit代替:
./bin/spark-submit examples/src/main/python/pi.py
Spark SQL, DataFrames and Datasets Guide
·Spark SQL是用於結構化數據處理的Spark模塊。
·與基本的Spark RDD API不同,Spark SQL提供的接口為Spark提供了有關數據結構和正在執行的計算的更多信息。
·在內部,Spark SQL使用此額外信息來執行額外的優化。
·有幾種與Spark SQL交互的方法,包括SQL和Dataset API。
·在計算結果時,使用相同的執行引擎,與您用於表達計算的API /語言無關。
·這種統一意味着開發人員可以輕松地在不同的API之間來回切換,從而提供表達給定轉換的最自然的方式。
·此頁面上的所有示例都使用Spark分發中包含的示例數據,並且可以在spark-shell,pyspark shell或sparkR shell中運行。
SQL
·Spark SQL的一個用途是執行SQL查詢。
·Spark SQL還可用於從現有Hive安裝中讀取數據。
·有關如何配置此功能的更多信息,請參閱Hive Tables部分。
·從其他編程語言中運行SQL時,結果將作為數據集/數據框返回。
·您還可以使用命令行或JDBC / ODBC與SQL接口進行交互。
Datasets and DataFrames
·數據集是分布式數據集合。
·數據集是Spark 1.6中添加的一個新接口,它提供了RDD的優勢(強類型,使用強大的lambda函數的能力)和Spark SQL優化執行引擎的優點。
·數據集可以從JVM對象構造,然后使用功能轉換(map,flatMap,filter等)進行操作。
·數據集API在Scala和Java中可用。
·Python沒有對Dataset API的支持。
·但由於Python的動態特性,數據集API的許多好處已經可用(即您可以通過名稱自然地訪問行的字段row.columnName)。
·R的情況類似。
·DataFrame是一個組織成命名列的數據集。
·它在概念上等同於關系數據庫中的表或R / Python中的數據框,但在底層具有更豐富的優化。
·DataFrame可以從多種來源構建,例如:結構化數據文件,Hive中的表,外部數據庫或現有RDD。
·DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由行數據集表示。
·在Scala API中,DataFrame只是Dataset [Row]的類型別名。
·而在Java API中,用戶需要使用Dataset 來表示DataFrame。
·在本文檔中,我們經常將行的Scala / Java數據集稱為DataFrame。
Getting Started(入門)
Starting Point: SparkSession(起點:SparkSession)
·Spark中所有功能的入口點是SparkSession類。
·要創建基本的SparkSession,只需使用SparkSession.builder:
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
·在Spark repo中的“examples / src / main / python / sql / basic.py”中找到完整的示例代碼。
·Spark 2.0中的SparkSession為Hive功能提供內置支持,包括使用HiveQL編寫查詢,訪問Hive UDF以及從Hive表讀取數據的功能。
·要使用這些功能,您無需擁有現有的Hive設置。
Creating DataFrames(創建DataFrame)
·使用SparkSession,應用程序可以從現有RDD,Hive表或Spark數據源創建DataFrame。
·作為示例,以下內容基於JSON文件的內容創建DataFrame:
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Untyped Dataset Operations (aka DataFrame Operations)無類型數據集操作(又名DataFrame操作)
·DataFrames為Scala,Java,Python和R中的結構化數據操作提供特定於域的語言。
·如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中Rows的數據集。
·與“類型轉換”相比,這些操作也稱為“無類型轉換”,帶有強類型Scala / Java數據集。
·這里我們包括使用數據集進行結構化數據處理的一些基本示例:
·在Python中,可以通過屬性(df.age)或索引(df ['age'])訪問DataFrame的列。
·雖然前者便於交互式數據探索,但強烈建議用戶使用后一種形式,這是未來的證明,不會破壞也是DataFrame類屬性的列名。
# spark, df are from the previous example
# Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+
·在Spark repo中的“examples / src / main / python / sql / basic.py”中找到完整的示例代碼。
·有關可在DataFrame上執行的操作類型的完整列表,請參閱API文檔。
·除了簡單的列引用和表達式之外,DataFrame還具有豐富的函數庫,包括字符串操作,日期算術,常見的數學運算等。
·完整列表可在DataFrame函數參考中找到。
Running SQL Queries Programmatically(以編程方式運行SQL查詢)
SparkSession上的sql函數使應用程序能夠以編程方式運行SQL查詢並將結果作為DataFrame返回。
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Global Temporary View(全球臨時觀點)
·Spark SQL中的臨時視圖是會話范圍的,如果創建它的會話終止,它將消失。
·如果您希望擁有一個在所有會話之間共享的臨時視圖並保持活動狀態,直到Spark應用程序終止,您可以創建一個全局臨時視圖。
·全局臨時視圖與系統保留的數據庫global_temp綁定,我們必須使用限定名稱來引用它,例如
·SELECT * FROM global_temp.view1。
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people") # Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ # Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Creating Datasets(創建數據集)
·數據集與RDD類似,但是,它們不使用Java序列化或Kryo,而是使用專用的編碼器來序列化對象以便通過網絡進行處理或傳輸。
·雖然編碼器和標准序列化都負責將對象轉換為字節,但編碼器是動態生成的代碼,並使用一種格式,允許Spark執行許多操作,如過濾,排序和散列,而無需將字節反序列化為對象。
case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.
Interoperating with RDDs(與RDD互操作)
·Spark SQL支持兩種不同的方法將現有RDD轉換為數據集。
·第一種方法使用反射來推斷包含特定類型對象的RDD的模式。
·這種基於反射的方法可以提供更簡潔的代碼,並且在您編寫Spark應用程序時已經了解模式時可以很好地工作。
·創建數據集的第二種方法是通過編程接口,允許您構建模式,然后將其應用於現有RDD。
·雖然此方法更詳細,但它允許您在直到運行時才知道列及其類型時構造數據集。
Inferring the Schema Using Reflection(使用反射推斷模式)
·Spark SQL可以將Row對象的RDD轉換為DataFrame,從而推斷出數據類型。
·通過將鍵/值對列表作為kwargs傳遞給Row類來構造行。
·此列表的鍵定義表的列名稱,並通過對整個數據集進行采樣來推斷類型,類似於對JSON文件執行的推斷
from pyspark.sql import Row sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are Dataframe objects. # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`. teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() for name in teenNames: print(name) # Name: Justin
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Programmatically Specifying the Schema(以編程方式指定架構)
·當無法提前定義kwargs字典時(例如,記錄結構以字符串形式編碼,或者文本數據集將被解析,字段將以不同方式為不同用戶進行投影),可以使用編程方式創建DataFrame
·三個步驟。
·從原始RDD創建元組或列表的RDD;
·創建由StructType表示的模式,該模式與步驟1中創建的RDD中的元組或列表的結構相匹配。
·通過SparkSession提供的createDataFrame方法將模式應用於RDD。
例如:
# Import data types
from pyspark.sql.types import * sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) # Each line is converted to a tuple. people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = spark.createDataFrame(people, schema) # Creates a temporary view using the DataFrame schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. results = spark.sql("SELECT name FROM people") results.show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.
Aggregations(聚合)
·內置的DataFrames函數提供常見的聚合,如count(),countDistinct(),avg(),max(),min()等。雖然這些函數是為DataFrames設計的,但Spark SQL也有類型安全的版本
·其中一些在Scala和Java中使用強類型數據集。
·此外,用戶不限於預定義的聚合函數,並且可以創建自己的聚合函數。
Untyped User-Defined Aggregate Functions(無用戶定義的聚合函數)
·用戶必須擴展UserDefinedAggregateFunction抽象類以實現自定義無類型聚合函數。
·例如,用戶定義的平均值可能如下所示:
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.