http://spark.incubator.apache.org/
http://spark.incubator.apache.org/documentation.html
http://ampcamp.berkeley.edu/3/exercises/data-exploration-using-spark.html, a very good set of hands-on exercises
Source code analysis
http://jerryshao.me/archive.html
http://www.cnblogs.com/jerrylead/
Download
http://spark.apache.org/downloads.html
Download the version you need and unpack it; that is all.
Of course, you can also build it yourself.
Building
Spark currently supports building with Maven:
http://spark.apache.org/docs/latest/building-with-maven.html
A simple build looks like this:
mvn -DskipTests clean package
If Spark needs to access HDFS through the Hadoop client, the build has to match a specific Hadoop version, because the client protocol differs between Hadoop versions; see the link above.
Likewise, if you want to use YARN for resource management, you must specify that at build time.
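For example, a build against a specific Hadoop version with YARN support typically looks something like this (the exact profile names and version numbers depend on your Spark and Hadoop releases; see the link above):
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package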
Quick Start
http://spark.apache.org/docs/latest/quick-start.html
Spark interactive
One very convenient feature of Spark is its interactive shells.
They make it easy to learn and debug, or to analyze data interactively. Very nice.
Scala shell (./spark-shell)
Python interpreter (./pyspark)
Use --help to see how to use them (./spark-shell --help)
The shell is not limited to local use; it can also connect to a real cluster for large-scale data analysis: ./bin/spark-shell --master spark://IP:PORT
Here is a simple example that counts the lines of README. Note that the shell automatically creates a SparkContext, sc.
scala> val textFile = sc.textFile("README.md")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
scala> textFile.count() // Number of items in this RDD
res0: Long = 126
Here is a slightly more involved example that uses Java's Math class:
it computes the number of words in the longest line of the file.
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
Standalone Applications
Spark applications can be written in Scala, Java, or Python.
This section shows how to do it in Scala; for Java and Python see the link above.
First write the application. It is simple: count how many lines of a file contain the letter a and how many contain b.
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
Build it with sbt. Create simple.sbt; note that in a .sbt file the settings must be separated by blank lines.
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"
Create the following directory layout:
# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
Build:
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.10/simple-project_2.10-1.0.jar
Submit the job and get the result:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.10/simple-project_2.10-1.0.jar
...
Lines with a: 46, Lines with b: 23
Building a Spark application with Maven
References:
http://www.scala-lang.org/old/node/345
https://blogs.oracle.com/arungupta/entry/scala_and_maven_getting_started
I am not used to sbt and too lazy to learn it; even Spark itself is built with Maven, so here is how to build a Scala Spark application with Maven.
It is fairly simple: Maven already provides a Scala archetype, scala-archetype-simple:
mvn archetype:generate \
-DarchetypeGroupId=org.scala-tools.archetypes \
-DarchetypeArtifactId=scala-archetype-simple \
-DremoteRepositories=http://scala-tools.org/repo-releases \
-DgroupId=com.xxx \
-DartifactId=sparkapp \
-Dversion=1.0-SNAPSHOT
Then add the dependency to the pom:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0</version>
</dependency>
The maven-assembly-plugin is not included by default, so add it:
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
Don't forget to change the Scala version in the pom; then just run mvn package and you are done.
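After mvn package, submit the assembled jar the same way as in the sbt example above; the class and jar names below just follow the archetype defaults and are placeholders:
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "com.xxx.App" \
  --master local[4] \
  target/sparkapp-1.0-SNAPSHOT-jar-with-dependencies.jar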
Running the Examples
Spark also ships with some examples that you can read for reference or run directly, e.g.:
./bin/run-example SparkPi
which prints the result: Pi is roughly 3.14302
You can also look at the implementation of SparkPi; it is quite simple:
package org.apache.spark.examples

import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 100000 * slices
    val count = spark.parallelize(1 to n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
Programming Guide
http://spark.apache.org/docs/latest/programming-guide.html
Initializing Spark
To start using Spark, you first need to create a SparkContext.
This is done through the following constructor:
new SparkContext(master, appName, [sparkHome], [jars])
1. master: the URL of the Spark cluster. The following formats are supported:
local, Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K], Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
spark://HOST:PORT, Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
mesos://HOST:PORT, Connect to the given Mesos cluster. The host parameter is the hostname of the Mesos master. The port must be whichever one the master is configured to use, which is 5050 by default.
If no master URL is specified, the spark shell defaults to “local”.
2. appName: self-explanatory; just give the application a name.
3. If you deploy to a distributed cluster, you also need the last two parameters:
sparkHome, the Spark installation directory on the workers; it must be the same on every worker.
jars, all the jars, including your application and its dependencies; Spark ships them to every worker and adds them to the classpath automatically.
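Putting the four arguments together, a minimal sketch (the host, install path, and jar name are placeholders):
val sc = new SparkContext("spark://master-host:7077", "MyApp",
  "/opt/spark",                                // sparkHome on every worker (placeholder path)
  Seq("target/scala-2.10/myapp_2.10-1.0.jar")) // jars to ship to the workers (placeholder name)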
Resilient Distributed Datasets (RDDs)
For data to be processed by Spark, it first has to be turned into an RDD.
There are two ways to create an RDD:
Parallelized collections
You can turn an in-process data structure into an RDD, which is handy: read the data from any source first and then convert it.
The caveat is that the data must not be too large, otherwise efficiency becomes a problem, because in this case the data is copied to every node.
For Scala, call SparkContext's parallelize function on a Scala collection; it splits the collection into slices and distributes them across the cluster nodes so they can be processed in parallel.
scala> val data = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data) // splits data into slices and distributes them to the nodes
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e
Spark normally picks the number of slices automatically based on the cluster's CPUs, but you can also set it yourself:
sc.parallelize(data, 10)
External Datasets
More commonly, an RDD is created by reading an external dataset.
Spark most often reads data from files: local, HDFS, S3, and so on.
Spark can also read from databases such as Cassandra, HBase, and even MySQL (via JdbcRDD).
Let's start with loading a file using SparkContext's textFile method.
Text file RDDs can be created using SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, kfs://, etc. URI) and reads it as a collection of lines. Here is an example invocation:
scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08
textFile also accepts a slices parameter. By default Spark creates one slice per file block; you can ask for more slices, but not for fewer than the number of blocks.
Note that for external storage each worker reads its own data. That is fine with distributed storage, but with local storage you must make sure the file exists at the same path on every node.
So if you are not using distributed storage, use shared storage; otherwise it gets tedious, since you have to copy the file to every node yourself.
textFile reads the file as a collection of lines, so you can simply iterate over it line by line.
It also supports directories, compressed files, and wildcards: textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
Besides textFile, other input methods are supported:
SparkContext.wholeTextFiles reads a directory and returns (filename, content) pairs.
For SequenceFiles, use the sequenceFile[K, V] method, where K and V are the key and value types; they should be subclasses of Hadoop's Writable.
For other Hadoop InputFormats, use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source.
RDD Operations
Now that the raw data has been turned into RDDs, the next step is to process them.
Spark has two kinds of RDD operations: transformations and actions.
Transformations are lazy: calling one does not execute anything, it only records the operation.
An action actually runs a job.
The most commonly used transformations and actions are listed below.
This link gives examples for every operation and is quite good:
http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html
Transformations
map(func)
Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)
Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func): map is 1->1 while flatMap is 1->0 or more, so func returns a Seq and all the Seqs are flattened into a single result
Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
Example,
sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
Why do we need the next two? When the map function involves expensive setup, such as opening and closing a database connection, you certainly do not want to repeat it for every item.
With mapPartitions the setup runs only once per partition; both input and output are iterators.
mapPartitionsWithSplit additionally passes in the split index, which some scenarios may need.
mapPartitions(func)
Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
mapPartitionsWithSplit(func)
Similar to mapPartitions, but also provides func with an integer value representing the index of the split, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
pipe(command, [envVars]): process the RDD data with a shell command
Takes the RDD data of each partition and sends it via stdin to a shell-command. The resulting output of the command is captured and returned as a RDD of string values.
Example,
val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res2: Array[String] = Array(1, 4, 7)
sample(withReplacement, fraction, seed): random sampling
Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
withReplacement: whether to sample with replacement
fraction: the sampling ratio, e.g. 0.1 means 10%
seed: the pseudo-random seed; the same seed always yields the same random sequence, so running the same code twice with the same seed gives the same sample
Example,
val a = sc.parallelize(1 to 10000, 3)
a.sample(false, 0.1, 0).count
res24: Long = 960
Set operations
union(otherDataset)
Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)
Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]))
Return a new dataset that contains the distinct elements of the source dataset.
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res6: Array[String] = Array(Dog, Gnu, Cat, Rat)
The following operations work on key-value pairs, whereas most other operations place no restriction on the data type.
First, a few functions that produce key-value pairs:
keyBy(func)
Builds key-value pairs: func computes the key from each item, and the item itself becomes the value.
Example,
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
b.collect
res26: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
zip
Joins two RDDs by combining the i-th element of each partition with each other.
Example,
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
cartesian(otherDataset): Cartesian product
When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
Example,
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res0: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
Now for the operations on key-value pairs.
These operations require key-value pairs and are defined in the PairRDDFunctions class.
They usually involve a shuffle, such as grouping or aggregating.
They all take an optional numTasks parameter, which defaults to the parent's number of partitions but can be overridden.
groupByKey([numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs.
If the goal of the grouping is to compute a sum or an average, reduceByKey or combineByKey will be more efficient.
Example,
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)
b.groupByKey.collect
res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))
reduceByKey(func, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function.
The classic example: counting the words in a text.
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations.
This function looks a bit hard to understand at first.
aggregateByKey(zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U)
Written this way it is easier to follow.
The difference from reduceByKey is that reduceByKey's input and output are both (K, V),
whereas aggregateByKey's output is (K, U), which may differ from the input (K, V).
Three things need to be defined, similar to combineByKey:
zeroValue: U, the initial value, for example an empty list {}
seqOp: (U, T) => U, the seq operator: how to merge a T into a U, e.g. how to add an item to the list
combOp: (U, U) => U, the comb operator: how to merge two Us, e.g. how to concatenate two lists
So aggregateByKey can be seen as a more abstract, more flexible reduce or group.
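As a minimal sketch of aggregateByKey itself on (K, V) pairs (the data is invented; here U = List[Int] while the input values are Int):
val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)), 2)
val perKeyLists = pairs.aggregateByKey(List.empty[Int])(
  (acc, v) => v :: acc,    // seqOp: fold one value into the per-key list
  (l1, l2) => l1 ::: l2)   // combOp: merge the partial lists from two partitions
perKeyLists.collect        // e.g. Array((a,List(2, 1)), (b,List(3)))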
Example (the following snippets actually use the closely related aggregate action, which applies the same zeroValue/seqOp/combOp pattern to a plain RDD),
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
z.aggregate(0)(math.max(_, _), _ + _)
res40: Int = 9
val z = sc.parallelize(List("a","b","c","d","e","f"),2)
z.aggreateByKey("")(_ + _, _+_)
res115: String = abcdefsortByKey([ascending], [numTasks])
When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
Example,
val a = sc.parallelize(List("dog", "salmon","elephant"), 3)
val b = a.keyBy(_.length)
val c = sc.parallelize(List("dog","rabbit","turkey","wolf"), 3)
val d = c.keyBy(_.length)
b.join(d).collect
res0: Array[(Int, (String, String))] = Array((6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)))
cogroup(otherDataset, [numTasks])
When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable<V>, Iterable<W>) tuples. This operation is also called groupWith.
groupByKey groups the data inside a single RDD; cogroup groups the data of several RDDs at the same time.
Example,
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
b.cogroup(c, d).collect
res2: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))
The following operations change the number of partitions.
Why change the number of partitions? Because some operations drastically increase or decrease the amount of data in an RDD.
For example, filter may drop the vast majority of the data, in which case coalesce can be used to reduce the number of partitions.
repartition can both decrease and increase the number of partitions, and it always reshuffles the data to randomly balance the partitions (see the small sketch after the two descriptions below).
coalesce(numPartitions)
Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)
Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
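A minimal sketch (the path, filter ratio, and partition counts are made up for illustration):
val raw = sc.textFile("hdfs://namenode/logs", 100)  // read into roughly 100 partitions
val errors = raw.filter(_.contains("ERROR"))        // may keep only a small fraction of the lines
val compacted = errors.coalesce(10)                 // shrink to 10 partitions without a full shuffle
val rebalanced = errors.repartition(20)             // or reshuffle into 20 balanced partitions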
Actions
reduce(func)
Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
Example,
val a = sc.parallelize(1 to 100, 3)
a.reduce(_ + _)
res41: Int = 5050
collect()
Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
count()
Return the number of elements in the dataset.
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res2: Long = 4
first()
Return the first element of the dataset (similar to take(1)).
Example,
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.first
res1: String = Gnu
take(n)
Return an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
Example,
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res18: Array[String] = Array(dog, cat)
takeSample(withReplacement, num, seed)
Return an array with a random sample of num elements of the dataset, with or without replacement, using the given random number generator seed.
It differs from sample in that:
it is an action, so it returns an array instead of an RDD;
the second parameter, num, specifies an exact number of samples rather than a fraction;
the returned array is in random order.
saveAsTextFile(path)
Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
Example,
val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
This example writes 3 files under the mydata_a directory: part-00000, part-00001, part-00002.
saveAsSequenceFile(path)
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's Writable interface or are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
countByKey()
Only available on RDDs of type (K, V). Returns a Map of (K, Int) pairs with the count of each key.
Example,
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByKey
res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
foreach(func)
Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
Example,
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy
Passing Functions to Spark
Spark's many built-in transformations and actions cover most common operations, which is very convenient.
But there is always some custom logic that you need to apply to the data, for example via map, so how do you define those functions?
1. For very short logic, use anonymous function syntax directly; this is the most convenient:
lineLengths.reduce((a, b) => a + b)
2. Of course not all logic is that short, so you can also use static methods in a global singleton object.
Scala has no static classes, so a global singleton object is effectively the static-class concept:
object MyFunctions {
  def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
The two forms above are the recommended ones. If you insist on using an ordinary class's member function, or reference a class member inside the function:
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
then be aware that the entire object has to be sent to the cluster for execution, which is inefficient.
For the member-variable case, you can work around it by copying the field into a local variable:
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
RDD Persistence
Within a stage, the transformations are just function calls and the intermediate results are not kept. For convenience, or to avoid recomputation, you can cache an intermediate result.
That is the purpose of RDD persistence.
You can choose among different storage levels; cache() uses the default level, MEMORY_ONLY.
You can also use the persist() interface and pick a different level (a short sketch follows the level descriptions below).
MEMORY_ONLY
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK
Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER
Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY
Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.
Same as the levels above, but replicate each partition on two cluster nodes.
How do you choose a storage level?
Prefer MEMORY_ONLY.
If memory really is not enough, consider MEMORY_ONLY_SER, which saves some space, but then every access requires deserialization, so use a fast serialization library.
Try not to use disk, because recomputation is usually faster than reading from disk, unless the computation is very expensive.
Replication is usually unnecessary, because RDDs recover from failures quickly by recomputation, unless you have real-time requirements and cannot tolerate the recomputation time.
Spark evicts old cached data in least-recently-used (LRU) order; you can also clear a specific RDD's cache manually with RDD.unpersist().
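A minimal sketch of picking a non-default level and clearing it afterwards (the path and parsing step are placeholders):
import org.apache.spark.storage.StorageLevel
val parsed = sc.textFile("hdfs://namenode/logs").map(_.split("\t"))
parsed.persist(StorageLevel.MEMORY_AND_DISK_SER) // keep serialized in memory, spill to disk when it does not fit
parsed.count()     // the first action materializes and caches the RDD
parsed.unpersist() // manually drop it from the cache when it is no longer needed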
Shared Variables
Shared variables are generally hard to provide in a distributed architecture, but for convenience Spark offers two special kinds: broadcast variables and accumulators.
Broadcast Variables
A read-only variable is broadcast to every node through this interface, which is more flexible than shipping a copy of it with each task.
Note that the value must not be changed afterwards, otherwise the nodes would become inconsistent.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) // broadcast the data
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
scala> broadcastVar.value // read the broadcast value via value
res0: Array[Int] = Array(1, 2, 3)
Accumulators
Accumulators are used to implement counters. Spark supports Int and Double accumulators out of the box, and users can implement their own types.
Note that every node can add to an accumulator with +=, but only the driver can read the accumulator's value.
The interpreter session below shows an accumulator being used to add up the elements of an array:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10