1. 交互式Spark-Shell
根据前一节已经搭建好的Hadoop和Spark环境,直接通过脚本启动Hadoop和Spark服务。如果 http://localhost:8080 能够访问,说明Spark服务已经启动。Spark为我们提供了PySpark以及Spark-shell,可以方便的通过交互试界面调试Spark应用。接下来我们将采用Spark-Shell来调试Spark程序。在终端中输入如下命令: spark-shell --master spark://spark-B470:7077
, master后面的URL就是Spark Master的URL ,可以在 http://localhost:8080 的页面上找到。
hadoop@spark-B470:~/Develop$ spark-shell --master spark://spark-B470:7077
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/10/29 23:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/10/29 23:04:25 WARN Utils: Your hostname, spark-B470 resolves to a loopback address: 127.0.1.1; using 192.168.1.110 instead (on interface enp4s0)
16/10/29 23:04:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/10/29 23:04:27 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://192.168.1.110:4040
Spark context available as 'sc' (master = spark://spark-B470:7077, app id = app-20161029230426-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.0.0
/_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
看到Scala的交互式窗口后,可以在 http://localhost:8080 的web上 Running Applications 这一栏 看到刚启动的应用程序,如果你要退出Spark-Shell,按 CTRL D
组合键退出。
Spark 最主要的抽象是叫 Resilient Distributed Dataset(RDD)
的弹性分布式集合。RDDs 可以使用 Hadoop InputFormats
(例如 HDFS 文件)创建,也可以从其他的 RDDs 转换。让我们在 Spark 源代码目录从 /etc/protocols
文本文件中创建一个新的 RDD。
scala> val file = sc.textFile("file:///etc/protocols")
file: org.apache.spark.rdd.RDD[String] = file:///etc/protocols MapPartitionsRDD[5] at textFile at <console>:24
scala> file.count()
res3: Long = 64
scala> file.first()
res4: String = # Internet (IP) protocols
上面的操作中创建了一个RDD file,执行了两个简单的操作:
- count() 获取RDD的行数
- first() 获取第一行的内容
我们继续执行其他操作,比如查找有多少行含有tcp和udp字符串:
scala> file.filter(line=>line.contains("tcp")).count()
res2: Long = 1
scala> file.filter(line=>line.contains("udp")).count()
res3: Long = 2
查看一共有多少个不同单词的方法,这里用到MapReduce的思路:
scala> val wordcount = file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((x,y)=>x+y)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:26
scala> wordcount.count()
res4: Long = 243
2. Spark 核心概念
现在你已经用Spark-shell运行了一段Spark程序,是时候对Spark编程作更细致的了解。
从上层来看,每个Spark应用都由一个驱动器程序(driver program)来发起集群 上的各种并行操作。驱动器程序包包含应用的 main
函数, 并且定义了集群上的分布式数据集,还对这些分布式数据集应用了相关的操作。在前面的例子里,实际的驱动器程序就是Spark shell 本身,你只要输入想运行的操作就可以了。
驱动器程序通过一个 SparkContext
对象来访问Spark,这个对象代表对计算集群的一个连接。shell启动时已经自动创建了一个 SparkContext
对象,是一个叫作 sc
的变量。我们可以通过在shell里尝试输出 sc
来查看它的类型。
scala> sc
res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1f172892
一旦有了 SparkContext
,你就可以通过它来创建RDD。就像前面的例子,我们调用了 sc.textFile()
来创建一个代表文件中各行文本的RDD。我们可以在这些行上进行各种并行操作,比如 count()
。
要执行这些操作,驱动器程序一般要管理多个执行器(executor)节点。比如,如果我们在集群上运行count() 操作,那么不同的节点会统计文件的不同部分的行数。由于我们刚才是在本地模式下运行的Spark shell,因此所有的工作会在单个节点上执行,但你可以将这个shell连接到集群上来进行并行的数据分析。下图展示Spark如何在一个集群上运行。
3. RDD编程
3.1 RDD基础
Spark中的RDD就是一个不可变的分布式对象集合。每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动程序里分发驱动器程序中的对象集合(比如list和set)。我们在前面的例子已经使用 SparkContext.textFile()
来读取文本文件作为一个字符串RDD。创建出来后的RDD支持两种类型的操作:转化操作(transformation)
和 行动操作(action)
。
转化操作会由一个RDD生成一个新的RDD。例如,根据谓词匹配情况筛选数据就是一个常见的转化操作。在我们的文本示例中,我们可以用筛选来生成一个只存储包含单词tcp的字符串的的新的RDD。示例如下:
scala> val tcpLines = file.filter(line => line.contains("tcp"))
tcpLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26
行动操作会对RDD计算一个结果,并把结果返回到驱动器程序中,或者把结果存储到外部存储系统(如HDFS)中。first()
就是我们之前调用的一个行动操作,它会返回RDD的一个元素,示例如下:
scala> tcpLines.first()
res3: String = tcp 6 TCP # transmission control protocol
转化操作和行动操作的区别在于Spark计算RDD的方式不同。虽然你可以在任何时候定义新的RDD,但是Spark只会惰性计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。这种策略刚开始看起来可能会显得有些奇怪,不过在大数据领域是很有道理的。比如,我们以一个文本文件定义了数据, 然后把其中包含tcp的行筛选出来。如果Spark在运行 val file = sc.textFile("file:///etc/protocols")
时就把文件中的所有行都读取并存储起来,就会消耗很多的存储空间,而我们马上就要筛选掉其中的很多数据。相反,一旦Spark了解了完整的转化操作链之后,它就可以只计算求结果时真正需要的数据。事实上,在行动操作first()中,Spark只需要扫描文件直到找到第一个匹配的行为止,而不需要读取整个文件。
最后,默认情况下,Spark的RDD会在你每次对它们进行行动操作时重新计算。如果想在多个行动操作中重用同一个RDD,可以使用 RDD.persist()
让Spark把这个RDD缓存下来。在第一次对持久化的RDD计算之后,Spark会把RDD的内容保存到内存中(以分区方式存储到集群中的各机器上),这样在之后的行动操作就可以重用这些数据了。Spark在默认情况下不进行持久化可能显得有些奇怪,不过这对于大规模数据集是很有意义的:如果不会重用该RDD,我们就没必要浪费存储空间,Spark可以直接遍历一遍数据然后计算出结果。
在实际操作中,你会经常用 persist()
来把数据的一部分读取到内存中,并反复查询这部分数据。例如,我们想多次对/etc/protocols
文件包含tcp的行进行计算,就可以写出如下脚本:
scala> tcpLines.persist()
res8: tcpLines.type = MapPartitionsRDD[3] at filter at <console>:26
scala> tcpLines.count()
res9: Long = 1
scala> tcpLines.first()
res10: String = tcp 6 TCP # transmission control protocol
总的来说,每个Spark程序一般的工作流程:
- 从外部数据创建输入RDD。
- 使用诸如 filter() 这样的转化操作对RDD进行转化,以定义新的RDD。
- 告诉Spark对需要被重用的中间结果RDD执行persist() 操作。
- 使用行动操作(如count()和first()等)来触发一次并行计算,Spark会对计算进行优化后再执行。
3.2 创建RDD
Spark提供了两种创建RDD 的方式:读取外部数据集,以及在驱动程序中对一个集合进行并行化。
3.2.1 并行集合
并行集合 (Parallelized collections) 的创建是通过在一个已有的集合(Scala Seq)上调用 SparkContext 的 parallelize 方法实现的。集合中的元素被复制到一个可并行操作的分布式数据集中。例如,这里演示了如何在一个包含 1 到 10 的数组中创建并行集合:
scala> val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26
一旦创建完成,这个分布式数据集(distData)就可以被并行操作。例如,我们可以调用 distData.reduce((a, b) => a + b)
将这个数组中的元素相加。我们以后再描述在分布式上的一些操作。
并行集合一个很重要的参数是切片数(slices),表示一个数据集切分的份数。Spark 会在集群上为每一个切片运行一个任务。你可以在集群上为每个 CPU 设置 2-4 个切片(slices)。正常情况下,Spark 会试着基于你的集群状况自动地设置切片的数目。然而,你也可以通过 parallelize
的第二个参数手动地设置(例如:sc.parallelize(data, 10)
)。
注意:除了开发原型和测试时,这种方式用的并不多,毕竟这种方式需要把你的整个数据集先放在一台机器的内存中。
3.2.2 外部数据集
Spark 可以从任何一个 Hadoop 支持的存储源创建分布式数据集,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支持文本文件(text files),SequenceFiles 和其他 Hadoop InputFormat。
文本文件 RDDs 可以使用 SparkContext.textFile()
方法创建。在这个方法里传入文件的 URI (机器上的本地路径 file:// 或 hdfs://,s3n:// 等),然后它会将文件读取成一个行集合。这里是一个调用例子:
scala> val distFile = sc.textFile("file:///home/hadoop/Develop/start.sh")
distFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/Develop/start.sh MapPartitionsRDD[14] at textFile at <console>:24
一旦创建完成,distFile 就能做数据集操作。例如,我们可以用下面的方式使用 map
和 reduce
操作将所有行的长度相加:distFile.map(s => s.length).reduce((a, b) => a + b)
。
注意,Spark 读文件时:
- 如果使用本地文件系统路径,文件必须能在 work 节点上用相同的路径访问到。要么复制文件到所有的 workers,要么使用网络的方式共享文件系统。
- 所有 Spark 的基于文件的方法,包括 textFile,能很好地支持文件目录,压缩过的文件和通配符。例如,你可以使用 textFile(“/my/文件目录”),textFile(“/my/文件目录/.txt”) 和 textFile(“/my/文件目录/.gz”)。
- textFile 方法也可以选择第二个可选参数来控制切片(slices)的数目。默认情况下,Spark 为每一个文件块(HDFS 默认文件块大小是 64M)创建一个切片(slice)。但是你也可以通过一个更大的值来设置一个更高的切片数目。注意,你不能设置一个小于文件块数目的切片值。
3.3 RDD操作
RDD 支持两种类型的操作:转化操作(transformations)
从已经存在的数据集中创建一个新的数据集;行动操作(actions)
在数据集上进行计算之后返回一个值到驱动器程序。例如,map 是一个转化操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce 是一个行动操作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动器程序(不过也有一个并行 reduceByKey 能返回一个分布式数据集)。
3.3.1 向Spark传递函数
Spark的大部分转化操作和一部分行动操作,都需要依赖用户传递函数来计算。这里推荐两种方式:
- 匿名函数 (Anonymous function syntax),可以在比较短的代码中使用。
- 全局单例对象里的静态方法。例如,你可以定义 object MyFunctions 然后传递 MyFounctions.func1,像下面这样:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意,它可能传递的是一个类实例里的一个方法引用(而不是一个单例对象),这里必须传送包含方法的整个对象。例如:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
这里,如果我们创建了一个 new MyClass
对象,并且调用它的 doStuff
,map
里面引用了这个 MyClass
实例中的 func1
方法,所以这个对象必须传送到集群上。类似写成 rdd.map(x => this.func1(x))
。
以类似的方式,访问外部对象的字段将会引用整个对象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相当于写成 rdd.map(x => this.field + x)
,引用了整个 this
对象。为了避免这个问题,最简单的方式是复制 field
到一个本地变量而不是从外部访问它:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
3.3.2 使用键值对
虽然很多 Spark 操作工作在包含任意类型对象的RDD上的,但是少数几个特殊操作仅仅在键值(key-value)对RDD上可用。最常见的是分布式 “shuffle” 操作,例如根据一个 key 对一组数据进行分组和聚合。
在 Scala 中,这些操作在包含二元组(Tuple2)(在语言的内建元组中,通过简单的写 (a, b) 创建) 的 RDD 上自动地变成可用的,只要在你的程序中导入 org.apache.spark.SparkContext._
来启用 Spark 的隐式转换。在 PairRDDFunctions
的类里键值对操作是可以使用的,如果你导入隐式转换它会自动地包装成元组 RDD。
例如,下面的代码在键值对上使用 reduceByKey
操作来统计在一个文件里每一行文本内容出现的次数:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
我们也可以使用 counts.sortByKey()
,例如,将键值对按照字母进行排序,最后 counts.collect()
把它们作为一个对象数组带回到驱动程序。
3.3.3 常见的转化操作
下面的表格列了 Spark 支持的一些常用 transformations。
转化操作 | 含义 |
---|---|
map(func) | 返回一个新的分布式数据集,将数据源的每一个元素传递给函数 func 映射组成 |
filter(func) | 返回一个新的数据集,从数据源中选中一些元素通过函数 func 返回 true |
flatMap(func) | 类似于 map,但是每个输入项能被映射成多个输出项(所以 func 必须返回一个 Seq,而不是单个 item)。 |
mapPartitions(func) | 类似于 map,但是分别运行在 RDD 的每个分区上,所以 func 的类型必须是 Iterator=> Iterator 当运行在类型为 T 的 RDD 上。 |
mapPartitionsWithIndex(func) | 类似于 mapPartitions,但是 func 需要提供一个 integer 值描述索引(index),所以 func 的类型必须是 (Int, Iterator) => Iterator 当运行在类型为 T 的 RDD 上。 |
sample(withReplacement, fraction, seed) | 对数据进行采样。 |
union(otherDataset) | 生成一个包含两个RDD中所有元素的RDD |
intersection(otherDataset) | 返回两个RDD共同的元素的RDD |
distinct([numTasks])) | 对RDD进行去重 |
groupByKey([numTasks]) | 对具有相同键的值进行分组 |
reduceByKey(func, [numTasks]) | 合并具有相同键的值 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值 |
sortByKey([ascending], [numTasks]) | 返回一个根据键升序或降序排序的RDD |
join(otherDataset, [numTasks]) | 对两个RDD具有相同键的键值对进行内连接. |
cogroup(otherDataset, [numTasks]) | 对两个RDD中拥有的相同键的数据分组到一起 |
cartesian(otherDataset) | 与另一个RDD的笛卡尔积 |
pipe(command, [envVars]) | 对RDD中的元素通过脚本管道执行脚本 |
coalesce(numPartitions, [shuffle]) | 该函数用于将RDD进行重分区,使用HashPartitioner。第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false; |
repartition(numPartitions) | 对RDD数据集进行重分区操作,该函数其实就是coalesce函数第二个参数为true的实现 |
表1. 对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作
函数名 | 示例 | 结果 |
---|---|---|
map() | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap() | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
filter() | rdd.filter(x => x != 1) | {2, 3, 3} |
distinct | rdd.distinct() | {1, 2, 3} |
sample(withReplacement, fraction, seed) | rdd.sample(false, 0.5) | 非确定 |
接下来,我们在Spark shell里验证一下上述的几个操作,完整的代码如下:
scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
scala> mRDD.map(x => x + 1).collect()
res30: Array[Int] = Array(2, 3, 4, 4)
scala> mRDD.flatMap(x => x.to(3)).collect()
res31: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
scala> mRDD.filter(x => x != 1).collect()
res32: Array[Int] = Array(2, 3, 3)
scala> mRDD.distinct().collect()
res33: Array[Int] = Array(1, 2, 3)
scala> mRDD.sample(false, 0.5).collect()
res35: Array[Int] = Array()
scala> mRDD.sample(false, 0.5).collect()
res36: Array[Int] = Array(1, 2, 3, 3)
scala> mRDD.sample(false, 0.5).collect()
res37: Array[Int] = Array(1, 3)
上述的几个转化操作中,除sample转化操作每次返回不固定的元素,其他几个转化操作的结果都是可预期的。上述的例子中,我们为了把结果给打印出来,我们调用了 collect()
行动操作。
表2. 对数据分别为 {1, 2, 3} 和 {3, 4, 5} 的RDD进行针对两个RDD的转化操作
函数名 | 示例 | 结果 |
---|---|---|
union() | rdd.union(other) | {1, 2, 3, 3, 4, 5} |
intersection() | rdd.intersection(other) | {3} |
subtract() | rdd.subtract(other) | {1, 2} |
cartesian() | rdd.cartesian(other) | {(1, 3), (1, 4), … (3, 5)} |
同样的,我们在Spark shell里验证一下上述的几个操作,完整的代码如下:
scala> val firstRDD = sc.parallelize(Array(1, 2, 3))
firstRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
scala> val secondRDD = sc.parallelize(Array(3, 4, 5))
secondRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> firstRDD.union(secondRDD).collect()
res26: Array[Int] = Array(1, 2, 3, 3, 4, 5)
scala> firstRDD.intersection(secondRDD).collect()
res27: Array[Int] = Array(3)
scala> firstRDD.subtract(secondRDD).collect()
res28: Array[Int] = Array(1, 2)
scala> firstRDD.cartesian(secondRDD).collect()
res29: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
3.3.4 常见的行动操作
下面的表格列了 Spark 支持的一些常用 actions。详细内容请参阅 RDD API 文档(Scala, Java, Python) 和 PairRDDFunctions 文档(Scala, Java)。
行动操作 | 含义 |
---|---|
reduce(func) | 并行整合RDD中所有数据 |
collect() | 返回RDD中的所有元素 |
count() | 返回RDD中的元素个数 |
first() | 返回RDD中的第一个元素 |
take(n) | 从RDD中返回n个元素 |
takeSample(withReplacement, num, [seed]) | 从RDD中返回任意一些元素 |
takeOrdered(n, [ordering]) | 从RDD中按照提供的顺序返回最前面的n个元素 |
saveAsTextFile(path) | 把RDD中的元素写入本地文件系统或者HDFS文件系统 |
saveAsSequenceFile(path) (Java and Scala) | 把RDD中的元素写入本地文件系统或HDFS文件系统的seqfile |
saveAsObjectFile(path) (Java and Scala) | 通过Java序列化把RDD中的元素写入文件系统(本地或HDFS),通过SparkContext.objectFile()加载 |
countByKey() | RDD中各元素的Key出现的次数 |
foreach(func) | 对RDD中的每个元素使用给定的函数 |
表3,对一个数据为 {1, 2, 3, 3} 的RDD进行基本的RDD行动操作:
函数名 | 示例 | 结果 |
---|---|---|
collect() | rdd.collect() | {1, 2, 3, 3} |
count() | rdd.count() | 4 |
countByValue() | rdd.countByValue() | {(1, 1), (2, 1), (3, 2)} |
take(num) | rdd.take(2) | {1, 2} |
top(num) | rdd.top(2) | {3, 3} |
takeOrdered(num)(ordering) | rdd.takeOrdered(2)(myOrdering) | {3, 3} |
takeSample(num) | rdd.takeSample(false, 1) | 非确定的 |
reduce(num) | rdd.reduce((x, y) = > x + y) | 9 |
fold(num) | rdd.fold(0)((x, y) => x + y) | 9 |
aggregate(zeroValue)(seqOp, combOp) | rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) | (9, 4) |
foreach(func) | rdd.foreach(func) | 无 |
我们把上述示例的几个行动操作在Spark shell中验证一下,结果如下:
scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at parallelize at <console>:24
scala> mRDD.collect()
res41: Array[Int] = Array(1, 2, 3, 3)
scala> mRDD.count()
res42: Long = 4
scala> mRDD.countByValue()
res43: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
scala> mRDD.take(2)
res44: Array[Int] = Array(1, 2)
scala> mRDD.top(2)
res45: Array[Int] = Array(3, 3)
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@5dc5e110
scala> mRDD.takeOrdered(2)(myOrd)
res99: Array[Int] = Array(3, 3)
scala> mRDD.takeSample(false, 1)
res53: Array[Int] = Array(3)
scala> mRDD.takeSample(false, 1)
res54: Array[Int] = Array(2)
scala> mRDD.takeSample(false, 1)
res55: Array[Int] = Array(3)
scala> mRDD.reduce((x,y) => x+y)
res56: Int = 9
scala> mRDD.fold(0)((x,y) => x+y)
res57: Int = 9
scala> mRDD.aggregate((0, 0))((x,y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
res58: (Int, Int) = (9,4)
scala> mRDD.foreach(println)
3
2
3
1
备注:上述的takeSample操作为从RDD中返回任意一个元素,所以每次返回的结果都有可能不同。
3.4 RDD持久化
Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。
你能通过 persist()
或者 cache()
方法持久化一个 RDD 。首先,在action中计算得到RDD;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术-如果RDD的任何一个分区丢失,它 可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。
此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到Tachyon中。我们可以通过传递一个 StorageLevel
对象给 persist()
方法设置这些存储级别。cache()
方法使用了默认的存储级别— StorageLevel.MEMORY_ONLY
。完整的存储级别介绍如下所示:
存储级别 | 含义 |
---|---|
MEMORY_ONLY | 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,一些分区将不会被缓存,从而在每次需要这些分区时都需重新计算它们。这是系统默认的存储级别。 |
MEMORY_AND_DISK | 将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,将这些不适合存在内存中的分区存储在磁盘中,每次需要时读出它们。 |
MEMORY_ONLY_SER | 将RDD作为序列化的Java对象存储(每个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,但是会更耗费cpu资源—密集的读操作。 |
MEMORY_AND_DISK_SER | 和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。 |
DISK_ONLY | 仅仅将RDD分区存储到磁盘中 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 和上面的存储级别类似,但是复制每个分区到集群的两个节点上面 |
OFF_HEAP (experimental) | 以序列化的格式存储RDD到Tachyon中。相对于MEMORY_ONLY_SER,OFF_HEAP减少了垃圾回收的花费,允许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具有更强的吸引力。 |
Spark也会自动持久化一些shuffle操作(如 reduceByKey
)中的中间数据,即使用户没有调用 persist
方法。这样的好处是避免了在shuffle出错情况下,需要重复计算整个输入。如果用户计划重用 计算过程中产生的RDD,我们仍然推荐用户调用 persist
方法。
3.4.1 如何选择存储级别
Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:
-
如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。
-
如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
-
除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
-
如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
-
在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
- 它运行多个执行者共享Tachyon中相同的内存池
- 它显著地减少垃圾回收的花费
- 如果单个的执行者崩溃,缓存的数据不会丢失
3.4.2 删除数据
Spark自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用 RDD.unpersist()
方法
4 共享变量
一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)
4.1 广播变量
广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝。例如,利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。
一个广播变量可以通过调用 SparkContext.broadcast(v)
方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问,下面的代码说明了这个过程:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(87)
scala> broadcastVar.value
res114: Array[Int] = Array(1, 2, 3)
广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。
4.2 累加器
顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现 counters
和 sums
。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型。如果创建了一个具名的累加器,它可以在spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用。
一个累加器可以通过调用 SparkContext.accumulator(v)
方法从一个初始变量v中创建。运行在集群上的任务可以通过 add
方法或者使用 +=
操作来给它加值。然而,它们无法读取这个值。只有驱动程序可以使用 value
方法来读取累加器的值。如下代码,展示了如何利用累加器将一个数组里面的所有元素相加:
scala> val accum = sc.accumulator(0, "My Accumulator")
accum: org.apache.spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
scala> accum.value
res2: Int = 10
这个例子利用了内置的整数类型累加器。开发者可以利用子类 AccumulatorParam
创建自己的 累加器类型。AccumulatorParam
接口有两个方法:zero
方法为你的数据类型提供一个“0 值”(zero value);addInPlace
方法计算两个值的和。例如,假设我们有一个Vector
类代表数学上的向量,我们能够定义如下累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
在scala中,Spark支持用更一般的Accumulable接口来累积数据-结果类型和用于累加的元素类型 不一样(例如通过收集的元素建立一个列表)。Spark也支持用 SparkContext.accumulableCollection
方法累加一般的scala集合类型。
5. 总结
简单介绍了Spark核心概念和RDD操作,通过这些基本的转化操作和行动操作,就可以进行简单的Spark应用开发。