Spark本身用Scala語言編寫,運行於Java虛擬機(JVM)。只要在安裝了Java 6以上版本的便攜式計算機或者集群上都可以運行spark。如果您想使用Python API需要安裝Python解釋器(2.6或者更高版本),請注意Spark暫不支持Python 3。
下載Spark
首先下載Spark並解壓,我們從下載預編譯版本的Spark開始。在瀏覽器中訪問 http://spark.apache.org/down loads.html 選擇"Pre-built for Hadoop 2.4 and later"安裝包,點擊"Direct Download"下載名稱為spark-1.2.0-bin-hadoop2.4.tgz 的壓縮包。
Windows用戶安裝時可能會遇到文件夾名稱中包含空格的問題,建議Spark的安裝目錄的文件夾中不包含空格,比如C:\spark 。
您不需要安裝Hadoop即可運行Spark,但是如果您已有Hadoop集群或者HDFS則需要下載對應的Spark版本。您可在 http:// spark.apache.org/downloads.html 選擇不同的安裝包,這些安裝包的文件名會有所不同。也可以將Spark源碼重新編譯,您可在 Github 下載最新的Spark源代碼。
大多數Unix和Linux操作系統,包括Mac OS X,都包含tar命令行解壓工具。如果您的操作系統沒有安裝tar的命令行工具,請在互聯網搜索免費的解壓縮工具。比如在Windows系統中您可以使用7-Zip。
現在我們將已下載的Spark解壓縮,看看默認的Spark分布式。打開終端,切換至下載Spark的目錄下將其解壓縮。執行下面的代碼將創建一個與壓縮文件同名的新目錄。
cd ~
tar -xf spark-1.2.0-bin-hadoop2.4.tgz
cd spark-1.2.0-bin-hadoop2.4
ls
在包含tar的執行命令中,x表示解壓縮,f表示指定tar包名稱。ls 命令將列出Spark目錄下的所有文件。讓我們簡要介紹下Spark目錄中的重要文件。
README.md
包含Spark入門的簡要說明。
bin
包含與Spark交互的可執行文件(比如在本章后面介紹的Spark Shell)
core, streaming, python, …
包含Spark工程主要組件的源碼
examples
包含可在Spark單機版運行的作業,您可從中了解Spark API。
您不必對Spark工程中包含的如此多的目錄和文件所困擾,本書后續章節會涵蓋其中的大部分技術內容。現在,讓我們深入Spark的Python和Scala 交互式shell。我們將從運行Spark官方示例開始,然后編寫和運行自己的Spark作業。
本章中的Spark作業運行於單機模式,即在本地計算機運行的非分布式的模式。Spark可在不同模式不同環境中運行。除了單機模式,Spark還可運行於Mesos和YARN,以及Spark分布式下的獨立調度。我們將在第七章中詳細介紹各種部署模式。
在 HDInsight 中安裝Spark
We will use a Script Action custom script to install Spark on an HDInsight cluster. This script can install Spark 1.2.0 or Spark 1.0.2 depending on the version of the HDInsight cluster you provision.
-
If you use the script while provisioning an HDInsight 3.2 cluster, it installs Spark 1.2.0.
-
If you use the script while provisioning an HDInsight 3.1 cluster, it installs Spark 1.0.2.
You can modify this script or create your own script to install other versions of Spark.
A sample script to install Spark on an HDInsight cluster is available from a read-only Azure storage blob at https://hdiconfigactions.blob.core.windows.net/sparkconfigactionv03/spark-installer-v03.ps1. This section provides instructions on how to use the sample script while provisioning the cluster by using the Azure portal.
NOTE:
The sample script works only with HDInsight 3.1 and 3.2 clusters. For more information on HDInsight cluster versions, see HDInsight cluster versions.
-
Start provisioning a cluster by using the CUSTOM CREATE option, as described at Provisioning a cluster using custom options. Pick the cluster version depending on the following:
-
If you want to install Spark 1.2.0, provision an HDInsight 3.2 cluster.
-
If you want to install Spark 1.0.2, provision an HDInsight 3.1 cluster.
-
On the Script Actions page of the wizard, click add script action to provide details about the script action, as shown below:
roperty
Value
Name
Specify a name for the script action. For example, Install Spark.
Script URI
Specify the Uniform Resource Identifier (URI) to the script that is invoked to customize the cluster. For example, https://hdiconfigactions.blob.core.windows.net/sparkconfigactionv03/spark-installer-v03.ps1
Node Type
Specify the nodes on which the customization script is run. You can choose All nodes, Head nodes only, or Worker nodes only.
Parameters
Specify the parameters, if required by the script. The script to install Spark does not require any parameters so you can leave this blank.
You can add more than one script action to install multiple components on the cluster. After you have added the scripts, click the checkmark to start provisioning the cluster.
You can also use the script to install Spark on HDInsight by using Azure PowerShell or the HDInsight .NET SDK.
Spark的 Python 和 Scala 交互式Shell
Spark 的 交 互式shell支持可執行的數據分析。如果您使用其他的shell編程,那么您將會對Spark shell感覺很親切。比如R、Python和Scala shell,以及批處理的操作系統編程或者Windows命令提示符。
與其他的Shell編程只能操作單台計算機的磁盤和內存不同的是,Spark Shell支持跨多台計算機的分布式磁盤和內存計算,並且Spark會自動執行分布式作業處理。
因為Spark將數據加載至工作節點內存中,絕大多數分布式計算甚至處理TB級的數據也僅需幾秒鍾。這使得Spark適合處理迭代排序、隨機和未知分析。Spark的Python和Scala的shell均支持集群連接。
讓我們用一個簡單的數據分析的例子來感受一下spark shell的強大,按照Spark官方文檔的快速入門的步驟:
第一步是打開Spark交互式shell。若要打開Python版本的Spark shell,即PySpark shell,在Spark目錄中輸入如下指令:
bin/pyspark
(或者在Windows中輸入bin\pyspark)
打開Scala版本的shell,輸入:
bin/spark-shell
shell提示符應在幾秒鍾后出現。當shell啟動時,您會注意到有大量的日志消息提示。您可按下Enter鍵清除日志輸出,圖2-1顯示的是打開PySpark shell的顯示界面。
圖 2-1 PySpark shell的默認日志輸出
在shell中您可以看到打印的日志信息,您也可以控制日志的詳細程度。在conf 目錄中創建名稱為log4j.properties 的文件,Spark提供了該文件的模板log4j.properties.template 。若不需要輸出那么冗長的日志,您可以復制該模板並將其改名為log4j.properties,在模板的復制文件中找到下面的代碼:
log4j.rootCategory=INFO, console
降低日志的級別只顯示警告信息,將上面的代碼修改如下:
log4j.rootCategory=WARN, console
重新打開shell,您可以看見輸出信息減少了。
圖2-2. PySpark shell 輸出信息減少
使用IPython
IPython是頗受python使用者追捧的增強版Python shell,提供諸如tab鍵完成功能。更多信息請查看 http://ipython.org 。將 IPYTHON的環境變量設置為1即可在Spark中使用IPython。
IPYTHON=1 ./bin/pyspark
To use the IPython Notebook, which is a web-browser-based version of IPython, use:
若要使用基於瀏覽器的IPython Notebook,請使用如下指令:
IPYTHON_OPTS="notebook" ./bin/pyspark
在Windows中設置變量的方法如下:
set IPYTHON=1 bin\pyspark
在Spark中我們通過操作集群的分布式集合進行自動化並行計算,這些集合被稱為彈性分布式數據集,或者RDDs。RDDs是Spark做分布式數據和計算的基礎抽象。
在我們說更多的RDD之前,讓我們創建一個shell程序讀取本地文本文件並計算簡單的特定分析。下面的示例2-1是Python語音,示例2-2是Scala語言。
示例2-1. Python line count
>>> lines = sc.textFile("README.md") # Create an RDD called lines
>>> lines.count() # Count the number of items in this RDD
127
>>> lines.first() # First item in this RDD, i.e. first line of README.md u'# Apache Spark'
示例2-2. Scala line count
scala> val lines = sc.textFile("README.md") // Create an RDD called lines lines: spark.RDD[String] = MappedRDD[...]
scala> lines.count() // Count the number of items in this RDD res0: Long = 127
scala> lines.first() // First item in this RDD, i.e. first line of README.md res1: String = # Apache Spark
若要退出shell,按下Ctrl-D。
您會注意到一條信息: INFO SparkUI: Started SparkUI at http://[ipaddress]:4040 。您可以通過此Spark UI看見更多任務和集群的信息。
在示例2-1和2-2中,變量 lines 為RDD,它是在本地機器中讀取文本文件后被創建的。我們可以對此RDD運行各種並行操作,比如在數據集(這里指文件中文本的行數)中統計元素的數量,或者打印元素。在后面的章節中我們將深入討論RDD,在這個之前我們花點時間介紹Spark的基本概念。
Spark核心概念
現在您已經在shell中運行了第一個Spark代碼,是時候開始學習更深入的編程了。
每一個Spark應用程序都包含一個在集群上運行各種並行操作的驅動程序,驅動程序包含應用程序的主函數和定義在集群上的分布式數據集。在前面的示例中,驅動程序就是Spark shell本身,您只需輸入您想要執行的操作即可。
驅動程序通過一個鏈接到計算集群上的 SparkContext 對象訪問Spark計算集群,在shell中,SparkContext被自動創建為名稱是sc的變量,在示例2-3中我們輸入sc,則shell顯示其類型。
Example 2-3. Examining the sc variable
>>> sc
<pyspark.context.SparkContext object at 0x1025b8f90>
在創建了SparkContext對象之后,您就可創建RDD了。在示例2-1和示例2-2中,我們調用 sc.textFile() 創建一個代表文件中文本行數的RDD。然后,我們就可以在這些行上進行各種操作,例如count()
若要運行這些操作,驅動程序通常管理者多個擁有 executor的工作節點。比如,我們在集群中執行count()操作,不同的機器可能計算lines變量不同的部分。我們只在本地運行Spark shell,則它被執行在單機中,如果我們將shell連接至集群它也可並行的分析數據。示例2-3展示了Spark如何在集群上執行。
圖2-3. Components for distributed execution in Spark
Spark 的 API 很大程度上依靠在驅動程序里傳遞函數到集群上運行。比如,我們擴展上面的README示例,篩選文本中包含的特定關鍵詞"Python"的行,代碼如示例2-4(Python),示例2-5(Scala)。
示例2-4 Python filtering example
>>> lines = sc.textFile("README.md")
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>> pythonLines.first() u'## Interactive Python Shell'
Example 2-5. Scala filtering example
scala> val lines = sc.textFile("README.md") // Create an RDD called lines lines: spark.RDD[String] = MappedRDD[...]
scala> val pythonLines = lines.filter(line => line.contains("Python")) pythonLines: spark.RDD[String] = FilteredRDD[...]
scala> pythonLines.first() res0: String = ## Interactive Python Shell
Spark傳遞函數 如果您不熟悉示例2-4和2-5中的 lambda表達式 或者 => 語法,那么在此說明其實它是在Python和Scala中的定義內聯函數的簡短寫法。如果您在Spark中使用這些語言,您可定義函數然后將其名稱傳遞給Spark。比如,在Python語言中: def hasPython(line): return "Python" in line pythonLines = lines.filter(hasPython)
Spark傳遞函數也支持Java語言,但在此情況下傳遞函數被定義為類,實現調用函數的接口。比如: JavaRDD<String> pythonLines = lines.filter( new Function<String, Boolean>() { Boolean call(String line) { return line.contains("Python"); } } ); Java 8 中介紹了調用了lambda的的簡短寫法,與Python和Scala很類似。 JavaRDD<String> pythonLines = lines.filter(line -> line.contains("Python"));
We discuss passing functions further in "Passing Functions to Spark" on page 30. 我們將在30頁的"Spark傳遞函數"中深入討論傳遞函數。 |
Spark API包含許多魅力無窮的基於函數的操作可基於集群並行計算,比如篩選(filter)操作,我們在后面的章節詳細介紹。Spark自動將您的函數傳遞給執行(executor)節點。因此,您可在單獨的驅動程序中編寫代碼,它會自動的在多個節點中運行。本書第三章涵蓋了 RDD API的詳細介紹。
獨立(Standalone )應用程序
Spark快速入門教程中缺少如何在獨立(Standalone)應用程序中使用Spark,其實Spark除了可以交互式shell運行,還可以在Java、Scala和Python的獨立應用程序中依賴Spark運行。唯一與shell不同的是,獨立應用程序中需要初始化SparkContext,除此之外所有的API都是相同的。
在獨立應用程序中依賴Spark的方法因語言而異。在Java和Scala中,您可在設置Spark核心的Maven依賴。隨着本書版本的書寫,最新的spark版本為1.2.0,相應的Maven依賴設置為:
groupId = org.apache.spark artifactId = spark-core_2.10 version = 1.2.0
Maven是最受歡迎的基於Java語言的包管理工具,可以鏈接至公共的資源庫。您可以使用Maven創建自己的應用程序,也可以其他的工具比如Scala的sbt或者Gradle創建。流行的集成開發環境如Eclipse允許直接添加Maven依賴至工程中。
在Python中,您可編寫Python腳本的應用程序,然后使用bin/spark-submit提交腳本至Spark運行。在spark-submit腳本中包含供Python使用的Spark依賴,在此腳本中設置Spark的Python API的運行環境。
示例2-6 運行Python腳本
Example 2-6. Running a Python script
bin/spark-submit my_script.py
(請注意在Windows中使用反斜杠\替代正斜杠/。)
初始化SparkContext
如果您將應用程序鏈接至Spark,則需在應用程序中引入Spark包並創建SparkContext。首先創建SparkConf對象配置應用程序,然后實例化SparkContext。示例2-7至2-9以三種語言展示初始化SparkContext的方法。
Example 2-7. Initializing Spark in Python
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
Example 2-8. Initializing Spark in Scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
Example 2-9. Initializing Spark in Java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
SparkConf conf = new SparkConf().setMaster("local").setAppName("My App");
JavaSparkContext sc = new JavaSparkContext(conf);
這些示例展示最簡單的初始化SparkContext的方法,其中傳遞了兩個參數:
-
集群URL 參數,代表Spark連接到集群的方式,本例中設定為local,表示Spark線程僅運行於本地機器而非連接至集群。
-
應用程序名稱參數,本例中被定義為My App,如果您連接至集群,可在集群管理的UI界面中通過應用的名稱找到您自己的應用程序。
關於應用程序執行或者提交至集群的附加參數配置,將在本書后面的章節中介紹。
在您初始化SparkContext之后,即可調用我們之前展示給您的所有方法來創建RDD(比如從文本文件讀取)並操縱他們。
最后,您可調用stop() 方法關閉Spark,或者簡單的退出該應用程序(比如System.exit(0)或者sys.exit())
以上足以讓您在筆記本電腦上運行一個單機(Standalone)的Spark應用程序。對於更高級的配置,第七章中將介紹如何將應用程序連接至集群,以及如何將應用程序打包以便代碼自動提交至工作節點。目前,我們還是參照Spark官方文檔的快速入門。
創建獨立(Standalone)應用程序
如果本章沒有字數統計的示例,那么就不是完整大數據圖書的導論章節。在單機中運行字數統計的程序很簡單,但是在分布式框架中它卻是一個常見的示例,因為他需要在眾多的工作節點中讀取和合並數據。接下來我們分別以sbt和Maven的方式創建和打包簡單的字數統計的示例。我們所有的示例本都可以一起編譯,但是為了說明這種最小依賴的精簡編譯方式,我們將其分解為多個小的程序,代碼示例在目錄learning-sparkexamples/mini-complete-example下,您可參閱示例2-10(Java)和2-11(Scala)。
Example 2-10. Word count Java application—don't worry about the details yet
// Create a Java Spark Context
SparkConf conf = new SparkConf().setAppName("wordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
// Load our input data.
JavaRDD<String> input = sc.textFile(inputFile);
// Split up into words.
JavaRDD<String> words = input.flatMap(
new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}});
// Transform into pairs and count.
JavaPairRDD<String, Integer> counts = words.mapToPair(
new PairFunction<String, String, Integer>(){
public Tuple2<String, Integer> call(String x){
return new Tuple2(x, 1);
}}).reduceByKey(new Function2<Integer, Integer, Integer>(){
public Integer call(Integer x, Integer y){ return x + y;}});
// Save the word count back out to a text file, causing evaluation. counts.saveAsTextFile(outputFile);
Example 2-11. Word count Scala application—don't worry about the details yet
// Create a Scala Spark Context. val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
// Load our input data.
val input = sc.textFile(inputFile)
// Split it up into words.
val words = input.flatMap(line => line.split(" "))
// Transform into pairs and count.
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
// Save the word count back out to a text file, causing evaluation. counts.saveAsTextFile(outputFile)
我們可以使用非常簡單的編譯文件比如sbt(示例2-12)示例2-12和Maven(示例2-13)創建應用程序。我們以provided標簽標記了Spark的核心依賴,以便在稍后的編程中我們可以使用該程序集,而不必導入spark-coreJAR包。
Example 2-12. sbt build file
name := "learning-spark-mini-example"
version := "0.0.1"
scalaVersion := "2.10.4"
// additional libraries
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)
Example 2-13. Maven build file
<project>
<groupId>com.oreilly.learningsparkexamples.mini</groupId> <artifactId>learning-spark-mini-example</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>example</name>
<packaging>jar</packaging>
<version>0.0.1</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
<properties>
<java.version>1.6</java.version>
</properties>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
spark-core包已經被標記為provided,在應用程序打包時將自動引入該JAR包。更詳細的內容在第七章中介紹。
一旦有了自己的編譯定義文件,我們可以輕松的將應用程序打包並使用bin/spark-submit腳本運行。bin/spark-submit腳本包含設置Spark運行的環境變量參數。在目錄中我們可以編譯Scala(示例2-14)和Java(示例2-15)應用。
Example 2-14. Scala build and run
sbt clean package
$SPARK_HOME/bin/spark-submit \
--class com.oreilly.learningsparkexamples.mini.scala.WordCount \
./target/...(as above) \
./README.md ./wordcounts Example 2-15. Maven build and run
mvn clean && mvn compile && mvn package
$SPARK_HOME/bin/spark-submit \ --class com.oreilly.learningsparkexamples.mini.java.WordCount \ ./target/learning-spark-mini-example-0.0.1.jar \
./README.md ./wordcounts
更詳細的Spark應用程序的示例請參閱Spark官方文檔的快速入門。