快速開始
本文將介紹如何用scala、java、python編寫一個spark單擊模式的程序。
首先你只需要在一台機器上成功建造Spark;做法:
進入Spark的根目錄,輸入命令:
$
sbt/sbt package
(由於天朝偉大的防火牆,大陸地區是無法成功的,除非你可以順利翻牆),不想爬牆的可以 下載預編譯好的Spark , spark-0.7.2-prebuilt-hadoop1.tgz.gz
(由於天朝偉大的防火牆,大陸地區是無法成功的,除非你可以順利翻牆),不想爬牆的可以 下載預編譯好的Spark , spark-0.7.2-prebuilt-hadoop1.tgz.gz
Spark shell的交互式分析
一、基礎
概念:
Spark的交互式腳本是一種學習API的簡單途徑,也是分析數據集交互的有力工具。
在Spark根目錄運行:./spark-shell
Spark抽象的分布式集群空間叫做Resilient Distributed Dataset (RDD)彈性數據集。
RDD有兩種創建方式:1、從Hadoop的文件系統輸入(例如HDFS);2、有其他已存在的RDD轉換得到新的RDD。
實踐:
1、現在我們利用Spark目錄下的README文件來創建一個新的RDD:
scala> val textFile = sc.textFile("README.md") textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
2、RDD有兩種操作,分別是action(返回values)和transformations(返回一個新的RDD);下面開始些少量的actions:
scala> textFile.count() // Number of items in this RDD
res0: Long = 74
scala> textFile.first() // First item in this RDD
res1: String = # Spark
3、下面使用transformations中的filter返回一個文件子集的新RDD
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15
二、基於RDD的更多操作
1、RDD的actions和transformations可以被用於更多復雜的計算。例如,我們想找出含有字數最多的行:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 16
2、為了使程序更簡單,我們可以引用包來使用已有的函數方法來編寫程序:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 16
3、Spark可以很容易的執行MapReaduce流
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) wordCounts: spark.RDD[(java.lang.String, Int)] = spark.ShuffledAggregatedRDD@71f027b8
這里我們運用了transformations中的flatMap, map, reduceByKey來計算文件中每個單詞出現的次數並存儲為(String, Int)對的RDD數據集
4、使用actions的collect方法返回計算好的數值
scala> wordCounts.collect() res6: Array[(java.lang.String, Int)] = Array((need,2), ("",43), (Extra,3), (using,1), (passed,1), (etc.,1), (its,1), (`/usr/local/lib/libmesos.so`,1), (`SCALA_HOME`,1), (option,1), (these,1), (#,1), (`PATH`,,2), (200,1), (To,3),...
三、緩存
Spark還支持將數據集緩存到內存中。這解決了處理大量迭代運算(例如,機器學習算法)時的反復磁盤IO操作的耗時。內存IO操作和磁盤IO操作的用時完全不是一個數量級的,帶來的效率提升是不言而喻的。
1、做個小示例,標記我們之前的linesWithSpark數據集並將其緩存:
scala> linesWithSpark.cache()
res7: spark.RDD[String] = spark.FilteredRDD@17e51082
scala> linesWithSpark.count()
res8: Long = 15
四、一個單機版的scala作業
/*** SimpleJob.scala ***/ import spark.SparkContext import SparkContext._ object SimpleJob { def main(args: Array[String]) { val logFile = "/var/log/syslog" // Should be some file on your system val sc = new SparkContext("local", "Simple Job", "$YOUR_SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar")) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } }
程序解釋:
首先要創建一個SparkContext對象,傳入四個參數,分別是:
1.使用的調度器(示例中使用了local scheduler);
2.程序名稱;
3.Spark安裝路徑;
4.包含這個程序資源的jar包名。
注意:在分布式中后兩個參數必須設置,安裝路徑來確定Spark通過哪個several nodes運行;jar名會讓Spark自動向slave nodes傳輸jar文件
這個程序的文件依靠了Spark的API,所以我們必須有一個sbt的配置文件用以說明程序和Spark的依賴關系。下面是配置文件simple.sbt:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.9.3"
libraryDependencies += "org.spark-project" %% "spark-core" % "0.7.3"
resolvers ++= Seq(
"Akka Repository" at "http://repo.akka.io/releases/",
"Spray Repository" at "http://repo.spray.cc/")
為了讓sbt正確的工作,我們必須將SimpleJob.scala和simple.sbt根據典型的目錄結構進行布局。完成布局后,我們可以創建一個包含了程序源碼的JAR包,然后使用sbt的run命令來執行示例程序
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleJob.scala
$ sbt package
$ sbt run
...
Lines with a: 8422, Lines with b: 1836
這樣就完成了程序在本地運行的示例