如何安裝Spark
安裝和使用Spark有幾種不同方式。你可以在自己的電腦上將Spark作為一個獨立的框架安裝或者從諸如Cloudera,HortonWorks或MapR之類的供應商處獲取一個Spark虛擬機鏡像直接使用。或者你也可以使用在雲端環境(如Databricks Cloud)安裝並配置好的Spark。
在本文中,我們將把Spark作為一個獨立的框架安裝並在本地啟動它。最近Spark剛剛發布了1.2.0版本。我們將用這一版本完成示例應用的代碼展示。
如何運行Spark
當你在本地機器安裝了Spark或使用了基於雲端的Spark后,有幾種不同的方式可以連接到Spark引擎。
下表展示了不同的Spark運行模式所需的Master URL參數。
如何與Spark交互
Spark啟動並運行后,可以用Spark shell連接到Spark引擎進行交互式數據分析。Spark shell支持Scala和Python兩種語言。Java不支持交互式的Shell,因此這一功能暫未在Java語言中實現。
可以用spark-shell.cmd和pyspark.cmd命令分別運行Scala版本和Python版本的Spark Shell。
Spark網頁控制台
不論Spark運行在哪一種模式下,都可以通過訪問Spark網頁控制台查看Spark的作業結果和其他的統計數據,控制台的URL地址如下:
http://localhost:4040
Spark控制台如下圖3所示,包括Stages,Storage,Environment和Executors四個標簽頁
(點擊查看大圖)
圖3. Spark網頁控制台
共享變量
Spark提供兩種類型的共享變量可以提升集群環境中的Spark程序運行效率。分別是廣播變量和累加器。
廣播變量:廣播變量可以在每台機器上緩存只讀變量而不需要為各個任務發送該變量的拷貝。他們可以讓大的輸入數據集的集群拷貝中的節點更加高效。
下面的代碼片段展示了如何使用廣播變量。
// // Broadcast Variables // val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar.value
累加器:只有在使用相關操作時才會添加累加器,因此它可以很好地支持並行。累加器可用於實現計數(就像在MapReduce中那樣)或求和。可以用add方法將運行在集群上的任務添加到一個累加器變量中。不過這些任務無法讀取變量的值。只有驅動程序才能夠讀取累加器的值。
下面的代碼片段展示了如何使用累加器共享變量:
// // Accumulators // val accum = sc.accumulator(0, "My Accumulator") sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) accum.value
Spark應用示例
本篇文章中所涉及的示例應用是一個簡單的字數統計應用。這與學習用Hadoop進行大數據處理時的示例應用相同。我們將在一個文本文件上執行一些數據分析查詢。本示例中的文本文件和數據集都很小,不過無須修改任何代碼,示例中所用到的Spark查詢同樣可以用到大容量數據集之上。
為了讓討論盡量簡單,我們將使用Spark Scala Shell。
首先讓我們看一下如何在你自己的電腦上安裝Spark。
前提條件:
- 為了讓Spark能夠在本機正常工作,你需要安裝Java開發工具包(JDK)。這將包含在下面的第一步中。
- 同樣還需要在電腦上安裝Spark軟件。下面的第二步將介紹如何完成這項工作。
注:下面這些指令都是以Windows環境為例。如果你使用不同的操作系統環境,需要相應的修改系統變量和目錄路徑已匹配你的環境。
I. 安裝JDK
1)從Oracle網站上下載JDK。推薦使用JDK 1.7版本。
將JDK安裝到一個沒有空格的目錄下。對於Windows用戶,需要將JDK安裝到像c:\dev這樣的文件夾下,而不能安裝到“c:\Program Files”文件夾下。“c:\Program Files”文件夾的名字中包含空格,如果軟件安裝到這個文件夾下會導致一些問題。
注:不要在“c:\Program Files”文件夾中安裝JDK或(第二步中所描述的)Spark軟件。
2)完成JDK安裝后,切換至JDK 1.7目錄下的”bin“文件夾,然后鍵入如下命令,驗證JDK是否正確安裝:
java -version
如果JDK安裝正確,上述命令將顯示Java版本。
II. 安裝Spark軟件:
從Spark網站上下載最新版本的Spark。在本文發表時,最新的Spark版本是1.2。你可以根據Hadoop的版本選擇一個特定的Spark版本安裝。我下載了與Hadoop 2.4或更高版本匹配的Spark,文件名是spark-1.2.0-bin-hadoop2.4.tgz。
將安裝文件解壓到本地文件夾中(如:c:\dev)。
為了驗證Spark安裝的正確性,切換至Spark文件夾然后用如下命令啟動Spark Shell。這是Windows環境下的命令。如果使用Linux或Mac OS,請相應地編輯命令以便能夠在相應的平台上正確運行。
c: cd c:\dev\spark-1.2.0-bin-hadoop2.4 bin\spark-shell
如果Spark安裝正確,就能夠在控制台的輸出中看到如下信息。
…. 15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server 15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) Type in expressions to have them evaluated. Type :help for more information. …. 15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager 15/01/17 23:17:53 INFO SparkILoop: Created spark context.. Spark context available as sc.
可以鍵入如下命令檢查Spark Shell是否工作正常。
sc.version
(或)
sc.appName
完成上述步驟之后,可以鍵入如下命令退出Spark Shell窗口:
:quit
如果想啟動Spark Python Shell,需要先在電腦上安裝Python。你可以下載並安裝Anaconda,這是一個免費的Python發行版本,其中包括了一些比較流行的科學、數學、工程和數據分析方面的Python包。
然后可以運行如下命令啟動Spark Python Shell:
c: cd c:\dev\spark-1.2.0-bin-hadoop2.4 bin\pyspark
Spark示例應用
完成Spark安裝並啟動后,就可以用Spark API執行數據分析查詢了。
這些從文本文件中讀取並處理數據的命令都很簡單。我們將在這一系列文章的后續文章中向大家介紹更高級的Spark框架使用的用例。
首先讓我們用Spark API運行流行的Word Count示例。如果還沒有運行Spark Scala Shell,首先打開一個Scala Shell窗口。這個示例的相關命令如下所示:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ val txtFile = "README.md" val txtData = sc.textFile(txtFile) txtData.cache()
我們可以調用cache函數將上一步生成的RDD對象保存到緩存中,在此之后Spark就不需要在每次數據查詢時都重新計算。需要注意的是,cache()是一個延遲操作。在我們調用cache時,Spark並不會馬上將數據存儲到內存中。只有當在某個RDD上調用一個行動時,才會真正執行這個操作。
現在,我們可以調用count函數,看一下在文本文件中有多少行數據。
txtData.count()
然后,我們可以執行如下命令進行字數統計。在文本文件中統計數據會顯示在每個單詞的后面。
val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) wcData.collect().foreach(println)
如果想查看更多關於如何使用Spark核心API的代碼示例,請參考網站上的Spark文檔。