Spark譯文(一)


Spark Overview(Spark概述)

·Apache Spark是一種快速通用的集群計算系統。
·它提供Java,Scala,Python和R中的高級API,以及支持通用執行圖的優化引擎。
·它還支持豐富的高級工具集,包括用於SQL和結構化數據處理的Spark SQL,用於機器學習的MLlib,用於圖形處理的GraphX和Spark Streaming

Security(安全性)

·Spark中的安全性默認為OFF。
·這可能意味着您很容易受到默認攻擊。
·在下載和運行Spark之前,請參閱Spark Security

Downloading

·從項目網站的下載頁面獲取Spark。
·本文檔適用於Spark版本2.4.2。
·Spark使用Hadoop的客戶端庫來實現HDFS和YARN。
·下載是針對少數流行的Hadoop版本預先打包的。
·用戶還可以通過增加Spark的類路徑下載“Hadoop免費”二進制文件並使用任何Hadoop版本運行Spark。
·Scala和Java用戶可以使用Maven坐標在他們的項目中包含Spark,並且將來Python用戶也可以從PyPI安裝Spark。
·如果您想從源代碼構建Spark,請訪問Building Spark。
·Spark在Windows和類UNIX系統(例如Linux,Mac OS)上運行。
·在一台機器上本地運行很容易 - 您只需要在系統PATH上安裝Java,或者指向Java安裝的JAVA_HOME環境變量。
·Spark運行在Java 8 +,Python 2.7 + / 3.4 +和R 3.1+上。
·對於Scala API,Spark 2.4.2使用Scala 2.12。
·您需要使用兼容的Scala版本(2.12.x)。
·請注意,自Spark 2.2.0起,對2.6.5之前的Java 7,Python 2.6和舊Hadoop版本的支持已被刪除。
·自2.3.0起,對Scala 2.10的支持被刪除。
·自Spark 2.4.1起,對Scala 2.11的支持已被棄用,將在Spark 3.0中刪除。

Running the Examples and Shell(運行示例和Shell)

·Spark附帶了幾個示例程序。
·Scala,Java,Python和R示例位於examples / src / main目錄中。
·要運行其中一個Java或Scala示例程序,請在頂級Spark目錄中使用bin / run-example [params]。
·(在幕后,這將調用更常用的spark-submit腳本來啟動應用程序)。
·例如
./bin/run-example SparkPi 10
·您還可以通過Scala shell的修改版本以交互方式運行Spark。
·這是學習框架的好方法。
./bin/spark-shell --master local[2]
·--master選項指定分布式集群的主URL,或本地在一個線程上本地運行,或本地[N]在本地運行N個線程。
·您應該首先使用local進行測試。
·有關選項的完整列表,請使用--help選項運行Spark shell。
·Spark還提供了一個Python API。
·要在Python解釋器中以交互方式運行Spark,請使用bin / pyspark:
./bin/pyspark --master local[2]
·Python中也提供了示例應用程序。
·例如:
./bin/spark-submit examples/src/main/python/pi.py 10

Quick Start(快速開始)

·本教程簡要介紹了如何使用Spark。
·我們將首先通過Spark的交互式shell(在Python或Scala中)介紹API,然后展示如何使用Java,Scala和Python編寫應用程序。
·要繼續本指南,首先,從Spark網站下載Spark的打包版本。
·由於我們不會使用HDFS,您可以下載任何版本的Hadoop的軟件包。
·請注意,在Spark 2.0之前,Spark的主要編程接口是Resilient Distributed Dataset(RDD)。
·在Spark 2.0之后,RDD被數據集取代,數據集像RDD一樣強類型,但在底層有更豐富的優化。
·仍然支持RDD接口,您可以在RDD編程指南中獲得更詳細的參考。
·但是,我們強烈建議您切換到使用數據集,它具有比RDD更好的性能。
·請參閱SQL編程指南以獲取有關數據集的更多信息

Interactive Analysis with the Spark Shell(使用Spark Shell進行交互式分析)

Basics(基本)

·Spark的shell提供了一種學習API的簡單方法,以及一種以交互方式分析數據的強大工具。
·它可以在Scala(在Java VM上運行,因此是使用現有Java庫的好方法)或Python中使用。
·通過在Spark目錄中運行以下命令來啟動它:
./bin/pyspark

或者如果在當前環境中使用pip安裝了PySpark:
pyspark
·Spark的主要抽象是一個名為Dataset的分布式項目集合。
·可以從Hadoop InputFormats(例如HDFS文件)或通過轉換其他數據集來創建數據集。
·由於Python的動態特性,我們不需要在Python中強類型數據集。
·因此,Python中的所有數據集都是Dataset [Row],我們稱之為DataFrame與Pandas和R中的數據框概念一致。讓我們從Spark源目錄中的README文件的文本中創建一個新的DataFrame:
>>> textFile = spark.read.text("README.md")
·您可以通過調用某些操作直接從DataFrame獲取值,也可以轉換DataFrame以獲取新值。
·有關更多詳細信息,請閱讀API文檔。
>>> textFile.count() # Number of rows in this DataFrame 126 >>> textFile.first() # First row in this DataFrame Row(value=u'# Apache Spark')
·現在讓我們將這個DataFrame轉換為一個新的DataFrame。
·我們調用filter來返回一個新的DataFrame,其中包含文件中的一行子集。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
我們可以將轉換和行動聯系在一起:
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"? 15

More on Dataset Operations(有關數據集操作的更多信息)

·數據集操作和轉換可用於更復雜的計算。
·假設我們想要找到含有最多單詞的行:
>>> from pyspark.sql.functions import * >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)]
·這首先將一行映射為整數值,並將其別名為“numWords”,從而創建一個新的DataFrame。
·在該DataFrame上調用agg以查找最大字數。
·select和agg的參數都是Column,我們可以使用df.colName從DataFrame中獲取一列。
·我們還可以導入pyspark.sql.functions,它提供了許多方便的功能來從舊的列構建一個新的列。
·一個常見的數據流模式是MapReduce,由Hadoop推廣。
·Spark可以輕松實現MapReduce流程:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
·在這里,我們使用select中的explode函數,將行數據集轉換為單詞數據集,然后將groupBy和count結合起來計算文件中的每個單詞計數,作為2列的DataFrame:“word”和“
·計數”。
·要在我們的shell中收集單詞count,我們可以調用collect:
>>> wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

Caching(高速緩存)

·Spark還支持將數據集提取到群集范圍的內存緩存中。
·這在重復訪問數據時非常有用,例如查詢小的“熱”數據集或運行像PageRank這樣的迭代算法時。
·舉個簡單的例子,讓我們標記要緩存的linesWithSpark數據集:
>>> linesWithSpark.cache() >>> linesWithSpark.count() 15 >>> linesWithSpark.count() 15
·使用Spark來探索和緩存100行文本文件似乎很愚蠢。
·有趣的是,這些相同的功能可用於非常大的數據集,即使它們跨越數十個或數百個節點進行條帶化。
·您也可以通過將bin / pyspark連接到群集來交互式地執行此操作,如RDD編程指南中所述。

Self-Contained Applications(自包含的應用程序)

·假設我們希望使用Spark API編寫一個自包含的應用程序。
·我們將在Scala(使用sbt),Java(使用Maven)和Python(pip)中使用簡單的應用程序。
·現在我們將展示如何使用Python API(PySpark)編寫應用程序。
·如果要構建打包的PySpark應用程序或庫,可以將其添加到setup.py文件中:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
作為示例,我們將創建一個簡單的Spark應用程序SimpleApp.py:
"""SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder.appName("SimpleApp").getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop()
·該程序只計算包含'a'的行數和包含文本文件中'b'的數字。
·請注意,您需要將YOUR_SPARK_HOME替換為安裝Spark的位置。
·與Scala和Java示例一樣,我們使用SparkSession來創建數據集。
·對於使用自定義類或第三方庫的應用程序,我們還可以通過將它們打包到.zip文件中來添加代碼依賴關系以通過其--py-files參數進行spark-submit(有關詳細信息,請參閱spark-submit --help)。
·SimpleApp非常簡單,我們不需要指定任何代碼依賴項。
我們可以使用bin / spark-submit腳本運行此應用程序:
# Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23

如果您的環境中安裝了PySpark pip(例如,pip install pyspark),您可以使用常規Python解釋器運行您的應用程序,或者根據您的喜好使用提供的“spark-submit”。
# Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23

RDD Programming Guide(RDD編程指南)

Overview(概觀)

·在較高的層次上,每個Spark應用程序都包含一個驅動程序,該程序運行用戶的主要功能並在群集上執行各種並行操作。
·Spark提供的主要抽象是彈性分布式數據集(RDD),它是跨群集節點分區的元素的集合,可以並行操作。
·RDD是通過從Hadoop文件系統(或任何其他Hadoop支持的文件系統)中的文件或驅動程序中的現有Scala集合開始並對其進行轉換而創建的。
·用戶還可以要求Spark在內存中保留RDD,允許它在並行操作中有效地重用。
·最后,RDD會自動從節點故障中恢復。
·Spark中的第二個抽象是可以在並行操作中使用的共享變量。
·默認情況下,當Spark並行運行一個函數作為不同節點上的一組任務時,它會將函數中使用的每個變量的副本發送給每個任務。
·有時,變量需要跨任務共享,或者在任務和驅動程序之間共享。
·Spark支持兩種類型的共享變量:廣播變量,可用於緩存所有節點的內存中的值;累加器,它們是僅“添加”到的變量,例如計數器和總和。
·本指南以Spark支持的每種語言顯示了這些功能。
·如果你啟動Spark的交互式shell,最簡單的方法就是 - 用於Scala shell的bin / spark-shell或用於Python的bin / pyspark。

Linking with Spark(與Spark鏈接)

·Spark 2.4.2適用於Python 2.7+或Python 3.4+。
·它可以使用標准的CPython解釋器,因此可以使用像NumPy這樣的C庫。
·它也適用於PyPy 2.3+。
·Spark 2.2.0中刪除了Python 2.6支持。
·Python中的Spark應用程序可以使用bin / spark-submit腳本運行,該腳本在運行時包含Spark,也可以將其包含在setup.py中:
install_requires=[ 'pyspark=={site.SPARK_VERSION}' ]
·要在不使用pip安裝PySpark的情況下在Python中運行Spark應用程序,請使用位於Spark目錄中的bin / spark-submit腳本。
·此腳本將加載Spark的Java / Scala庫,並允許您將應用程序提交到群集。
·您還可以使用bin / pyspark來啟動交互式Python shell。
·如果您希望訪問HDFS數據,則需要使用PySpark構建鏈接到您的HDFS版本。
·Spark主頁上還提供了預構建的軟件包,可用於常見的HDFS版本。
·最后,您需要將一些Spark類導入到您的程序中。
·添加以下行:
from pyspark import SparkContext, SparkConf
·PySpark在驅動程序和工作程序中都需要相同的次要版本的Python。
·它使用PATH中的默認python版本,您可以指定PYSPARK_PYTHON要使用的Python版本,例如:
$ PYSPARK_PYTHON=python3.4 bin/pyspark $ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

Initializing Spark(初始化Spark)

·Spark程序必須做的第一件事是創建一個SparkContext對象,它告訴Spark如何訪問集群。
·要創建SparkContext,首先需要構建一個包含有關應用程序信息的SparkConf對象。
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
·appName參數是應用程序在集群UI上顯示的名稱。
·master是Spark,Mesos或YARN群集URL,或者是以本地模式運行的特殊“本地”字符串。
·實際上,在群集上運行時,您不希望在程序中對master進行硬編碼,而是使用spark-submit啟動應用程序並在那里接收它。
·但是,對於本地測試和單元測試,您可以傳遞“local”以在進程中運行Spark。

Using the Shell(使用Shell)

·在PySpark shell中,已經為你創建了一個特殊的解釋器感知SparkContext,名為sc。
·制作自己的SparkContext將無法正常工作。
·您可以使用--master參數設置上下文連接到的主服務器,並且可以通過將逗號分隔的列表傳遞給--py-files將Python .zip,.egg或.py文件添加到運行時路徑。
·您還可以通過向--packages參數提供以逗號分隔的Maven坐標列表,將依賴項(例如Spark包)添加到shell會話中。
·任何可能存在依賴關系的其他存儲庫(例如Sonatype)都可以傳遞給--repositories參數。
·必要時,必須使用pip手動安裝Spark軟件包具有的任何Python依賴項(在該軟件包的requirements.txt中列出)。
·例如,要在四個核心上運行bin / pyspark,請使用:
$ ./bin/pyspark --master local[4]

或者,要將code.py添加到搜索路徑(以便以后能夠導入代碼),請使用:
$ ./bin/pyspark --master local[4] --py-files code.py
·有關選項的完整列表,請運行pyspark --help。
·在幕后,pyspark調用更一般的spark-submit腳本。
·也可以在增強的Python解釋器IPython中啟動PySpark shell。
·PySpark適用於IPython 1.0.0及更高版本。
·要使用IPython,請在運行bin / pyspark時將PYSPARK_DRIVER_PYTHON變量設置為ipython:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

要使用Jupyter notebook(以前稱為IPython notebook)
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
·您可以通過設置PYSPARK_DRIVER_PYTHON_OPTS來自定義ipython或jupyter命令。
·啟動Jupyter Notebook服務器后,您可以從“文件”選項卡創建一個新的“Python 2”筆記本。
·在筆記本內部,您可以在開始嘗試使用Jupyter notebook中的Spark之前輸入命令%pylab inline作為筆記本的一部分。

Resilient Distributed Datasets (彈性分布式數據集)(RDDs)

·Spark圍繞彈性分布式數據集(RDD)的概念展開,RDD是一個可以並行操作的容錯的容錯集合。
·創建RDD有兩種方法:並行化驅動程序中的現有集合,或引用外部存儲系統中的數據集,例如共享文件系統,HDFS,HBase或提供Hadoop InputFormat的任何數據源。

Parallelized Collections(並行化集合)

·通過在驅動程序中的現有可迭代或集合上調用SparkContext的parallelize方法來創建並行化集合。
·復制集合的元素以形成可以並行操作的分布式數據集。
·例如,以下是如何創建包含數字1到5的並行化集合:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
·一旦創建,分布式數據集(distData)可以並行操作。
·例如,我們可以調用distData.reduce(lambda a,b:a + b)來添加列表的元素。
·我們稍后將描述對分布式數據集的操作。
·並行集合的一個重要參數是將數據集切割為的分區數。
·Spark將為群集的每個分區運行一個任務。
·通常,您希望群集中的每個CPU有2-4個分區。
·通常,Spark會嘗試根據您的群集自動設置分區數。
·但是,您也可以通過將其作為第二個參數傳遞給並行化來手動設置它(例如sc.parallelize(data,10))。
·注意:代碼中的某些位置使用術語切片(分區的同義詞)來保持向后兼容性。

External Datasets(外部數據集)

·PySpark可以從Hadoop支持的任何存儲源創建分布式數據集,包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3等.Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat。
·可以使用SparkContext的textFile方法創建文本文件RDD。
·此方法獲取文件的URI(計算機上的本地路徑,或hdfs://,s3a://等URI)並將其作為行集合讀取。
·這是一個示例調用:
>>> distFile = sc.textFile("data.txt")
·創建后,distFile可以由數據集操作執行。
·例如,我們可以使用map添加所有行的大小,並按如下方式減少操作:distFile.map(lambda s:len(s))。reduce(lambda a,b:a + b)。
·有關使用Spark讀取文件的一些注意事項
·如果在本地文件系統上使用路徑,則還必須可以在工作節點上的相同路徑上訪問該文件。
·將文件復制到所有工作者或使用網絡安裝的共享文件系統。
·Spark的所有基於文件的輸入方法(包括textFile)都支持在目錄,壓縮文件和通配符上運行。
·例如,您可以使用textFile(“/ my / directory”),textFile(“/ my / directory / * .txt”)和textFile(“/ my / directory / * .gz”)。
·textFile方法還采用可選的第二個參數來控制文件的分區數。
·默認情況下,Spark為文件的每個塊創建一個分區(HDFS中默認為128MB),但您也可以通過傳遞更大的值來請求更多的分區。
·請注意,您不能擁有比塊少的分區。
·除文本文件外,Spark的Python API還支持其他幾種數據格式:
·SparkContext.wholeTextFiles允許您讀取包含多個小文本文件的目錄,並將它們作為(文件名,內容)對返回。
·這與textFile形成對比,textFile將在每個文件中每行返回一條記錄。
·RDD.saveAsPickleFile和SparkContext.pickleFile支持以包含pickle Python對象的簡單格式保存RDD。
·批處理用於pickle序列化,默認批處理大小為10。
·SequenceFile和Hadoop輸入/輸出格式
·請注意,此功能目前標記為“實驗”,適用於高級用戶。
·將來可能會使用基於Spark SQL的讀/寫支持替換它,在這種情況下,Spark SQL是首選方法。
·可寫支持
·PySpark SequenceFile支持在Java中加載鍵值對的RDD,將Writable轉換為基本Java類型,並使用Pyrolite挖掘生成的Java對象。
·將鍵值對的RDD保存到SequenceFile時,PySpark會反過來。
·它將Python對象解開為Java對象,然后將它們轉換為Writable。
·以下Writable會自動轉換:
Writable Type(可寫類型) Python Type
Text unicode str
IntWritable int
FloatWritable float
DoubleWritable float
BooleanWritable bool
BytesWritable bytearray
NullWritable None
MapWritable dict
·數組不是開箱即用的。
·用戶在讀取或寫入時需要指定自定義ArrayWritable子類型。
·編寫時,用戶還需要指定將數組轉換為自定義ArrayWritable子類型的自定義轉換器。
·在讀取時,默認轉換器將自定義ArrayWritable子類型轉換為Java Object [],然后將其pickle到Python元組。
·要為原始類型的數組獲取Python array.array,用戶需要指定自定義轉換器。
Saving and Loading SequenceFiles(保存和加載SequenceFiles)
·與文本文件類似,可以通過指定路徑來保存和加載SequenceFiles。
·可以指定鍵和值類,但對於標准Writable,這不是必需的。
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat", "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) >>> rdd.first() # the result is a MapWritable that is converted to a Python dict (u'Elasticsearch ID', {u'field1': True, u'field2': u'Some Text', u'field3': 12345})
·請注意,如果InputFormat僅依賴於Hadoop配置和/或輸入路徑,並且可以根據上表輕松轉換鍵和值類,則此方法應適用於此類情況。
·如果您有自定義序列化二進制數據(例如從Cassandra / HBase加載數據),那么您首先需要將Scala / Java端的數據轉換為可由Pyrolite的pickler處理的數據。
·為此提供了轉換器特性。
·只需擴展此特征並在convert方法中實現轉換代碼。
·請記住確保將此類以及訪問InputFormat所需的任何依賴項打包到Spark作業jar中並包含在PySpark類路徑中。
·有關使用Cassandra / HBase InputFormat和OutputFormat以及自定義轉換器的示例,請參閱Python示例和Converter示例。

RDD Operations(RDD操作)

·RDD支持兩種類型的操作:轉換(從現有數據集創建新數據集)和操作(在數據集上運行計算后將值返回到驅動程序)。
·例如,map是一個轉換,它通過一個函數傳遞每個數據集元素,並返回一個表示結果的新RDD。
·另一方面,reduce是一個使用某個函數聚合RDD的所有元素的操作,並將最終結果返回給驅動程序(盡管還有一個返回分布式數據集的並行reduceByKey)。
·Spark中的所有轉換都是惰性的,因為它們不會立即計算結果。
·相反,他們只記得應用於某些基礎數據集(例如文件)的轉換。
·僅當操作需要將結果返回到驅動程序時才會計算轉換。
·這種設計使Spark能夠更有效地運行。
·例如,我們可以意識到通過map創建的數據集將用於reduce,並且僅將reduce的結果返回給驅動程序,而不是更大的映射數據集。
·默認情況下,每次對其執行操作時,都可以重新計算每個轉換后的RDD。
·但是,您也可以使用持久化(或緩存)方法在內存中保留RDD,在這種情況下,Spark會在群集上保留元素,以便在下次查詢時更快地訪問。
·還支持在磁盤上保留RDD或在多個節點上復制。

Basics(基本)

為了說明RDD基礎知識,請考慮以下簡單程序:

lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)
·第一行定義來自外部文件的基本RDD。
·此數據集未加載到內存中或以其他方式執行:行僅僅是指向文件的指針。
·第二行將lineLengths定義為地圖轉換的結果。
·同樣,由於懶惰,lineLengths不會立即計算。
·最后,我們運行reduce,這是一個動作。
·此時,Spark將計算分解為在不同機器上運行的任務,並且每台機器都運行其部分映射和本地縮減,僅返回其對驅動程序的答案。
·如果我們以后想再次使用lineLengths,我們可以添加:
lineLengths.persist()

在reduce之前,這將導致lineLengths在第一次計算之后保存在內存中。

Passing Functions to Spark(將函數傳遞給Spark)

·Spark的API在很大程度上依賴於在驅動程序中傳遞函數以在集群上運行。
·有三種建議的方法可以做到這一點:
·Lambda表達式,用於可以作為表達式編寫的簡單函數。
·(Lambdas不支持多語句函數或不返回值的語句。)
·調用Spark的函數內部的本地defs,用於更長的代碼。
·模塊中的頂級函數。
·例如,要傳遞比使用lambda支持的更長的函數,請考慮以下代碼:
"""MyScript.py""" if __name__ == "__main__": def myFunc(s): words = s.split(" ") return len(words) sc = SparkContext(...) sc.textFile("file.txt").map(myFunc)
·請注意,雖然也可以將引用傳遞給類實例中的方法(而不是單例對象),但這需要發送包含該類的對象以及方法。
·例如,考慮:
class MyClass(object): def func(self, s): return s def doStuff(self, rdd): return rdd.map(self.func)
·在這里,如果我們創建一個新的MyClass並在其上調用doStuff,那里的map會引用該MyClass實例的func方法,因此需要將整個對象發送到集群。
·以類似的方式,訪問外部對象的字段將引用整個對象:
class MyClass(object): def __init__(self): self.field = "Hello" def doStuff(self, rdd): return rdd.map(lambda s: self.field + s)
要避免此問題,最簡單的方法是將字段復制到局部變量中,而不是從外部訪問它:
def doStuff(self, rdd): field = self.field return rdd.map(lambda s: field + s)

Understanding closures(理解閉包)

·Spark的一個難點是在跨集群執行代碼時理解變量和方法的范圍和生命周期。
·修改其范圍之外的變量的RDD操作可能經常引起混淆。
·在下面的示例中,我們將查看使用foreach()遞增計數器的代碼,但同樣的問題也可能發生在其他操作中。

Example

·考慮下面的天真RDD元素總和,根據執行是否在同一JVM中發生,它可能表現不同。
·一個常見的例子是在本地模式下運行Spark(--master = local [n])而不是將Spark應用程序部署到集群(例如通過spark-submit to YARN):
counter = 0 rdd = sc.parallelize(data) # Wrong: Don't do this!! def increment_counter(x): global counter counter += x rdd.foreach(increment_counter) print("Counter value: ", counter)

Local vs. cluster modes(本地與群集模式)

·上述代碼的行為未定義,可能無法按預期工作。
·為了執行作業,Spark將RDD操作的處理分解為任務,每個任務都由執行程序執行。
·在執行之前,Spark計算任務的閉包。
·閉包是那些變量和方法,它們必須是可見的,以便執行程序在RDD上執行其計算(在本例中為foreach())。
·該閉包被序列化並發送給每個執行者。
·發送給每個執行程序的閉包內的變量現在是副本,因此,當在foreach函數中引用計數器時,它不再是驅動程序節點上的計數器。
·驅動程序節點的內存中仍然有一個計數器,但執行程序不再可見!
·執行程序只能看到序列化閉包中的副本。
·因此,計數器的最終值仍然為零,因為計數器上的所有操作都引用了序列化閉包內的值。
·在本地模式下,在某些情況下,foreach函數實際上將在與驅動程序相同的JVM中執行,並將引用相同的原始計數器,並且可能實際更新它。
·為了確保在這些場景中定義良好的行為,應該使用累加器。
·Spark中的累加器專門用於提供一種機制,用於在跨集群中的工作節點拆分執行時安全地更新變量。
·本指南的“累加器”部分更詳細地討論了這些內容。
·通常,閉包 - 類似循環或本地定義的方法的構造不應該用於改變某些全局狀態。
·Spark沒有定義或保證從閉包外部引用的對象的突變行為。
·執行此操作的某些代碼可能在本地模式下工作,但這只是偶然的,並且此類代碼在分布式模式下不會按預期運行。
·如果需要某些全局聚合,請使用累加器。

Printing elements of an RDD(打印RDD的元素)

·另一個常見的習慣用法是嘗試使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。
·在一台機器上,這將生成預期的輸出並打印所有RDD的元素。
·但是,在集群模式下,執行程序調用的stdout輸出現在寫入執行程序的stdout,而不是驅動程序上的那個,因此驅動程序上的stdout不會顯示這些!
·要打印驅動程序上的所有元素,可以使用collect()方法首先將RDD帶到驅動程序節點:rdd.collect()。foreach(println)。
·但是,這會導致驅動程序內存不足,因為collect()會將整個RDD提取到一台機器上;
·如果你只需要打印RDD的一些元素,更安全的方法是使用take():rdd.take(100).foreach(println)。

Working with Key-Value Pairs(使用鍵值對)

·雖然大多數Spark操作都適用於包含任何類型對象的RDD,但一些特殊操作僅適用於鍵值對的RDD。
·最常見的是分布式“隨機”操作,例如通過密鑰對元素進行分組或聚合。
·在Python中,這些操作適用於包含內置Python元組的RDD,如(1,2)。
·只需創建這樣的元組,然后調用您想要的操作。
·例如,以下代碼對鍵值對使用reduceByKey操作來計算文件中每行文本出現的次數:
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)

例如,我們也可以使用counts.sortByKey()來按字母順序對這些對進行排序,最后使用counts.collect()將它們作為對象列表返回到驅動程序。

Transformations(轉換)

·下表列出了Spark支持的一些常見轉換。
·有關詳細信息,請參閱RDD API文檔(Scala,Java,Python,R)並配對RDD函數doc(Scala,Java)。
·轉型意義
·map(func)返回通過函數func傳遞源的每個元素形成的新分布式數據集。
·filter(func)返回通過選擇func返回true的源元素形成的新數據集。
·flatMap(func)與map類似,但每個輸入項可以映射到0個或更多輸出項(因此func應返回Seq而不是單個項)。
·mapPartitions(func)與map類似,但在RDD的每個分區(塊)上單獨運行,因此當在類型T的RDD上運行時,func必須是Iterator => Iterator  類型。
·mapPartitionsWithIndex(func)與mapPartitions類似,但也為func提供了一個表示分區索引的整數值,因此當在RDD類型上運行時,func必須是類型(Int,Iterator )=> Iterator
·T.
·sample(withReplacement,fraction,seed)使用給定的隨機數生成器種子,使用或不使用替換對數據的一小部分進行采樣。
·union(otherDataset)返回一個新數據集,其中包含源數據集和參數中元素的並集。
·intersection(otherDataset)返回包含源數據集和參數中元素交集的新RDD。
·distinct([numPartitions]))返回包含源數據集的不同元素的新數據集。
·groupByKey([numPartitions])在(K,V)對的數據集上調用時,返回(K,Iterable )對的數據集。
·注意:如果要對每個鍵執行聚合(例如總和或平均值)進行分組,則使用reduceByKey或aggregateByKey將產生更好的性能。
·注意:默認情況下,輸出中的並行級別取決於父RDD的分區數。
·您可以傳遞可選的numPartitions參數來設置不同數量的任務。
·reduceByKey(func,[numPartitions])當在(K,V)對的數據集上調用時,返回(K,V)對的數據集,其中使用給定的reduce函數func聚合每個鍵的值,該函數必須是
·type(V,V)=> V.與groupByKey類似,reduce任務的數量可通過可選的第二個參數進行配置。
·aggregateByKey(zeroValue)(seqOp,combOp,[numPartitions])在(K,V)對的數據集上調用時,返回(K,U)對的數據集,其中使用給定的組合函數聚合每個鍵的值,
·中性的“零”值。
·允許與輸入值類型不同的聚合值類型,同時避免不必要的分配。
·與groupByKey類似,reduce任務的數量可通過可選的第二個參數進行配置。
·sortByKey([ascending],[numPartitions])在K實現Ordered的(K,V)對的數據集上調用時,返回按鍵按升序或降序排序的(K,V)對數據集,如
·布爾升序參數。
·join(otherDataset,[numPartitions])當調用類型為(K,V)和(K,W)的數據集時,返回(K,(V,W))對的數據集以及每個鍵的所有元素對。
·通過leftOuterJoin,rightOuterJoin和fullOuterJoin支持外連接。
·cogroup(otherDataset,[numPartitions])當調用類型為(K,V)和(K,W)的數據集時,返回(K,(Iterable ,Iterable ))元組的數據集。
·此操作也稱為groupWith。
·cartesian(otherDataset)當調用類型為T和U的數據集時,返回(T,U)對的數據集(所有元素對)。
·pipe(command,[envVars])通過shell命令管道RDD的每個分區,例如:
·一個Perl或bash腳本。
·RDD元素被寫入進程的stdin,並且輸出到其stdout的行將作為字符串的RDD返回。
·coalesce(numPartitions)將RDD中的分區數減少為numPartitions。
·過濾大型數據集后,可以更有效地運行操作。
·repartition(numPartitions)隨機重新調整RDD中的數據以創建更多或更少的分區並在它們之間進行平衡。
·這總是隨機播放網絡上的所有數據。
·repartitionAndSortWithinPartitions(partitioner)根據給定的分區程序重新分區RDD,並在每個生成的分區中按鍵對記錄進行排序。
·這比調用重新分區然后在每個分區內排序更有效,因為它可以將排序推送到shuffle機器中。

Actions(動作)

·下表列出了Spark支持的一些常見操作。
·請參閱RDD API文檔(Scala,Java,Python,R)
·並配對RDD函數doc(Scala,Java)以獲取詳細信息。
·行動意義
·reduce(func)使用函數func(它接受兩個參數並返回一個)來聚合數據集的元素。
·該函數應該是可交換的和關聯的,以便可以並行正確計算。
·collect()在驅動程序中將數據集的所有元素作為數組返回。
·在過濾器或其他返回足夠小的數據子集的操作之后,這通常很有用。
·count()返回數據集中的元素數。
·first()返回數據集的第一個元素(類似於take(1))。
·take(n)返回包含數據集的前n個元素的數組。
·takeSample(withReplacement,num,[seed])返回一個數組,其中包含數據集的num個元素的隨機樣本,有或沒有替換,可選地預先指定隨機數生成器種子。
·takeOrdered(n,[ordering])使用自然順序或自定義比較器返回RDD的前n個元素。
·saveAsTextFile(path)將數據集的元素寫為本地文件系統,HDFS或任何其他Hadoop支持的文件系統中給定目錄中的文本文件(或文本文件集)。
·Spark將在每個元素上調用toString,將其轉換為文件中的一行文本。
·saveAsSequenceFile(路徑)
·(Java和Scala)將數據集的元素作為Hadoop SequenceFile寫入本地文件系統,HDFS或任何其他Hadoop支持的文件系統中的給定路徑中。
·這可以在實現Hadoop的Writable接口的鍵值對的RDD上使用。
·在Scala中,它也可以在可隱式轉換為Writable的類型上使用(Spark包括基本類型的轉換,如Int,Double,String等)。
·saveAsObjectFile(路徑)
·(Java和Scala)使用Java序列化以簡單格式編寫數據集的元素,然后可以使用SparkContext.objectFile()加載它。
·countByKey()僅適用於類型為(K,V)的RDD。
·返回(K,Int)對的散列映射,其中包含每個鍵的計數。
·foreach(func)對數據集的每個元素運行函數func。
·這通常用於副作用,例如更新累加器或與外部存儲系統交互。
·注意:在foreach()之外修改除累加器之外的變量可能會導致未定義的行為。
·有關詳細信息,請參閱了解閉包。
·Spark RDD API還公開了某些操作的異步版本,例如foreach的foreachAsync,它會立即將一個FutureAction返回給調用者,而不是在完成操作時阻塞。
·這可用於管理或等待操作的異步執行。

Shuffle operations(隨機操作)

·Spark中的某些操作會觸發稱為shuffle的事件。
·隨機播放是Spark的重新分配數據的機制,因此它可以跨分區進行不同的分組。
·這通常涉及跨執行程序和機器復制數據,使得混洗成為復雜且昂貴的操作。

Background(背景)

·為了理解在shuffle期間發生的事情,我們可以考慮reduceByKey操作的示例。
·reduceByKey操作生成一個新的RDD,其中單個鍵的所有值都組合成一個元組 - 鍵和對與該鍵關聯的所有值執行reduce函數的結果。
·挑戰在於,並非單個密鑰的所有值都必須位於同一個分區,甚至是同一個機器上,但它們必須位於同一位置才能計算結果。
·在Spark中,數據通常不跨分區分布,以便在特定操作的必要位置。
·在計算過程中,單個任務將在單個分區上運行 - 因此,要組織單個reduceByKey reduce任務執行的所有數據,Spark需要執行全部操作。
·它必須從所有分區讀取以查找所有鍵的所有值,然后將分區中的值匯總在一起以計算每個鍵的最終結果 - 這稱為shuffle。
·盡管新洗牌數據的每個分區中的元素集將是確定性的,並且分區本身的排序也是如此,但這些元素的排序不是。
·如果在隨機播放后需要可預測的有序數據,則可以使用:
·mapPartitions使用例如.sorted對每個分區進行排序
·repartitionAndSortWithinPartitions在同時重新分區的同時有效地對分區進行排序
·sortBy來創建一個全局排序的RDD
·可以導致混洗的操作包括重新分區操作,如重新分區和合並,“ByKey操作(計數除外),如groupByKey和reduceByKey,以及聯合操作,如cogroup和join。

Performance Impact(績效影響)

·Shuffle是一項昂貴的操作,因為它涉及磁盤I / O,數據序列化和網絡I / O.
·為了組織shuffle的數據,Spark生成了一系列任務 - 映射任務以組織數據,以及一組reduce任務來聚合它。
·這個術語來自MapReduce,並不直接與Spark的地圖和減少操作相關。
·在內部,各個地圖任務的結果會保留在內存中,直到它們無法適應。
·然后,這些基於目標分區進行排序並寫入單個文件。
·在reduce方面,任務讀取相關的排序塊。
·某些shuffle操作會消耗大量的堆內存,因為它們使用內存中的數據結構來在傳輸記錄之前或之后組織記錄。
·具體來說,reduceByKey和aggregateByKey在地圖側創建這些結構,並且'ByKey操作在reduce側生成這些結構。
·當數據不適合內存時,Spark會將這些表溢出到磁盤,從而導致磁盤I / O的額外開銷和垃圾收集增加。
·Shuffle還會在磁盤上生成大量中間文件。
·從Spark 1.3開始,這些文件將被保留,直到不再使用相應的RDD並進行垃圾回收。
·這樣做是為了在重新計算譜系時不需要重新創建shuffle文件。
·如果應用程序保留對這些RDD的引用或GC不經常啟動,則垃圾收集可能僅在很長一段時間后才會發生。
·這意味着長時間運行的Spark作業可能會占用大量磁盤空間。
·配置Spark上下文時,spark.local.dir配置參數指定臨時存儲目錄。
·可以通過調整各種配置參數來調整隨機行為。
·請參閱“Spark配置指南”中的“隨機行為”部分。

RDD Persistence(RDD持久性)

·Spark中最重要的功能之一是跨操作在內存中持久化(或緩存)數據集。
·當您持久保存RDD時,每個節點都會存儲它在內存中計算的任何分區,並在該數據集(或從中派生的數據集)的其他操作中重用它們。
·這使得未來的行動更快(通常超過10倍)。
·緩存是迭代算法和快速交互式使用的關鍵工具。
·您可以使用persist()或cache()方法標記要保留的RDD。
·第一次在動作中計算它,它將保留在節點的內存中。
·Spark的緩存是容錯的 - 如果丟失了RDD的任何分區,它將使用最初創建它的轉換自動重新計算。
·此外,每個持久化RDD可以使用不同的存儲級別進行存儲,例如,允許您將數據集保留在磁盤上,將其保留在內存中,但作為序列化Java對象(以節省空間),跨節點復制它。
·通過將StorageLevel對象(Scala,Java,Python)傳遞給persist()來設置這些級別。
·cache()方法是使用默認存儲級別的簡寫,即StorageLevel.MEMORY_ONLY(在內存中存儲反序列化的對象)。
·完整的存儲級別是:
·存儲級別含義
·MEMORY_ONLY將RDD存儲為JVM中的反序列化Java對象。
·如果RDD不適合內存,則某些分區將不會被緩存,並且每次需要時都會重新計算。
·這是默認級別。
·MEMORY_AND_DISK將RDD存儲為JVM中的反序列化Java對象。
·如果RDD不適合內存,請存儲不適合磁盤的分區,並在需要時從那里讀取它們。
·MEMORY_ONLY_SER
·(Java和Scala)將RDD存儲為序列化Java對象(每個分區一個字節數組)。
·這通常比反序列化對象更節省空間,特別是在使用快速序列化器時,但讀取CPU密集程度更高。
·MEMORY_AND_DISK_SER
·(Java和Scala)與MEMORY_ONLY_SER類似,但是將不適合內存的分區溢出到磁盤,而不是每次需要時動態重新計算它們。
·DISK_ONLY僅將RDD分區存儲在磁盤上。
·MEMORY_ONLY_2,MEMORY_AND_DISK_2等。與上面的級別相同,但復制兩個群集節點上的每個分區。
·OFF_HEAP(實驗)與MEMORY_ONLY_SER類似,但將數據存儲在堆外內存中。
·這需要啟用堆外內存。
·注意:在Python中,存儲的對象將始終使用Pickle庫進行序列化,因此您是否選擇序列化級別並不重要。
·Python中的可用存儲級別包括MEMORY_ONLY,MEMORY_ONLY_2,MEMORY_AND_DISK,MEMORY_AND_DISK_2,DISK_ONLY和DISK_ONLY_2。
·即使沒有用戶調用持久性,Spark也會在隨機操作(例如reduceByKey)中自動保留一些中間數據。
·這樣做是為了避免在shuffle期間節點發生故障時重新計算整個輸入。
·我們仍然建議用戶在生成的RDD上調用persist,如果他們計划重用它。

Which Storage Level to Choose?(選擇哪種存儲級別?)

·Spark的存儲級別旨在提供內存使用和CPU效率之間的不同折衷。
·我們建議您通過以下流程選擇一個:
·如果您的RDD與默認存儲級別(MEMORY_ONLY)很舒適,請保持這種狀態。
·這是CPU效率最高的選項,允許RDD上的操作盡可能快地運行。
·如果沒有,請嘗試使用MEMORY_ONLY_SER並選擇快速序列化庫,以使對象更節省空間,但仍然可以快速訪問。
·(Java和Scala)
·除非計算數據集的函數很昂貴,否則它們不會溢出到磁盤,或者它們會過濾大量數據。
·否則,重新計算分區可能與從磁盤讀取分區一樣快。
·如果要快速故障恢復,請使用復制的存儲級別(例如,如果使用Spark來處理來自Web應用程序的請求)。
·所有存儲級別通過重新計算丟失的數據提供完全容錯,但復制的存儲級別允許您繼續在RDD上運行任務,而無需等待重新計算丟失的分區。

Removing Data(刪除數據)

·Spark會自動監視每個節點上的緩存使用情況,並以最近最少使用(LRU)的方式刪除舊數據分區。
·如果您想手動刪除RDD而不是等待它退出緩存,請使用RDD.unpersist()方法。

Shared Variables(共享變量)

·通常,當在遠程集群節點上執行傳遞給Spark操作(例如map或reduce)的函數時,它將在函數中使用的所有變量的單獨副本上工作。
·這些變量將復制到每台計算機,並且遠程計算機上的變量的更新不會傳播回驅動程序。
·支持跨任務的通用,讀寫共享變量效率低下。
·但是,Spark確實為兩種常見的使用模式提供了兩種有限類型的共享變量:廣播變量和累加器。

Broadcast Variables(廣播變量)

·廣播變量允許程序員在每台機器上保留一個只讀變量,而不是隨副本一起發送它的副本。
·例如,它們可用於以有效的方式為每個節點提供大輸入數據集的副本。
·Spark還嘗試使用有效的廣播算法來分發廣播變量,以降低通信成本。
·Spark動作通過一組階段執行,由分布式“shuffle”操作分隔。
·Spark自動廣播每個階段中任務所需的公共數據。
·以這種方式廣播的數據以序列化形式緩存並在運行每個任務之前反序列化。
·這意味着顯式創建廣播變量僅在跨多個階段的任務需要相同數據或以反序列化形式緩存數據很重要時才有用。
·通過調用SparkContext.broadcast(v)從變量v創建廣播變量。
·廣播變量是v的包裝器,可以通過調用value方法訪問其值。
·下面的代碼顯示了這個:
>>> broadcastVar = sc.broadcast([1, 2, 3]) <pyspark.broadcast.Broadcast object at 0x102789f10> >>> broadcastVar.value [1, 2, 3]
·創建廣播變量后,應該在群集上運行的任何函數中使用它而不是值v,這樣v不會多次傳送到節點。
·另外,在廣播之后不應修改對象v,以便確保所有節點獲得廣播變量的相同值(例如,如果稍后將變量發送到新節點)。

Accumulators(累加器)

·累加器是僅通過關聯和交換操作“添加”的變量,因此可以並行有效地支持。
·它們可用於實現計數器(如MapReduce)或總和。
·Spark本身支持數值類型的累加器,程序員可以添加對新類型的支持。
·作為用戶,您可以創建命名或未命名的累加器。
·如下圖所示,命名累加器(在此實例計數器中)將顯示在Web UI中,用於修改該累加器的階段。
·Spark顯示“任務”表中任務修改的每個累加器的值。
跟蹤UI中的累加器對於理解運行階段的進度非常有用(注意:Python中尚不支持)。
·通過調用SparkContext.accumulator(v)從初始值v創建累加器。
·然后,可以使用add方法或+ =運算符將在群集上運行的任務添加到其中。
·但是,他們無法讀懂它的價值。
·只有驅動程序可以使用其value方法讀取累加器的值。
·下面的代碼顯示了一個累加器用於添加數組的元素:
>>> accum = sc.accumulator(0) >>> accum Accumulator<id=0, value=0> >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s >>> accum.value 10
·雖然此代碼使用Int類型的累加器的內置支持,但程序員也可以通過繼承AccumulatorParam來創建自己的類型。
·AccumulatorParam接口有兩種方法:零用於為數據類型提供“零值”,addInPlace用於將兩個值一起添加。
·例如,假設我們有一個表示數學向量的Vector類,我們可以寫:
class VectorAccumulatorParam(AccumulatorParam): def zero(self, initialValue): return Vector.zeros(initialValue.size) def addInPlace(self, v1, v2): v1 += v2 return v1 # Then, create an Accumulator of this type: vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
·對於僅在操作內執行的累加器更新,Spark保證每個任務對累加器的更新僅應用一次,即重新啟動的任務不會更新該值。
·在轉換中,用戶應該知道,如果重新執行任務或作業階段,則可以多次應用每個任務的更新。
·累加器不會改變Spark的惰性評估模型。
·如果在RDD上的操作中更新它們,則只有在RDD作為操作的一部分計算時才更新它們的值。
·因此,在像map()這樣的惰性轉換中進行累積器更新時,不能保證執行累加器更新。
·以下代碼片段演示了此屬性:
accum = sc.accumulator(0) def g(x): accum.add(x) return f(x) data.map(g) # Here, accum is still 0 because no actions have caused the `map` to be computed.

Deploying to a Cluster(部署到群集)

·應用程序提交指南介紹了如何將應用程序提交到群集。
·簡而言之,一旦將應用程序打包到JAR(用於Java / Scala)或一組.py或.zip文件(用於Python),bin / spark-submit腳本允許您將其提交給任何支持的集群管理器。

Launching Spark jobs from Java / Scala(從Java / Scala啟動Spark作業)

org.apache.spark.launcher包提供了使用簡單Java API將Spark作業作為子進程啟動的類。

Unit Testing(單元測試)

·Spark對任何流行的單元測試框架進行單元測試都很友好。
·只需在測試中創建一個SparkContext,主URL設置為local,運行您的操作,然后調用SparkContext.stop()將其拆除。
·確保在finally塊或測試框架的tearDown方法中停止上下文,因為Spark不支持在同一程序中同時運行的兩個上下文。

Where to Go from Here(從這往哪兒走)

You can see some example Spark programs on the Spark website. In addition, Spark includes several samples in the examples directory (Scala,JavaPythonR). You can run Java and Scala examples by passing the class name to Spark’s bin/run-example script; for instance:

./bin/run-example SparkPi

對於Python示例,請使用spark-submit代替:

./bin/spark-submit examples/src/main/python/pi.py

Spark SQL, DataFrames and Datasets Guide

·Spark SQL是用於結構化數據處理的Spark模塊。
·與基本的Spark RDD API不同,Spark SQL提供的接口為Spark提供了有關數據結構和正在執行的計算的更多信息。
·在內部,Spark SQL使用此額外信息來執行額外的優化。
·有幾種與Spark SQL交互的方法,包括SQL和Dataset API。
·在計算結果時,使用相同的執行引擎,與您用於表達計算的API /語言無關。
·這種統一意味着開發人員可以輕松地在不同的API之間來回切換,從而提供表達給定轉換的最自然的方式。
·此頁面上的所有示例都使用Spark分發中包含的示例數據,並且可以在spark-shell,pyspark shell或sparkR shell中運行。

SQL

·Spark SQL的一個用途是執行SQL查詢。
·Spark SQL還可用於從現有Hive安裝中讀取數據。
·有關如何配置此功能的更多信息,請參閱Hive Tables部分。
·從其他編程語言中運行SQL時,結果將作為數據集/數據框返回。
·您還可以使用命令行或JDBC / ODBC與SQL接口進行交互。

Datasets and DataFrames

·數據集是分布式數據集合。
·數據集是Spark 1.6中添加的一個新接口,它提供了RDD的優勢(強類型,使用強大的lambda函數的能力)和Spark SQL優化執行引擎的優點。
·數據集可以從JVM對象構造,然后使用功能轉換(map,flatMap,filter等)進行操作。
·數據集API在Scala和Java中可用。
·Python沒有對Dataset API的支持。
·但由於Python的動態特性,數據集API的許多好處已經可用(即您可以通過名稱自然地訪問行的字段row.columnName)。
·R的情況類似。
·DataFrame是一個組織成命名列的數據集。
·它在概念上等同於關系數據庫中的表或R / Python中的數據框,但在底層具有更豐富的優化。
·DataFrame可以從多種來源構建,例如:結構化數據文件,Hive中的表,外部數據庫或現有RDD。
·DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由行數據集表示。
·在Scala API中,DataFrame只是Dataset [Row]的類型別名。
·而在Java API中,用戶需要使用Dataset 來表示DataFrame。
·在本文檔中,我們經常將行的Scala / Java數據集稱為DataFrame。

Getting Started(入門)

Starting Point: SparkSession(起點:SparkSession)

·Spark中所有功能的入口點是SparkSession類。
·要創建基本的SparkSession,只需使用SparkSession.builder:
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .config("spark.some.config.option", "some-value") \ .getOrCreate() 
·在Spark repo中的“examples / src / main / python / sql / basic.py”中找到完整的示例代碼。
·Spark 2.0中的SparkSession為Hive功能提供內置支持,包括使用HiveQL編寫查詢,訪問Hive UDF以及從Hive表讀取數據的功能。
·要使用這些功能,您無需擁有現有的Hive設置。

Creating DataFrames(創建DataFrame)

·使用SparkSession,應用程序可以從現有RDD,Hive表或Spark數據源創建DataFrame。
·作為示例,以下內容基於JSON文件的內容創建DataFrame:
# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Untyped Dataset Operations (aka DataFrame Operations)無類型數據集操作(又名DataFrame操作)

·DataFrames為Scala,Java,Python和R中的結構化數據操作提供特定於域的語言。
·如上所述,在Spark 2.0中,DataFrames只是Scala和Java API中Rows的數據集。
·與“類型轉換”相比,這些操作也稱為“無類型轉換”,帶有強類型Scala / Java數據集。
·這里我們包括使用數據集進行結構化數據處理的一些基本示例:
·在Python中,可以通過屬性(df.age)或索引(df ['age'])訪問DataFrame的列。
·雖然前者便於交互式數據探索,但強烈建議用戶使用后一種形式,這是未來的證明,不會破壞也是DataFrame類屬性的列名。
# spark, df are from the previous example
# Print the schema in a tree format df.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Select only the "name" column df.select("name").show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ # Select everybody, but increment the age by 1 df.select(df['name'], df['age'] + 1).show() # +-------+---------+ # | name|(age + 1)| # +-------+---------+ # |Michael| null| # | Andy| 31| # | Justin| 20| # +-------+---------+ # Select people older than 21 df.filter(df['age'] > 21).show() # +---+----+ # |age|name| # +---+----+ # | 30|Andy| # +---+----+ # Count people by age df.groupBy("age").count().show() # +----+-----+ # | age|count| # +----+-----+ # | 19| 1| # |null| 1| # | 30| 1| # +----+-----+ 
·在Spark repo中的“examples / src / main / python / sql / basic.py”中找到完整的示例代碼。
·有關可在DataFrame上執行的操作類型的完整列表,請參閱API文檔。
·除了簡單的列引用和表達式之外,DataFrame還具有豐富的函數庫,包括字符串操作,日期算術,常見的數學運算等。
·完整列表可在DataFrame函數參考中找到。

Running SQL Queries Programmatically(以編程方式運行SQL查詢)

SparkSession上的sql函數使應用程序能夠以編程方式運行SQL查詢並將結果作為DataFrame返回。

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Global Temporary View(全球臨時觀點)

·Spark SQL中的臨時視圖是會話范圍的,如果創建它的會話終止,它將消失。
·如果您希望擁有一個在所有會話之間共享的臨時視圖並保持活動狀態,直到Spark應用程序終止,您可以創建一個全局臨時視圖。
·全局臨時視圖與系統保留的數據庫global_temp綁定,我們必須使用限定名稱來引用它,例如
·SELECT * FROM global_temp.view1。
# Register the DataFrame as a global temporary view
df.createGlobalTempView("people") # Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ # Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() # +----+-------+ # | age| name| # +----+-------+ # |null|Michael| # | 30| Andy| # | 19| Justin| # +----+-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Creating Datasets(創建數據集)

·數據集與RDD類似,但是,它們不使用Java序列化或Kryo,而是使用專用的編碼器來序列化對象以便通過網絡進行處理或傳輸。
·雖然編碼器和標准序列化都負責將對象轉換為字節,但編碼器是動態生成的代碼,並使用一種格式,允許Spark執行許多操作,如過濾,排序和散列,而無需將字節反序列化為對象。
case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala" in the Spark repo.

Interoperating with RDDs(與RDD互操作)

·Spark SQL支持兩種不同的方法將現有RDD轉換為數據集。
·第一種方法使用反射來推斷包含特定類型對象的RDD的模式。
·這種基於反射的方法可以提供更簡潔的代碼,並且在您編寫Spark應用程序時已經了解模式時可以很好地工作。
·創建數據集的第二種方法是通過編程接口,允許您構建模式,然后將其應用於現有RDD。
·雖然此方法更詳細,但它允許您在直到運行時才知道列及其類型時構造數據集。

Inferring the Schema Using Reflection(使用反射推斷模式)

·Spark SQL可以將Row對象的RDD轉換為DataFrame,從而推斷出數據類型。
·通過將鍵/值對列表作為kwargs傳遞給Row類來構造行。
·此列表的鍵定義表的列名稱,並通過對整個數據集進行采樣來推斷類型,類似於對JSON文件執行的推斷
from pyspark.sql import Row sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are Dataframe objects. # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`. teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect() for name in teenNames: print(name) # Name: Justin 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Programmatically Specifying the Schema(以編程方式指定架構)

·當無法提前定義kwargs字典時(例如,記錄結構以字符串形式編碼,或者文本數據集將被解析,字段將以不同方式為不同用戶進行投影),可以使用編程方式創建DataFrame
·三個步驟。
·從原始RDD創建元組或列表的RDD;
·創建由StructType表示的模式,該模式與步驟1中創建的RDD中的元組或列表的結構相匹配。
·通過SparkSession提供的createDataFrame方法將模式應用於RDD。

例如:

# Import data types
from pyspark.sql.types import * sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") parts = lines.map(lambda l: l.split(",")) # Each line is converted to a tuple. people = parts.map(lambda p: (p[0], p[1].strip())) # The schema is encoded in a string. schemaString = "name age" fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()] schema = StructType(fields) # Apply the schema to the RDD. schemaPeople = spark.createDataFrame(people, schema) # Creates a temporary view using the DataFrame schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. results = spark.sql("SELECT name FROM people") results.show() # +-------+ # | name| # +-------+ # |Michael| # | Andy| # | Justin| # +-------+ 
Find full example code at "examples/src/main/python/sql/basic.py" in the Spark repo.

Aggregations(聚合)

·內置的DataFrames函數提供常見的聚合,如count(),countDistinct(),avg(),max(),min()等。雖然這些函數是為DataFrames設計的,但Spark SQL也有類型安全的版本
·其中一些在Scala和Java中使用強類型數據集。
·此外,用戶不限於預定義的聚合函數,並且可以創建自己的聚合函數。

Untyped User-Defined Aggregate Functions(無用戶定義的聚合函數)

·用戶必須擴展UserDefinedAggregateFunction抽象類以實現自定義無類型聚合函數。
·例如,用戶定義的平均值可能如下所示:
import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ object MyAverage extends UserDefinedAggregateFunction { // Data types of input arguments of this aggregate function def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // Data types of values in the aggregation buffer def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // The data type of the returned value def dataType: DataType = DoubleType // Whether this function always returns the same output on the identical input def deterministic: Boolean = true // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides // the opportunity to update its values. Note that arrays and maps inside the buffer are still // immutable. def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L buffer(1) = 0L } // Updates the given aggregation buffer `buffer` with new input data from `input` def update(buffer: MutableAggregationBuffer, input: Row): Unit = { if (!input.isNullAt(0)) { buffer(0) = buffer.getLong(0) + input.getLong(0) buffer(1) = buffer.getLong(1) + 1 } } // Merges two aggregation buffers and stores the updated buffer values back to `buffer1` def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // Calculates the final result def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // Register the function to access it spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees") df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees") result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedUntypedAggregation.scala" in the Spark repo.

Type-Safe User-Defined Aggregate Functions(類型安全的用戶定義聚合函數)

·強類型數據集的用戶定義聚合圍繞Aggregator抽象類。
·例如,類型安全的用戶定義平均值可能如下所示:
import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import org.apache.spark.sql.expressions.Aggregator case class Employee(name: String, salary: Long) case class Average(var sum: Long, var count: Long) object MyAverage extends Aggregator[Employee, Average, Double] { // A zero value for this aggregation. Should satisfy the property that any b + zero = b def zero: Average = Average(0L, 0L) // Combine two values to produce a new value. For performance, the function may modify `buffer` // and return it instead of constructing a new object def reduce(buffer: Average, employee: Employee): Average = { buffer.sum += employee.salary buffer.count += 1 buffer } // Merge two intermediate values def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum b1.count += b2.count b1 } // Transform the output of the reduction def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count // Specifies the Encoder for the intermediate value type def bufferEncoder: Encoder[Average] = Encoders.product // Specifies the Encoder for the final output value type def outputEncoder: Encoder[Double] = Encoders.scalaDouble } val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee] ds.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ // Convert the function to a `TypedColumn` and give it a name val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary) result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+ 
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/UserDefinedTypedAggregation.scala" in the Spark repo.

Data Sources(數據源)

·Spark SQL支持通過DataFrame接口對各種數據源進行操作。
·DataFrame可以使用關系轉換進行操作,也可以用於創建臨時視圖。
·將DataFrame注冊為臨時視圖允許您對其數據運行SQL查詢。
·本節介紹使用Spark數據源加載和保存數據的一般方法,然后介紹可用於內置數據源的特定選項。

Generic Load/Save Functions(通用加載/保存功能)

在最簡單的形式中,默認數據源(parquet除非另外由spark.sql.sources.default配置)將用於所有操作。

df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Manually Specifying Options(手動指定選項)

·您還可以手動指定將要使用的數據源以及要傳遞給數據源的任何其他選項。
·數據源由其完全限定名稱(即org.apache.spark.sql.parquet)指定,但對於內置源,您還可以使用其短名稱(json,parquet,jdbc,orc,libsvm,csv,text
·)。
·從任何數據源類型加載的DataFrame都可以使用此語法轉換為其他類型。

要加載JSON文件,您可以使用:

df = spark.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

要加載CSV文件,您可以使用:

df = spark.read.load("examples/src/main/resources/people.csv", format="csv", sep=":", inferSchema="true", header="true") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
·在寫操作期間也使用額外選項。
·例如,您可以控制ORC數據源的bloom過濾器和字典編碼。
·以下ORC示例將在favorite_color上創建bloom過濾器,並對name和favorite_color使用字典編碼。
·對於Parquet,也存在parquet.enable.dictionary。
·要查找有關額外ORC / Parquet選項的更多詳細信息,請訪問官方Apache ORC / Parquet網站。
df = spark.read.orc("examples/src/main/resources/users.orc") (df.write.format("orc") .option("orc.bloom.filter.columns", "favorite_color") .option("orc.dictionary.key.threshold", "1.0") .save("users_with_options.orc")) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Run SQL on files directly(直接在文件上運行SQL)

可以直接使用SQL查詢該文件,而不是使用讀取API將文件加載到DataFrame並進行查詢

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Save Modes(保存模式)

·保存操作可以選擇使用SaveMode,它指定如何處理現有數據(如果存在)。
·重要的是要意識到這些保存模式不使用任何鎖定並且不是原子的。
·此外,執行覆蓋時,將在寫出新數據之前刪除數據。
Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error" or "errorifexists"(default) 將DataFrame保存到數據源時,如果數據已存在,則會引發異常。
SaveMode.Append "append" 將DataFrame保存到數據源時,如果數據/表已存在,則DataFrame的內容應附加到現有數據。
SaveMode.Overwrite "overwrite" 覆蓋模式意味着在將DataFrame保存到數據源時,如果數據/表已經存在,則預期現有數據將被DataFrame的內容覆蓋。
SaveMode.Ignore "ignore" Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected not to save the contents of the DataFrame and not to change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Saving to Persistent Tables(保存到持久表)

·也可以使用saveAsTable命令將DataFrames作為持久表保存到Hive Metastore中。
·請注意,使用此功能不需要現有的Hive部署。
·Spark將為您創建默認的本地Hive Metastore(使用Derby)。
·與createOrReplaceTempView命令不同,saveAsTable將實現DataFrame的內容並創建指向Hive Metastore中數據的指針。
·只要您保持與同一Metastore的連接,即使您的Spark程序重新啟動后,持久表仍然存在。
·可以通過使用表的名稱調用SparkSession上的table方法來創建持久表的DataFrame。
·對於基於文件的數據源,例如
·text,parquet,json等您可以通過路徑選項指定自定義表路徑,例如
·df.write.option(“path”,“/ some / path”).saveAsTable(“t”)。
·刪除表時,將不會刪除自定義表路徑,並且表數據仍然存在。
·如果未指定自定義表路徑,則Spark會將數據寫入倉庫目錄下的默認表路徑。
·刪除表時,也將刪除默認表路徑。
·從Spark 2.1開始,持久數據源表將每個分區元數據存儲在Hive Metastore中。
·這帶來了幾個好處:
·由於Metastore只能返回查詢所需的分區,因此不再需要在表的第一個查詢中發現所有分區。
·現在,對於使用Datasource API創建的表,可以使用ALTER TABLE PARTITION ... SET LOCATION等Hive DDL。
·請注意,在創建外部數據源表(具有路徑選項的表)時,默認情況下不會收集分區信息。
·要同步Metastore中的分區信息,可以調用MSCK REPAIR TABLE。

Bucketing, Sorting and Partitioning

·對於基於文件的數據源,還可以對輸出進行存儲和排序或分區。
·分段和排序僅適用於持久表:
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed") 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

雖然分區可以在使用數據集API時與save和saveAsTable一起使用。

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet") 
·在Spark repo中的“examples / src / main / python / sql / datasource.py”中找到完整的示例代碼。
·雖然分區可以在使用數據集API時與save和saveAsTable一起使用。
df = spark.read.parquet("examples/src/main/resources/users.parquet") (df .write .partitionBy("favorite_color") .bucketBy(42, "name") .saveAsTable("people_partitioned_bucketed")) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
·partitionBy創建一個目錄結構,如分區發現部分所述。
·因此,它對具有高基數的列的適用性有限。
·相比之下,bucketBy可以在固定數量的存儲桶中分配數據,並且可以在許多唯一值無限制時使用。

Parquet Files(Parquet文件)

·Parquet是一種柱狀格式,許多其他數據處理系統都支持它。
·Spark SQL支持讀取和寫入Parquet文件,這些文件自動保留原始數據的模式。
·在編寫Parquet文件時,出於兼容性原因,所有列都會自動轉換為可為空。

Loading Data Programmatically(以編程方式加載數據)

使用上面示例中的數據:

peopleDF = spark.read.json("examples/src/main/resources/people.json") # DataFrames can be saved as Parquet files, maintaining the schema information. peopleDF.write.parquet("people.parquet") # Read in the Parquet file created above. # Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. parquetFile = spark.read.parquet("people.parquet") # Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile") teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.show() # +------+ # | name| # +------+ # |Justin| # +------+ 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Partition Discovery(分區發現)

·表分區是Hive等系統中常用的優化方法。
·在分區表中,數據通常存儲在不同的目錄中,分區列值在每個分區目錄的路徑中編碼。
·所有內置文件源(包括Text / CSV / JSON / ORC / Parquet)都能夠自動發現和推斷分區信息。
·例如,我們可以使用以下目錄結構將所有以前使用的填充數據存儲到分區表中,並將兩個額外的列(性別和國家/地區)作為分區列:
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
·通過將path / to / table傳遞給SparkSession.read.parquet或SparkSession.read.load,Spark SQL將自動從路徑中提取分區信息。
·現在返回的DataFrame的架構變為:
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
·請注意,分區列的數據類型是自動推斷的。
·目前,支持數字數據類型,日期,時間戳和字符串類型。
·有時,用戶可能不希望自動推斷分區列的數據類型。
·對於這些用例,可以通過spark.sql.sources.partitionColumnTypeInference.enabled配置自動類型推斷,默認為true。
·禁用類型推斷時,字符串類型將用於分區列。
·從Spark 1.6.0開始,分區發現默認只查找給定路徑下的分區。
·對於上面的示例,如果用戶將path / to / table / gender = male傳遞給SparkSession.read.parquet或SparkSession.read.load,則不會將性別視為分區列。
·如果用戶需要指定分區發現應該開始的基本路徑,則可以在數據源選項中設置basePath。
·例如,當path / to / table / gender = male是數據的路徑並且用戶將basePath設置為path / to / table /時,gender將是分區列。

Schema Merging(架構合並)

·與Protocol Buffer,Avro和Thrift一樣,Parquet也支持模式演變。
·用戶可以從簡單模式開始,並根據需要逐漸向模式添加更多列。
·通過這種方式,用戶可能最終得到具有不同但相互兼容的模式的多個Parquet文件。
·Parquet數據源現在能夠自動檢測這種情況並合並所有這些文件的模式。
·由於模式合並是一項相對昂貴的操作,並且在大多數情況下不是必需的,因此我們默認從1.5.0開始關閉它。
·您可以啟用它
·在讀取Parquet文件時將數據源選項mergeSchema設置為true(如下面的示例所示),或
·將全局SQL選項spark.sql.parquet.mergeSchema設置為true。
from pyspark.sql import Row # spark is from the previous example. # Create a simple DataFrame, stored into a partition directory sc = spark.sparkContext squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6)) .map(lambda i: Row(single=i, double=i ** 2))) squaresDF.write.parquet("data/test_table/key=1") # Create another DataFrame in a new partition directory, # adding a new column and dropping an existing column cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11)) .map(lambda i: Row(single=i, triple=i ** 3))) cubesDF.write.parquet("data/test_table/key=2") # Read the partitioned table mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table") mergedDF.printSchema() # The final schema consists of all 3 columns in the Parquet files together # with the partitioning column appeared in the partition directory paths. # root # |-- double: long (nullable = true) # |-- single: long (nullable = true) # |-- triple: long (nullable = true) # |-- key: integer (nullable = true) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Hive metastore Parquet table conversion(Hive Metastore Parquet表轉換)

·在讀取和寫入Hive Metastore Parquet表時,Spark SQL將嘗試使用自己的Parquet支持而不是Hive SerDe來獲得更好的性能。
·此行為由spark.sql.hive.convertMetastoreParquet配置控制,默認情況下處於打開狀態。

Hive/Parquet Schema Reconciliation

·從表模式處理的角度來看,Hive和Parquet之間存在兩個主要區別。
·Hive不區分大小寫,而Parquet則不區分大小寫
·Hive認為所有列都可以為空,而P​​arquet中的可空性很重要
·由於這個原因,在將Hive Metastore Parquet表轉換為Spark SQL Parquet表時,我們必須將Hive Metastore模式與Parquet模式進行協調。
·對帳規則是:
·兩個模式中具有相同名稱的字段必須具有相同的數據類型,而不管是否為空。
·協調字段應具有Parquet端的數據類型,以便遵循可為空性。
·協調的模式恰好包含Hive Metastore模式中定義的那些字段。
·僅出現在Parquet模式中的任何字段都將放入已協調的模式中。
·僅出現在Hive Metastore模式中的任何字段都將在協調模式中添加為可空字段。

Metadata Refreshing(元數據刷新)

·Spark SQL緩存Parquet元數據以獲得更好的性能。
·啟用Hive Metastore Parquet表轉換后,還會緩存這些轉換表的元數據。
·如果這些表由Hive或其他外部工具更新,則需要手動刷新它們以確保元數據一致。
# spark is an existing SparkSession spark.catalog.refreshTable("my_table")

Configuration(構造)

可以使用SparkSession上的setConf方法或使用SQL運行SET key = value命令來完成Parquet的配置。

Property Name Default Meaning
spark.sql.parquet.binaryAsString false
·其他一些Parquet生成系統,特別是Impala,Hive和舊版本的Spark SQL,在寫出Parquet模式時不區分二進制數據和字符串。
·此標志告訴Spark SQL將二進制數據解釋為字符串,以提供與這些系統的兼容性。
spark.sql.parquet.int96AsTimestamp true
·一些Parquet生產系統,特別是Impala和Hive,將時間戳存儲到INT96中。
·此標志告訴Spark SQL將INT96數據解釋為時間戳,以提供與這些系統的兼容性。
spark.sql.parquet.compression.codec snappy
·設置編寫Parquet文件時使用的壓縮編解碼器。
·如果在特定於表的選項/屬性中指定了“compression”或“parquet.compression”,則優先級為“compression”,“parquet.compression”,“spark.sql.parquet.compression.codec”。
·可接受的值包括:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。
·請注意,`zstd`需要在Hadoop 2.9.0之前安裝`ZStandardCodec`,`brotli`需要安裝`BrotliCodec`。
spark.sql.parquet.filterPushdown true 設置為true時啟用Parquet過濾器下推優化。
spark.sql.hive.convertMetastoreParquet true 設置為false時,Spark SQL將使用Hive SerDe作為鑲木桌而不是內置支持。
spark.sql.parquet.mergeSchema false

如果為true,則Parquet數據源合並從所有數據文件收集的模式,否則,如果沒有可用的摘要文件,則從摘要文件或隨機數據文件中選取模式。

spark.sql.parquet.writeLegacyFormat false
·如果為true,則數據將以Spark 1.4及更早版本的方式寫入。
·例如,十進制值將以Apache Parquet的固定長度字節數組格式寫入,其他系統(如Apache Hive和Apache Impala)也使用該格式。
·如果為false,將使用Parquet中的較新格式。
·例如,小數將以基於int的格式寫入。
·如果Parquet輸出旨在用於不支持此較新格式的系統,請設置為true。

JSON Files(JSON文件)

·Spark SQL可以自動推斷JSON數據集的架構並將其作為DataFrame加載。
·可以使用JSON文件上的SparkSession.read.json完成此轉換。
·請注意,作為json文件提供的文件不是典型的JSON文件。
·每行必須包含一個單獨的,自包含的有效JSON對象。
·有關更多信息,請參閱JSON Lines文本格式,也稱為換行符分隔的JSON。
·對於常規多行JSON文件,請將multiLine參數設置為True。
# spark is from the previous example.
sc = spark.sparkContext # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files path = "examples/src/main/resources/people.json" peopleDF = spark.read.json(path) # The inferred schema can be visualized using the printSchema() method peopleDF.printSchema() # root # |-- age: long (nullable = true) # |-- name: string (nullable = true) # Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") # SQL statements can be run by using the sql methods provided by spark teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19") teenagerNamesDF.show() # +------+ # | name| # +------+ # |Justin| # +------+ # Alternatively, a DataFrame can be created for a JSON dataset represented by # an RDD[String] storing one JSON object per string jsonStrings = ['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}'] otherPeopleRDD = sc.parallelize(jsonStrings) otherPeople = spark.read.json(otherPeopleRDD) otherPeople.show() # +---------------+----+ # | address|name| # +---------------+----+ # |[Columbus,Ohio]| Yin| # +---------------+----+ 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Hive Tables(Hive表)

·Spark SQL還支持讀取和寫入存儲在Apache Hive中的數據。
·但是,由於Hive具有大量依賴項,因此這些依賴項不包含在默認的Spark分發中。
·如果可以在類路徑上找到Hive依賴項,Spark將自動加載它們。
·請注意,這些Hive依賴項也必須存在於所有工作節點上,因為它們需要訪問Hive序列化和反序列化庫(SerDes)才能訪問存儲在Hive中的數據。
·通過在conf /中放置hive-site.xml,core-site.xml(用於安全性配置)和hdfs-site.xml(用於HDFS配置)文件來完成Hive的配置。
·使用Hive時,必須使用Hive支持實例化SparkSession,包括連接到持久性Hive Metastore,支持Hive serdes和Hive用戶定義函數。
·沒有現有Hive部署的用戶仍可以啟用Hive支持。
·當未由hive-site.xml配置時,上下文會自動在當前目錄中創建metastore_db,並創建一個由spark.sql.warehouse.dir配置的目錄,該目錄默認為Spark應用程序當前目錄中的目錄spark-warehouse
·開始了。
·請注意,自Spark 2.0.0起,不推薦使用hive-site.xml中的hive.metastore.warehouse.dir屬性。
·而是使用spark.sql.warehouse.dir指定倉庫中數據庫的默認位置。
·您可能需要向啟動Spark應用程序的用戶授予寫入權限。
from os.path import expanduser, join, abspath from pyspark.sql import SparkSession from pyspark.sql import Row # warehouse_location points to the default location for managed databases and tables warehouse_location = abspath('spark-warehouse') spark = SparkSession \ .builder \ .appName("Python Spark SQL Hive integration example") \ .config("spark.sql.warehouse.dir", warehouse_location) \ .enableHiveSupport() \ .getOrCreate() # spark is an existing SparkSession spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show() # +---+-------+ # |key| value| # +---+-------+ # |238|val_238| # | 86| val_86| # |311|val_311| # ... # Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show() # +--------+ # |count(1)| # +--------+ # | 500 | # +--------+ # The results of SQL queries are themselves DataFrames and support all normal functions. sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key") # The items in DataFrames are of type Row, which allows you to access each column by ordinal. stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value)) for record in stringsDS.collect(): print(record) # Key: 0, Value: val_0 # Key: 0, Value: val_0 # Key: 0, Value: val_0 # ... # You can also use DataFrames to create temporary views within a SparkSession. Record = Row("key", "value") recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)]) recordsDF.createOrReplaceTempView("records") # Queries can then join DataFrame data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show() # +---+------+---+------+ # |key| value|key| value| # +---+------+---+------+ # | 2| val_2| 2| val_2| # | 4| val_4| 4| val_4| # | 5| val_5| 5| val_5| # ... 
Find full example code at "examples/src/main/python/sql/hive.py" in the Spark repo.

Specifying storage format for Hive tables(指定Hive表的存儲格式)

·創建Hive表時,需要定義此表應如何從/向文件系統讀取/寫入數據,即“輸入格式”和“輸出格式”。
·您還需要定義此表如何將數據反序列化為行,或將行序列化為數據,即“serde”。
·以下選項可用於指定存儲格式(“serde”,“輸入格式”,“輸出格式”),例如,
·CREATE TABLE src(id int)使用配置單元選項(fileFormat'planra')。
·默認情況下,我們將表文件作為純文本讀取。
·請注意,創建表時尚不支持Hive存儲處理程序,您可以使用Hive端的存儲處理程序創建表,並使用Spark SQL讀取它
Property Name Meaning
fileFormat
·fileFormat是一種存儲格式規范包,包括“serde”,“input format”和“output format”。
·目前我們支持6種fileFormats:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat
·這兩個選項將相應的`InputFormat`和`OutputFormat`類的名稱指定為字符串文字,例如
·`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。
·這兩個選項必須出現在pair中,如果已經指定了`fileFormat`選項,則無法指定它們。
serde
·此選項指定serde類的名稱。
·當指定`fileFormat`選項時,如果給定的`fileFormat`已經包含serde的信息,則不要指定此選項。
·目前“sequencefile”,“textfile”和“rcfile”不包含serde信息,您可以將此選項與這3個fileFormats一起使用。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
·這些選項只能與“textfile”fileFormat一起使用。
·它們定義了如何將分隔文件讀入行。

使用OPTIONS定義的所有其他屬性將被視為Hive serde屬性。

Interacting with Different Versions of Hive Metastore(與不同版本的Hive Metastore交互)

·Spark SQL的Hive支持最重要的部分之一是與Hive Metastore的交互,這使得Spark SQL能夠訪問Hive表的元數據。
·從Spark 1.4.0開始,可以使用單個二進制構建的Spark SQL來查詢不同版本的Hive Metastores,使用下面描述的配置。
·請注意,獨立於用於與Metastore通信的Hive版本,內部Spark SQL將針對Hive 1.2.1進行編譯,並使用這些類進行內部執行(serdes,UDF,UDAF等)。
·以下選項可用於配置用於檢索元數據的Hive版本:
Property Name Default Meaning
spark.sql.hive.metastore.version 1.2.1 Version of the Hive metastore. Available options are 0.12.0 through 2.3.3.
spark.sql.hive.metastore.jars builtin Location of the jars that should be used to instantiate the HiveMetastoreClient. This property can be one of three options:
    1. builtin
    2. Use Hive 1.2.1, which is bundled with the Spark assembly when 
-Phive
       is enabled. When this option is chosen, 
spark.sql.hive.metastore.version
      must be either 
1.2.1
     or not defined.
  1. maven
  2. Use Hive jars of specified version downloaded from Maven repositories. This configuration is not generally recommended for production deployments.
  3. A classpath in the standard format for the JVM. This classpath must include all of Hive and its dependencies, including the correct version of Hadoop. These jars only need to be present on the driver, but if you are running in yarn cluster mode then you must ensure they are packaged with your application.
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

A comma-separated list of class prefixes that should be loaded using the classloader that is shared between Spark SQL and a specific version of Hive. An example of classes that should be shared is JDBC drivers that are needed to talk to the metastore. Other classes that need to be shared are those that interact with classes that are already shared. For example, custom appenders that are used by log4j.

spark.sql.hive.metastore.barrierPrefixes (empty)

A comma separated list of class prefixes that should explicitly be reloaded for each version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are declared in a prefix that typically would be shared (i.e. org.apache.spark.*).

JDBC To Other Databases(JDBC到其他數據庫)

·Spark SQL還包括一個可以使用JDBC從其他數據庫讀取數據的數據源。
·與使用JdbcRDD相比,此功能應該更受歡迎。
·這是因為結果作為DataFrame返回,可以在Spark SQL中輕松處理,也可以與其他數據源連接。
·JDBC數據源也更易於使用Java或Python,因為它不需要用戶提供ClassTag。
·(請注意,這與Spark SQL JDBC服務器不同,后者允許其他應用程序使用Spark SQL運行查詢)。
·首先,您需要在spark類路徑中包含特定數據庫的JDBC驅動程序。
·例如,要從Spark Shell連接到postgres,您將運行以下命令:
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
·可以使用Data Sources API將遠程數據庫中的表加載為DataFrame或Spark SQL臨時視圖。
·用戶可以在數據源選項中指定JDBC連接屬性。
·用戶和密碼通常作為登錄數據源的連接屬性提供。
·除連接屬性外,Spark還支持以下不區分大小寫的選項:
Property Name Meaning
url
·要連接的JDBC URL。
·可以在URL中指定特定於源的連接屬性。
·例如,jdbc:postgresql:// localhost / test?user = fred&password = secret
dbtable
·應該讀取或寫入的JDBC表。
·請注意,在讀取路徑中使用它時,可以使用在SQL查詢的FROM子句中有效的任何內容。
·例如,您也可以在括號中使用子查詢,而不是完整的表。
·不允許同時指定`dbtable`和`query`選項。
query
·將用於將數據讀入Spark的查詢。指定的查詢將括起來並用作FROM子句中的子查詢。Spark還會為子查詢子句分配別名。
·例如,spark將向JDBC Source發出以下形式的查詢

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

·使用此選項時,以下是一些限制。
·不允許同時指定`dbtable`和`query`選項。
·不允許同時指定`query`和`partitionColumn`選項。
·當需要指定`partitionColumn`選項時,可以使用`dbtable`選項指定子查詢,並且可以使用作為`dbtable`的一部分提供的子查詢別名來限定分區列。

范例:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("query", "select c1, c2 from t1")
.load()

driver 用於連接到此URL的JDBC驅動程序的類名
partitionColumn, lowerBound, upperBound
·如果指定了任何選項,則必須全部指定這些選項。此外,必須指定numPartitions。它們描述了在從多個工作者並行讀取時如何對表進行分區。partitionColumn必須是相關表中的數字,日期或時間戳列。
·請注意,lowerBound和upperBound僅用於決定分區步幅,而不是用於過濾表中的行。因此,表中的所有行都將被分區並返回。此選項僅適用於閱讀。
numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
queryTimeout The number of seconds the driver will wait for a Statement object to execute to the given number of seconds. Zero means there is no limit. In the write path, this option depends on how JDBC drivers implement the API setQueryTimeout, e.g., the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. It defaults to 0.
fetchsize The JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers which default to low fetch size (eg. Oracle with 10 rows). This option applies only to reading.
batchsize The JDBC batch size, which determines how many rows to insert per round trip. This can help performance on JDBC drivers. This option applies only to writing. It defaults to 1000.
isolationLevel The transaction isolation level, which applies to current connection. It can be one of NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC's Connection object, with default of READ_UNCOMMITTED. This option applies only to writing. Please refer the documentation in java.sql.Connection.
sessionInitStatement After each database session is opened to the remote DB and before starting to read data, this option executes a custom SQL statement (or a PL/SQL block). Use this to implement session initialization code. Example: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
cascadeTruncate This is a JDBC writer related option. If enabled and supported by the JDBC database (PostgreSQL and Oracle at the moment), this options allows execution of a TRUNCATE TABLE t CASCADE (in the case of PostgreSQL a TRUNCATE TABLE ONLY t CASCADE is executed to prevent inadvertently truncating descendant tables). This will affect other tables, and thus should be used with care. This option applies only to writing. It defaults to the default cascading truncate behaviour of the JDBC database in question, specified in the isCascadeTruncate in each JDBCDialect.
createTableOptions This is a JDBC writer related option. If specified, this option allows setting of database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB.). This option applies only to writing.
createTableColumnTypes The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing.
customSchema
·用於從JDBC連接器讀取數據的自定義架構。例如,“id DECIMAL(38,0),名稱為STRING”。您還可以指定部分字段,其他字段使用默認類型映射。例如,“id DECIMAL(38,0)”。列名應與JDBC表的相應列名相同。用戶可以指定Spark SQL的相應數據類型,而不是使用默認值。
·此選項僅適用於閱讀。
pushDownPredicate
·用於啟用或禁用謂詞下推到JDBC數據源的選項。默認值為true,在這種情況下,Spark會盡可能地將過濾器下推到JDBC數據源。否則,如果設置為false,則不會將過濾器下推到JDBC數據源,因此所有過濾器都將由Spark處理。當Spark通過比JDBC數據源更快地執行謂詞過濾時,謂詞下推通常會被關閉。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source jdbcDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .load() jdbcDF2 = spark.read \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying dataframe column data types on read jdbcDF3 = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .option("customSchema", "id DECIMAL(38, 0), name STRING") \ .load() # Saving data to a JDBC source jdbcDF.write \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .save() jdbcDF2.write \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write \ .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) 
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.

Apache Avro Data Source Guide(Apache Avro數據源指南)

自Spark 2.4發布以來,Spark SQL為讀取和編寫Apache Avro數據提供了內置支持。

Deploying(配置)

·spark-avro模塊是外置的,默認情況下不包含在spark-submit或spark-shell中。
·與任何Spark應用程序一樣,spark-submit用於啟動您的應用程序。
·spark-avro_2.12及其依賴項可以直接添加到spark-submit使用--packages,例如
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.2 ...

對於在spark-shell上進行試驗,您還可以使用--packages直接添加org.apache.spark:spark-avro_2.12及其依賴項

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.2 ...

有關提交具有外部依賴性的應用程序的詳細信息,請參閱“應用程序提交指南。

Load and Save Functions(加載和保存功能)

·由於spark-avro模塊是外部的,因此DataFrameReader或DataFrameWriter中沒有.avro API。
·要以Avro格式加載/保存數據,您需要將數據源選項格式指定為avro(或org.apache.spark.sql.avro)。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro") df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

to_avro() and from_avro()

·Avro軟件包提供了to_avro函數,可以將列編碼為Avro格式的二進制文件,from_avro()將Avro二進制數據解碼為列。
·兩個函數都將一列轉換為另一列,輸入/輸出SQL數據類型可以是復雜類型或基本類型。
·在讀取或寫入像Kafka這樣的流媒體源時,將Avro記錄用作列非常有用。
·每個Kafka鍵值記錄都會增加一些元數據,例如Kafka的攝取時間戳,Kafka的偏移量等。
·如果包含數據的“value”字段位於Avro中,則可以使用from_avro()提取數據,豐富數據,清理數據,然后再將其下游推送到Kafka或將其寫入文件。
·to_avro()可用於將結構轉換為Avro記錄。
·在將數據寫入Kafka時,如果要將多個列重新編碼為單個列,此方法特別有用。
·這兩個函數目前僅在Scala和Java中可用。
import org.apache.spark.sql.avro._ // `from_avro` requires Avro schema in JSON string format. val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc"))) val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .load() // 1. Decode the Avro data into a struct; // 2. Filter by column `favorite_color`; // 3. Encode the column `name` in Avro format. val output = df .select(from_avro('value, jsonFormatSchema) as 'user) .where("user.favorite_color == \"red\"") .select(to_avro($"user.name") as 'value) val query = output .writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "topic2") .start()

Data Source Option(數據源選項)

可以使用DataFrameReader或DataFrameWriter上的.option方法設置Avro的數據源選項。

Property Name Default Meaning Scope
avroSchema None Optional Avro schema provided by an user in JSON format. The date type and naming of record fields should match the input Avro data or Catalyst data, otherwise the read/write action will fail. read and write
recordName topLevelRecord Top level record name in write result, which is required in Avro spec. write
recordNamespace "" Record namespace in write result. write
ignoreExtension true The option controls ignoring of files without .avro extensions in read.
If the option is enabled, all files (with and without .avro extension) are loaded.
read
compression snappy The compression option allows to specify a compression codec used in write.
Currently supported codecs are uncompressedsnappydeflatebzip2 and xz.
If the option is not set, the configuration spark.sql.avro.compression.codec config is taken into account.
write

Configuration(構造)

可以使用SparkSession上的setConf方法或使用SQL運行SET key = value命令來完成Avro的配置。

Property Name Default Meaning
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true If it is set to true, the data source provider com.databricks.spark.avro is mapped to the built-in but external Avro data source module for backward compatibility.
spark.sql.avro.compression.codec snappy Compression codec used in writing of AVRO files. Supported codecs: uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.
spark.sql.avro.deflate.level -1 Compression level for the deflate codec used in writing of AVRO files. Valid value must be in the range of from 1 to 9 inclusive or -1. The default value is -1 which corresponds to 6 level in the current implementation.

Compatibility with Databricks spark-avro(與Databricks spark-avro的兼容性)

·此Avro數據源模塊最初來自Databricks的開源存儲庫spark-avro並與之兼容。
·默認情況下,啟用SQL配置spark.sql.legacy.replaceDatabricksSparkAvro.enabled,數據源提供程序com.databricks.spark.avro將映射到此內置Avro模塊。
·對於在目錄元庫中使用Provider屬性創建的Spark表作為com.databricks.spark.avro,如果您使用此內置Avro模塊,則映射對於加載這些表至關重要。
·請注意,在Databricks的spark-avro中,為快捷函數.avro()創建了隱式類AvroDataFrameWriter和AvroDataFrameReader。
·在這個內置但外部的模塊中,兩個隱式類都被刪除了。
·請改用DataFrameWriter或DataFrameReader中的.format(“avro”),它應該干凈且足夠好。
·如果您更喜歡使用自己構建的spark-avro jar文件,則只需禁用配置spark.sql.legacy.replaceDatabricksSparkAvro.enabled,並在部署應用程序時使用選項--jars。
·有關詳細信息,請閱讀“應用程序提交指南”中的“高級依賴關系管理”部分

Supported types for Avro -> Spark SQL conversion

目前,Spark支持在Avro記錄下讀取所有原始類型和復雜類型。

Avro type Spark SQL type
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union See below
·除了上面列出的類型,它還支持讀取聯合類型。
·以下三種類型被視為基本聯合類型:
·union(int,long)將映射到LongType。
·union(float,double)將映射到DoubleType。
·union(something,null),其中某些東西是任何支持的Avro類型。
·這將被映射到與某事物相同的Spark SQL類型,並將nullable設置為true。
·所有其他聯合類型都被認為是復雜的
·根據union的成員,它們將映射到StructType,其中字段名稱是member0,member1等。
·這與Avro和Parquet之間的轉換行為一致。
·它還支持讀取以下Avro邏輯類型:
Avro logical type Avro type Spark SQL type
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

目前,忽略了Avro文件中存在的文檔,別名和其他屬性。

Supported types for Spark SQL -> Avro conversion(支持的Spark SQL類型 - > Avro轉換)

·Spark支持將所有Spark SQL類型寫入Avro。
·對於大多數類型,從Spark類型到Avro類型的映射很簡單(例如,IntegerType轉換為int);
·但是,下面列出了一些特殊情況:
Spark SQL type Avro type Avro logical type
ByteType int  
ShortType int  
BinaryType bytes  
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal
您還可以使用選項avroSchema指定整個輸出Avro架構,以便可以將Spark SQL類型轉換為其他Avro類型。
·默認情況下不應用以下轉換,並且需要用戶指定的Avro架構:
Spark SQL type Avro type Avro logical type
BinaryType fixed  
StringType enum  
TimestampType long timestamp-millis
DecimalType bytes decimal

Performance Tuning(性能調優)

對於某些工作負載,可以通過在內存中緩存數據或打開一些實驗選項來提高性能。

Caching Data In Memory(在內存中緩存數據)

·Spark SQL可以通過調用spark.catalog.cacheTable(“tableName”)或dataFrame.cache()使用內存中的列式格式來緩存表。
·然后,Spark SQL將僅掃描所需的列,並自動調整壓縮以最小化內存使用和GC壓力。
·您可以調用spark.catalog.uncacheTable(“tableName”)從內存中刪除該表。
·可以使用SparkSession上的setConf方法或使用SQL運行SET key = value命令來完成內存中緩存的配置。
Property Name Default Meaning
spark.sql.inMemoryColumnarStorage.compressed true 設置為true時,Spark SQL將根據數據統計信息自動為每列選擇壓縮編解碼器
spark.sql.inMemoryColumnarStorage.batchSize 10000
·控制柱狀緩存的批次大小。
·較大的批處理大小可以提高內存利用率和壓縮率,但在緩存數據時存在OOM風險。

Other Configuration Options(其他配置選項)

·以下選項也可用於調整查詢執行的性能。
·由於更多優化會自動執行,因此在將來的版本中可能會棄用這些選項。
Property Name Default Meaning
spark.sql.files.maxPartitionBytes 134217728 (128 MB) 讀取文件時打包到單個分區的最大字節數。
spark.sql.files.openCostInBytes 4194304 (4 MB)
·可以在同一時間掃描通過字節數測量的打開文件的估計成本。
·將多個文件放入分區時使用。
·最好過度估計,然后使用較小文件的分區將比具有較大文件的分區(首先安排的分區)更快
spark.sql.broadcastTimeout 300

廣播連接中廣播等待時間的超時(以秒為單位)

spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB)
·配置在執行連接時將廣播到所有工作節點的表的最大大小(以字節為單位)。
·通過將此值設置為-1,可以禁用廣播。
·請注意,目前僅支持運行命令ANALYZE TABLE COMPUTE STATISTICS noscan的Hive Metastore表的統計信息。
spark.sql.shuffle.partitions 200 配置在為連接或聚合洗牌數據時要使用的分區數。

Broadcast Hint for SQL Queries(SQL查詢的廣播提示)

·BROADCAST提示指導Spark在將其與另一個表或視圖連接時廣播每個指定的表。
·當Spark決定連接方法時,廣播散列連接(即BHJ)是首選,即使統計信息高於配置spark.sql.autoBroadcastJoinThreshold。
·指定連接的兩端時,Spark會廣播具有較低統計信息的那一方。
·注意Spark並不保證始終選擇BHJ,因為並非所有情況(例如全外連接)都支持BHJ。
·當選擇廣播嵌套循環連接時,我們仍然尊重提示。
from pyspark.sql.functions import broadcast broadcast(spark.table("src")).join(spark.table("records"), "key").show()

Distributed SQL Engine(分布式SQL引擎)

·Spark SQL還可以使用其JDBC / ODBC或命令行界面充當分布式查詢引擎。
·在此模式下,最終用戶或應用程序可以直接與Spark SQL交互以運行SQL查詢,而無需編寫任何代碼

Running the Thrift JDBC/ODBC server(運行Thrift JDBC / ODBC服務器)

·此處實現的Thrift JDBC / ODBC服務器對應於Hive 1.2.1中的HiveServer2。
·您可以使用Spark或Hive 1.2.1附帶的beeline腳本測試JDBC服務器。
·要啟動JDBC / ODBC服務器,請在Spark目錄中運行以下命令:
./sbin/start-thriftserver.sh
·此腳本接受所有bin / spark-submit命令行選項,以及--hiveconf選項以指定Hive屬性。
·您可以運行./sbin/start-thriftserver.sh --help以獲取所有可用選項的完整列表。
·默認情況下,服務器偵聽localhost:10000。
·您可以通過任一環境變量覆蓋此行為,即:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...

或系統屬性:

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

現在您可以使用beeline來測試Thrift JDBC / ODBC服務器:

./bin/beeline

使用以下方式直接連接到JDBC / ODBC服務器:

beeline> !connect jdbc:hive2://localhost:10000
·Beeline會詢問您的用戶名和密碼。
·在非安全模式下,只需在您的計算機上輸入用戶名和空白密碼即可。
·對於安全模式,請按照直線文檔中的說明進行操作。
·通過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf /中來完成Hive的配置。
·您也可以使用Hive附帶的beeline腳本。
·Thrift JDBC服務器還支持通過HTTP傳輸發送thrift RPC消息。
·使用以下設置將HTTP模式作為系統屬性或在conf /中的hive-site.xml文件中啟用:
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

要進行測試,請使用beeline以http模式連接到JDBC / ODBC服務器:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

Running the Spark SQL CLI(運行Spark SQL CLI)

·Spark SQL CLI是一種方便的工具,可以在本地模式下運行Hive Metastore服務,並執行從命令行輸入的查詢。
·請注意,Spark SQL CLI無法與Thrift JDBC服務器通信。
·要啟動Spark SQL CLI,請在Spark目錄中運行以下命令:
./bin/spark-sql
·通過將hive-site.xml,core-site.xml和hdfs-site.xml文件放在conf /中來完成Hive的配置。
·您可以運行./bin/spark-sql --help以獲取所有可用選項的完整列表。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM