Spark編程入門


1. 交互式Spark-Shell

根據前一節已經搭建好的Hadoop和Spark環境,直接通過腳本啟動Hadoop和Spark服務。如果 http://localhost:8080 能夠訪問,說明Spark服務已經啟動。Spark為我們提供了PySpark以及Spark-shell,可以方便的通過交互試界面調試Spark應用。接下來我們將采用Spark-Shell來調試Spark程序。在終端中輸入如下命令: spark-shell --master spark://spark-B470:7077, master后面的URL就是Spark Master的URL ,可以在 http://localhost:8080 的頁面上找到。

  1. hadoop@spark-B470:~/Develop$ spark-shell --master spark://spark-B470:7077
  2. Setting default log level to "WARN".
  3. To adjust logging level use sc.setLogLevel(newLevel).
  4. SLF4J: Class path contains multiple SLF4J bindings.
  5. SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  6. SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  7. SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  8. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  9. 16/10/29 23:04:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  10. 16/10/29 23:04:25 WARN Utils: Your hostname, spark-B470 resolves to a loopback address: 127.0.1.1; using 192.168.1.110 instead (on interface enp4s0)
  11. 16/10/29 23:04:25 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
  12. 16/10/29 23:04:27 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
  13. Spark context Web UI available at http://192.168.1.110:4040
  14. Spark context available as 'sc' (master = spark://spark-B470:7077, app id = app-20161029230426-0000).
  15. Spark session available as 'spark'.
  16. Welcome to
  17. ____ __
  18. / __/__ ___ _____/ /__
  19. _\ \/ _ \/ _ `/ __/ '_/
  20. /___/ .__/\_,_/_/ /_/\_\ version 2.0.0
  21. /_/
  22. Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
  23. Type in expressions to have them evaluated.
  24. Type :help for more information.
  25. scala>

看到Scala的交互式窗口后,可以在 http://localhost:8080 的web上 Running Applications 這一欄 看到剛啟動的應用程序,如果你要退出Spark-Shell,按 CTRL D組合鍵退出。

spark_web_ui

Spark 最主要的抽象是叫 Resilient Distributed Dataset(RDD) 的彈性分布式集合。RDDs 可以使用 Hadoop InputFormats(例如 HDFS 文件)創建,也可以從其他的 RDDs 轉換。讓我們在 Spark 源代碼目錄從 /etc/protocols 文本文件中創建一個新的 RDD。

  1. scala> val file = sc.textFile("file:///etc/protocols")
  2. file: org.apache.spark.rdd.RDD[String] = file:///etc/protocols MapPartitionsRDD[5] at textFile at <console>:24
  3. scala> file.count()
  4. res3: Long = 64
  5. scala> file.first()
  6. res4: String = # Internet (IP) protocols

上面的操作中創建了一個RDD file,執行了兩個簡單的操作:

  • count() 獲取RDD的行數
  • first() 獲取第一行的內容

我們繼續執行其他操作,比如查找有多少行含有tcp和udp字符串:

  1. scala> file.filter(line=>line.contains("tcp")).count()
  2. res2: Long = 1
  3. scala> file.filter(line=>line.contains("udp")).count()
  4. res3: Long = 2

查看一共有多少個不同單詞的方法,這里用到MapReduce的思路:

  1. scala> val wordcount = file.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((x,y)=>x+y)
  2. wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at reduceByKey at <console>:26
  3. scala> wordcount.count()
  4. res4: Long = 243

2. Spark 核心概念

現在你已經用Spark-shell運行了一段Spark程序,是時候對Spark編程作更細致的了解。

從上層來看,每個Spark應用都由一個驅動器程序(driver program)來發起集群 上的各種並行操作。驅動器程序包包含應用的 main 函數, 並且定義了集群上的分布式數據集,還對這些分布式數據集應用了相關的操作。在前面的例子里,實際的驅動器程序就是Spark shell 本身,你只要輸入想運行的操作就可以了。

驅動器程序通過一個 SparkContext 對象來訪問Spark,這個對象代表對計算集群的一個連接。shell啟動時已經自動創建了一個 SparkContext 對象,是一個叫作 sc 的變量。我們可以通過在shell里嘗試輸出 sc 來查看它的類型。

  1. scala> sc
  2. res1: org.apache.spark.SparkContext = org.apache.spark.SparkContext@1f172892

一旦有了 SparkContext,你就可以通過它來創建RDD。就像前面的例子,我們調用了 sc.textFile() 來創建一個代表文件中各行文本的RDD。我們可以在這些行上進行各種並行操作,比如 count()

要執行這些操作,驅動器程序一般要管理多個執行器(executor)節點。比如,如果我們在集群上運行count() 操作,那么不同的節點會統計文件的不同部分的行數。由於我們剛才是在本地模式下運行的Spark shell,因此所有的工作會在單個節點上執行,但你可以將這個shell連接到集群上來進行並行的數據分析。下圖展示Spark如何在一個集群上運行。

Spark分布式集群

3. RDD編程

3.1 RDD基礎

Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。用戶可以使用兩種方法創建RDD:讀取一個外部數據集,或在驅動程序里分發驅動器程序中的對象集合(比如list和set)。我們在前面的例子已經使用 SparkContext.textFile() 來讀取文本文件作為一個字符串RDD。創建出來后的RDD支持兩種類型的操作:轉化操作(transformation) 和 行動操作(action)

轉化操作會由一個RDD生成一個新的RDD。例如,根據謂詞匹配情況篩選數據就是一個常見的轉化操作。在我們的文本示例中,我們可以用篩選來生成一個只存儲包含單詞tcp的字符串的的新的RDD。示例如下:

  1. scala> val tcpLines = file.filter(line => line.contains("tcp"))
  2. tcpLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:26

行動操作會對RDD計算一個結果,並把結果返回到驅動器程序中,或者把結果存儲到外部存儲系統(如HDFS)中。first() 就是我們之前調用的一個行動操作,它會返回RDD的一個元素,示例如下:

  1. scala> tcpLines.first()
  2. res3: String = tcp 6 TCP # transmission control protocol

轉化操作和行動操作的區別在於Spark計算RDD的方式不同。雖然你可以在任何時候定義新的RDD,但是Spark只會惰性計算這些RDD。它們只有第一次在一個行動操作中用到時,才會真正計算。這種策略剛開始看起來可能會顯得有些奇怪,不過在大數據領域是很有道理的。比如,我們以一個文本文件定義了數據, 然后把其中包含tcp的行篩選出來。如果Spark在運行 val file = sc.textFile("file:///etc/protocols") 時就把文件中的所有行都讀取並存儲起來,就會消耗很多的存儲空間,而我們馬上就要篩選掉其中的很多數據。相反,一旦Spark了解了完整的轉化操作鏈之后,它就可以只計算求結果時真正需要的數據。事實上,在行動操作first()中,Spark只需要掃描文件直到找到第一個匹配的行為止,而不需要讀取整個文件。

最后,默認情況下,Spark的RDD會在你每次對它們進行行動操作時重新計算。如果想在多個行動操作中重用同一個RDD,可以使用 RDD.persist() 讓Spark把這個RDD緩存下來。在第一次對持久化的RDD計算之后,Spark會把RDD的內容保存到內存中(以分區方式存儲到集群中的各機器上),這樣在之后的行動操作就可以重用這些數據了。Spark在默認情況下不進行持久化可能顯得有些奇怪,不過這對於大規模數據集是很有意義的:如果不會重用該RDD,我們就沒必要浪費存儲空間,Spark可以直接遍歷一遍數據然后計算出結果。

在實際操作中,你會經常用 persist() 來把數據的一部分讀取到內存中,並反復查詢這部分數據。例如,我們想多次對
/etc/protocols 文件包含tcp的行進行計算,就可以寫出如下腳本:

  1. scala> tcpLines.persist()
  2. res8: tcpLines.type = MapPartitionsRDD[3] at filter at <console>:26
  3. scala> tcpLines.count()
  4. res9: Long = 1
  5. scala> tcpLines.first()
  6. res10: String = tcp 6 TCP # transmission control protocol

總的來說,每個Spark程序一般的工作流程:

  1. 從外部數據創建輸入RDD。
  2. 使用諸如 filter() 這樣的轉化操作對RDD進行轉化,以定義新的RDD。
  3. 告訴Spark對需要被重用的中間結果RDD執行persist() 操作。
  4. 使用行動操作(如count()和first()等)來觸發一次並行計算,Spark會對計算進行優化后再執行。

3.2 創建RDD

Spark提供了兩種創建RDD 的方式:讀取外部數據集,以及在驅動程序中對一個集合進行並行化。

3.2.1 並行集合

並行集合 (Parallelized collections) 的創建是通過在一個已有的集合(Scala Seq)上調用 SparkContext 的 parallelize 方法實現的。集合中的元素被復制到一個可並行操作的分布式數據集中。例如,這里演示了如何在一個包含 1 到 10 的數組中創建並行集合:

  1. scala> val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  2. data: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  3. scala> val distData = sc.parallelize(data)
  4. distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:26

一旦創建完成,這個分布式數據集(distData)就可以被並行操作。例如,我們可以調用 distData.reduce((a, b) => a + b) 將這個數組中的元素相加。我們以后再描述在分布式上的一些操作。

並行集合一個很重要的參數是切片數(slices),表示一個數據集切分的份數。Spark 會在集群上為每一個切片運行一個任務。你可以在集群上為每個 CPU 設置 2-4 個切片(slices)。正常情況下,Spark 會試着基於你的集群狀況自動地設置切片的數目。然而,你也可以通過 parallelize 的第二個參數手動地設置(例如:sc.parallelize(data, 10))。

注意:除了開發原型和測試時,這種方式用的並不多,畢竟這種方式需要把你的整個數據集先放在一台機器的內存中。

3.2.2 外部數據集

Spark 可以從任何一個 Hadoop 支持的存儲源創建分布式數據集,包括本地文件系統,HDFS,Cassandra,HBase,Amazon S3等。 Spark 支持文本文件(text files),SequenceFiles 和其他 Hadoop InputFormat。

文本文件 RDDs 可以使用 SparkContext.textFile() 方法創建。在這個方法里傳入文件的 URI (機器上的本地路徑 file:// 或 hdfs://,s3n:// 等),然后它會將文件讀取成一個行集合。這里是一個調用例子:

  1. scala> val distFile = sc.textFile("file:///home/hadoop/Develop/start.sh")
  2. distFile: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/Develop/start.sh MapPartitionsRDD[14] at textFile at <console>:24

一旦創建完成,distFile 就能做數據集操作。例如,我們可以用下面的方式使用 map 和 reduce 操作將所有行的長度相加:distFile.map(s => s.length).reduce((a, b) => a + b) 。

注意,Spark 讀文件時:

  • 如果使用本地文件系統路徑,文件必須能在 work 節點上用相同的路徑訪問到。要么復制文件到所有的 workers,要么使用網絡的方式共享文件系統。
  • 所有 Spark 的基於文件的方法,包括 textFile,能很好地支持文件目錄,壓縮過的文件和通配符。例如,你可以使用 textFile(“/my/文件目錄”),textFile(“/my/文件目錄/.txt”) 和 textFile(“/my/文件目錄/.gz”)。
  • textFile 方法也可以選擇第二個可選參數來控制切片(slices)的數目。默認情況下,Spark 為每一個文件塊(HDFS 默認文件塊大小是 64M)創建一個切片(slice)。但是你也可以通過一個更大的值來設置一個更高的切片數目。注意,你不能設置一個小於文件塊數目的切片值。

3.3 RDD操作

RDD 支持兩種類型的操作:轉化操作(transformations) 從已經存在的數據集中創建一個新的數據集;行動操作(actions) 在數據集上進行計算之后返回一個值到驅動器程序。例如,map 是一個轉化操作,它將每一個數據集元素傳遞給一個函數並且返回一個新的 RDD。另一方面,reduce 是一個行動操作,它使用相同的函數來聚合 RDD 的所有元素,並且將最終的結果返回到驅動器程序(不過也有一個並行 reduceByKey 能返回一個分布式數據集)。

3.3.1 向Spark傳遞函數

Spark的大部分轉化操作和一部分行動操作,都需要依賴用戶傳遞函數來計算。這里推薦兩種方式:

  • 匿名函數 (Anonymous function syntax),可以在比較短的代碼中使用。
  • 全局單例對象里的靜態方法。例如,你可以定義 object MyFunctions 然后傳遞 MyFounctions.func1,像下面這樣:
  1. object MyFunctions {
  2. def func1(s: String): String = { ... }
  3. }
  4. myRdd.map(MyFunctions.func1)

注意,它可能傳遞的是一個類實例里的一個方法引用(而不是一個單例對象),這里必須傳送包含方法的整個對象。例如:

  1. class MyClass {
  2. def func1(s: String): String = { ... }
  3. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  4. }

這里,如果我們創建了一個 new MyClass 對象,並且調用它的 doStuffmap 里面引用了這個 MyClass 實例中的 func1 方法,所以這個對象必須傳送到集群上。類似寫成 rdd.map(x => this.func1(x))

以類似的方式,訪問外部對象的字段將會引用整個對象:

  1. class MyClass {
  2. val field = "Hello"
  3. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
  4. }

相當於寫成 rdd.map(x => this.field + x),引用了整個 this 對象。為了避免這個問題,最簡單的方式是復制 field 到一個本地變量而不是從外部訪問它:

  1. def doStuff(rdd: RDD[String]): RDD[String] = {
  2. val field_ = this.field
  3. rdd.map(x => field_ + x)
  4. }
3.3.2 使用鍵值對

雖然很多 Spark 操作工作在包含任意類型對象的RDD上的,但是少數幾個特殊操作僅僅在鍵值(key-value)對RDD上可用。最常見的是分布式 “shuffle” 操作,例如根據一個 key 對一組數據進行分組和聚合。

在 Scala 中,這些操作在包含二元組(Tuple2)(在語言的內建元組中,通過簡單的寫 (a, b) 創建) 的 RDD 上自動地變成可用的,只要在你的程序中導入 org.apache.spark.SparkContext._ 來啟用 Spark 的隱式轉換。在 PairRDDFunctions 的類里鍵值對操作是可以使用的,如果你導入隱式轉換它會自動地包裝成元組 RDD。

例如,下面的代碼在鍵值對上使用 reduceByKey 操作來統計在一個文件里每一行文本內容出現的次數:

  1. val lines = sc.textFile("data.txt")
  2. val pairs = lines.map(s => (s, 1))
  3. val counts = pairs.reduceByKey((a, b) => a + b)

我們也可以使用 counts.sortByKey(),例如,將鍵值對按照字母進行排序,最后 counts.collect() 把它們作為一個對象數組帶回到驅動程序。

3.3.3 常見的轉化操作

下面的表格列了 Spark 支持的一些常用 transformations。

轉化操作 含義
map(func) 返回一個新的分布式數據集,將數據源的每一個元素傳遞給函數 func 映射組成
filter(func) 返回一個新的數據集,從數據源中選中一些元素通過函數 func 返回 true
flatMap(func) 類似於 map,但是每個輸入項能被映射成多個輸出項(所以 func 必須返回一個 Seq,而不是單個 item)。
mapPartitions(func) 類似於 map,但是分別運行在 RDD 的每個分區上,所以 func 的類型必須是 Iterator=> Iterator 當運行在類型為 T 的 RDD 上。
mapPartitionsWithIndex(func) 類似於 mapPartitions,但是 func 需要提供一個 integer 值描述索引(index),所以 func 的類型必須是 (Int, Iterator) => Iterator 當運行在類型為 T 的 RDD 上。
sample(withReplacement, fraction, seed) 對數據進行采樣。
union(otherDataset) 生成一個包含兩個RDD中所有元素的RDD
intersection(otherDataset) 返回兩個RDD共同的元素的RDD
distinct([numTasks])) 對RDD進行去重
groupByKey([numTasks]) 對具有相同鍵的值進行分組
reduceByKey(func, [numTasks]) 合並具有相同鍵的值
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) aggregateByKey函數對PairRDD中相同Key的值進行聚合操作,在聚合過程中同樣使用了一個中立的初始值
sortByKey([ascending], [numTasks]) 返回一個根據鍵升序或降序排序的RDD
join(otherDataset, [numTasks]) 對兩個RDD具有相同鍵的鍵值對進行內連接.
cogroup(otherDataset, [numTasks]) 對兩個RDD中擁有的相同鍵的數據分組到一起
cartesian(otherDataset) 與另一個RDD的笛卡爾積
pipe(command, [envVars]) 對RDD中的元素通過腳本管道執行腳本
coalesce(numPartitions, [shuffle]) 該函數用於將RDD進行重分區,使用HashPartitioner。第一個參數為重分區的數目,第二個為是否進行shuffle,默認為false;
repartition(numPartitions) 對RDD數據集進行重分區操作,該函數其實就是coalesce函數第二個參數為true的實現

表1. 對一個數據為{1, 2, 3, 3}的RDD進行基本的RDD轉化操作

函數名 示例 結果
map() rdd.map(x => x + 1) {2, 3, 4, 4}
flatMap() rdd.flatMap(x => x.to(3)) {1, 2, 3, 2, 3, 3, 3}
filter() rdd.filter(x => x != 1) {2, 3, 3}
distinct rdd.distinct() {1, 2, 3}
sample(withReplacement, fraction, seed) rdd.sample(false, 0.5) 非確定

接下來,我們在Spark shell里驗證一下上述的幾個操作,完整的代碼如下:

  1. scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
  2. mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at parallelize at <console>:24
  3. scala> mRDD.map(x => x + 1).collect()
  4. res30: Array[Int] = Array(2, 3, 4, 4)
  5. scala> mRDD.flatMap(x => x.to(3)).collect()
  6. res31: Array[Int] = Array(1, 2, 3, 2, 3, 3, 3)
  7. scala> mRDD.filter(x => x != 1).collect()
  8. res32: Array[Int] = Array(2, 3, 3)
  9. scala> mRDD.distinct().collect()
  10. res33: Array[Int] = Array(1, 2, 3)
  11. scala> mRDD.sample(false, 0.5).collect()
  12. res35: Array[Int] = Array()
  13. scala> mRDD.sample(false, 0.5).collect()
  14. res36: Array[Int] = Array(1, 2, 3, 3)
  15. scala> mRDD.sample(false, 0.5).collect()
  16. res37: Array[Int] = Array(1, 3)

上述的幾個轉化操作中,除sample轉化操作每次返回不固定的元素,其他幾個轉化操作的結果都是可預期的。上述的例子中,我們為了把結果給打印出來,我們調用了 collect() 行動操作。

表2. 對數據分別為 {1, 2, 3} 和 {3, 4, 5} 的RDD進行針對兩個RDD的轉化操作

函數名 示例 結果
union() rdd.union(other) {1, 2, 3, 3, 4, 5}
intersection() rdd.intersection(other) {3}
subtract() rdd.subtract(other) {1, 2}
cartesian() rdd.cartesian(other) {(1, 3), (1, 4), … (3, 5)}

同樣的,我們在Spark shell里驗證一下上述的幾個操作,完整的代碼如下:

  1. scala> val firstRDD = sc.parallelize(Array(1, 2, 3))
  2. firstRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
  3. scala> val secondRDD = sc.parallelize(Array(3, 4, 5))
  4. secondRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
  5. scala> firstRDD.union(secondRDD).collect()
  6. res26: Array[Int] = Array(1, 2, 3, 3, 4, 5)
  7. scala> firstRDD.intersection(secondRDD).collect()
  8. res27: Array[Int] = Array(3)
  9. scala> firstRDD.subtract(secondRDD).collect()
  10. res28: Array[Int] = Array(1, 2)
  11. scala> firstRDD.cartesian(secondRDD).collect()
  12. res29: Array[(Int, Int)] = Array((1,3), (1,4), (1,5), (2,3), (2,4), (2,5), (3,3), (3,4), (3,5))
3.3.4 常見的行動操作

下面的表格列了 Spark 支持的一些常用 actions。詳細內容請參閱 RDD API 文檔(ScalaJavaPython) 和 PairRDDFunctions 文檔(ScalaJava)。

行動操作 含義
reduce(func) 並行整合RDD中所有數據
collect() 返回RDD中的所有元素
count() 返回RDD中的元素個數
first() 返回RDD中的第一個元素
take(n) 從RDD中返回n個元素
takeSample(withReplacement, num, [seed]) 從RDD中返回任意一些元素
takeOrdered(n, [ordering]) 從RDD中按照提供的順序返回最前面的n個元素
saveAsTextFile(path) 把RDD中的元素寫入本地文件系統或者HDFS文件系統
saveAsSequenceFile(path) (Java and Scala) 把RDD中的元素寫入本地文件系統或HDFS文件系統的seqfile
saveAsObjectFile(path) (Java and Scala) 通過Java序列化把RDD中的元素寫入文件系統(本地或HDFS),通過SparkContext.objectFile()加載
countByKey() RDD中各元素的Key出現的次數
foreach(func) 對RDD中的每個元素使用給定的函數

表3,對一個數據為 {1, 2, 3, 3} 的RDD進行基本的RDD行動操作:

函數名 示例 結果
collect() rdd.collect() {1, 2, 3, 3}
count() rdd.count() 4
countByValue() rdd.countByValue() {(1, 1), (2, 1), (3, 2)}
take(num) rdd.take(2) {1, 2}
top(num) rdd.top(2) {3, 3}
takeOrdered(num)(ordering) rdd.takeOrdered(2)(myOrdering) {3, 3}
takeSample(num) rdd.takeSample(false, 1) 非確定的
reduce(num) rdd.reduce((x, y) = > x + y) 9
fold(num) rdd.fold(0)((x, y) => x + y) 9
aggregate(zeroValue)(seqOp, combOp) rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) (9, 4)
foreach(func) rdd.foreach(func)

我們把上述示例的幾個行動操作在Spark shell中驗證一下,結果如下:

  1. scala> val mRDD = sc.parallelize(Array(1, 2, 3, 3))
  2. mRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[52] at parallelize at <console>:24
  3. scala> mRDD.collect()
  4. res41: Array[Int] = Array(1, 2, 3, 3)
  5. scala> mRDD.count()
  6. res42: Long = 4
  7. scala> mRDD.countByValue()
  8. res43: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 1, 3 -> 2)
  9. scala> mRDD.take(2)
  10. res44: Array[Int] = Array(1, 2)
  11. scala> mRDD.top(2)
  12. res45: Array[Int] = Array(3, 3)
  13. scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
  14. myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@5dc5e110
  15. scala> mRDD.takeOrdered(2)(myOrd)
  16. res99: Array[Int] = Array(3, 3)
  17. scala> mRDD.takeSample(false, 1)
  18. res53: Array[Int] = Array(3)
  19. scala> mRDD.takeSample(false, 1)
  20. res54: Array[Int] = Array(2)
  21. scala> mRDD.takeSample(false, 1)
  22. res55: Array[Int] = Array(3)
  23. scala> mRDD.reduce((x,y) => x+y)
  24. res56: Int = 9
  25. scala> mRDD.fold(0)((x,y) => x+y)
  26. res57: Int = 9
  27. scala> mRDD.aggregate((0, 0))((x,y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2))
  28. res58: (Int, Int) = (9,4)
  29. scala> mRDD.foreach(println)
  30. 3
  31. 2
  32. 3
  33. 1

備注:上述的takeSample操作為從RDD中返回任意一個元素,所以每次返回的結果都有可能不同。

3.4 RDD持久化

Spark最重要的一個功能是它可以通過各種操作(operations)持久化(或者緩存)一個集合到內存中。當你持久化一個RDD的時候,每一個節點都將參與計算的所有分區數據存儲到內存中,並且這些數據可以被這個集合(以及這個集合衍生的其他集合)的動作(action)重復利用。這個能力使后續的動作速度更快(通常快10倍以上)。對應迭代算法和快速的交互使用來說,緩存是一個關鍵的工具。

你能通過 persist() 或者 cache() 方法持久化一個 RDD 。首先,在action中計算得到RDD;然后,將其保存在每個節點的內存中。Spark的緩存是一個容錯的技術-如果RDD的任何一個分區丟失,它 可以通過原有的轉換(transformations)操作自動的重復計算並且創建出這個分區。

此外,我們可以利用不同的存儲級別存儲每一個被持久化的RDD。例如,它允許我們持久化集合到磁盤上、將集合作為序列化的Java對象持久化到內存中、在節點間復制集合或者存儲集合到Tachyon中。我們可以通過傳遞一個 StorageLevel 對象給 persist() 方法設置這些存儲級別。cache() 方法使用了默認的存儲級別— StorageLevel.MEMORY_ONLY。完整的存儲級別介紹如下所示:

存儲級別 含義
MEMORY_ONLY 將RDD作為非序列化的Java對象存儲在jvm中。如果RDD不適合存在內存中,一些分區將不會被緩存,從而在每次需要這些分區時都需重新計算它們。這是系統默認的存儲級別。
MEMORY_AND_DISK 將RDD作為非序列化的Java對象存儲在jvm中。如果RDD不適合存在內存中,將這些不適合存在內存中的分區存儲在磁盤中,每次需要時讀出它們。
MEMORY_ONLY_SER 將RDD作為序列化的Java對象存儲(每個分區一個byte數組)。這種方式比非序列化方式更節省空間,特別是用到快速的序列化工具時,但是會更耗費cpu資源—密集的讀操作。
MEMORY_AND_DISK_SER 和MEMORY_ONLY_SER類似,但不是在每次需要時重復計算這些不適合存儲到內存中的分區,而是將這些分區存儲到磁盤中。
DISK_ONLY 僅僅將RDD分區存儲到磁盤中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 和上面的存儲級別類似,但是復制每個分區到集群的兩個節點上面
OFF_HEAP (experimental) 以序列化的格式存儲RDD到Tachyon中。相對於MEMORY_ONLY_SER,OFF_HEAP減少了垃圾回收的花費,允許更小的執行者共享內存池。這使其在擁有大量內存的環境下或者多並發應用程序的環境中具有更強的吸引力。

Spark也會自動持久化一些shuffle操作(如 reduceByKey)中的中間數據,即使用戶沒有調用 persist 方法。這樣的好處是避免了在shuffle出錯情況下,需要重復計算整個輸入。如果用戶計划重用 計算過程中產生的RDD,我們仍然推薦用戶調用 persist 方法。

3.4.1 如何選擇存儲級別

Spark的多個存儲級別意味着在內存利用率和cpu利用效率間的不同權衡。我們推薦通過下面的過程選擇一個合適的存儲級別:

  • 如果你的RDD適合默認的存儲級別(MEMORY_ONLY),就選擇默認的存儲級別。因為這是cpu利用率最高的選項,會使RDD上的操作盡可能的快。

  • 如果不適合用默認的級別,選擇MEMORY_ONLY_SER。選擇一個更快的序列化庫提高對象的空間使用率,但是仍能夠相當快的訪問。

  • 除非函數計算RDD的花費較大或者它們需要過濾大量的數據,不要將RDD存儲到磁盤上,否則,重復計算一個分區就會和重磁盤上讀取數據一樣慢。

  • 如果你希望更快的錯誤恢復,可以利用重復(replicated)存儲級別。所有的存儲級別都可以通過重復計算丟失的數據來支持完整的容錯,但是重復的數據能夠使你在RDD上繼續運行任務,而不需要重復計算丟失的數據。

  • 在擁有大量內存的環境中或者多應用程序的環境中,OFF_HEAP具有如下優勢:

    • 它運行多個執行者共享Tachyon中相同的內存池
    • 它顯著地減少垃圾回收的花費
    • 如果單個的執行者崩潰,緩存的數據不會丟失
3.4.2 刪除數據

Spark自動的監控每個節點緩存的使用情況,利用最近最少使用原則刪除老舊的數據。如果你想手動的刪除RDD,可以使用 RDD.unpersist() 方法

4 共享變量

一般情況下,當一個傳遞給Spark操作(例如map和reduce)的函數在遠程節點上面運行時,Spark操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量被復制到每台機器上,並且這些變量在遠程機器上的所有更新都不會傳遞回驅動程序。通常跨任務的讀寫變量是低效的,但是,Spark還是為兩種常見的使用模式提供了兩種有限的共享變量:廣播變量(broadcast variable)和累加器(accumulator)

4.1 廣播變量

廣播變量允許程序員緩存一個只讀的變量在每台機器上面,而不是每個任務保存一份拷貝。例如,利用廣播變量,我們能夠以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。Spark也嘗試着利用有效的廣播算法去分配廣播變量,以減少通信的成本。

一個廣播變量可以通過調用 SparkContext.broadcast(v) 方法從一個初始變量v中創建。廣播變量是v的一個包裝變量,它的值可以通過value方法訪問,下面的代碼說明了這個過程:

  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(87)
  3. scala> broadcastVar.value
  4. res114: Array[Int] = Array(1, 2, 3)

廣播變量創建以后,我們就能夠在集群的任何函數中使用它來代替變量v,這樣我們就不需要再次傳遞變量v到每個節點上。另外,為了保證所有的節點得到廣播變量具有相同的值,對象v不能在廣播之后被修改。

4.2 累加器

顧名思義,累加器是一種只能通過關聯操作進行“加”操作的變量,因此它能夠高效的應用於並行操作中。它們能夠用來實現 counters 和 sums。Spark原生支持數值類型的累加器,開發者可以自己添加支持的類型。如果創建了一個具名的累加器,它可以在spark的UI中顯示。這對於理解運行階段(running stages)的過程有很重要的作用。

一個累加器可以通過調用 SparkContext.accumulator(v) 方法從一個初始變量v中創建。運行在集群上的任務可以通過 add 方法或者使用 += 操作來給它加值。然而,它們無法讀取這個值。只有驅動程序可以使用 value 方法來讀取累加器的值。如下代碼,展示了如何利用累加器將一個數組里面的所有元素相加:

  1. scala> val accum = sc.accumulator(0, "My Accumulator")
  2. accum: org.apache.spark.Accumulator[Int] = 0
  3. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
  4. scala> accum.value
  5. res2: Int = 10

這個例子利用了內置的整數類型累加器。開發者可以利用子類 AccumulatorParam 創建自己的 累加器類型。AccumulatorParam 接口有兩個方法:zero 方法為你的數據類型提供一個“0 值”(zero value);addInPlace 方法計算兩個值的和。例如,假設我們有一個Vector 類代表數學上的向量,我們能夠定義如下累加器:

  1. object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  2. def zero(initialValue: Vector): Vector = {
  3. Vector.zeros(initialValue.size)
  4. }
  5. def addInPlace(v1: Vector, v2: Vector): Vector = {
  6. v1 += v2
  7. }
  8. }
  9. // Then, create an Accumulator of this type:
  10. val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在scala中,Spark支持用更一般的Accumulable接口來累積數據-結果類型和用於累加的元素類型 不一樣(例如通過收集的元素建立一個列表)。Spark也支持用 SparkContext.accumulableCollection 方法累加一般的scala集合類型。

5. 總結

簡單介紹了Spark核心概念和RDD操作,通過這些基本的轉化操作和行動操作,就可以進行簡單的Spark應用開發。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM