本文主要是講解spark里RDD的基礎操作。RDD是spark特有的數據模型,談到RDD就會提到什么彈性分布式數據集,什么有向無環圖,本文暫時不去展開這些高深概念,在閱讀本文時候,大家可以就把RDD當作一個數組,這樣的理解對我們學習RDD的API是非常有幫助的。本文所有示例代碼都是使用scala語言編寫的。
Spark里的計算都是操作RDD進行,那么學習RDD的第一個問題就是如何構建RDD,構建RDD從數據來源角度分為兩類:第一類是從內存里直接讀取數據,第二類就是從文件系統里讀取,當然這里的文件系統種類很多常見的就是HDFS以及本地文件系統了。
第一類方式從內存里構造RDD,使用的方法:makeRDD和parallelize方法,如下代碼所示:
/* 使用makeRDD創建RDD */
/* List */
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
val r01 = rdd01.map { x => x * x }
println(r01.collect().mkString(","))
/* Array */
val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
val r02 = rdd02.filter { x => x < 5}
println(r02.collect().mkString(","))
val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r03 = rdd03.map { x => x + 1 }
println(r03.collect().mkString(","))
/* Array */
val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r04 = rdd04.filter { x => x > 3 }
println(r04.collect().mkString(","))
大家看到了RDD本質就是一個數組,因此構造數據時候使用的是List(鏈表)和Array(數組)類型。
第二類方式是通過文件系統構造RDD,代碼如下所示:
val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
val r:RDD[String] = rdd.flatMap { x => x.split(",") }
println(r.collect().mkString(","))
這里例子使用的是本地文件系統,所以文件路徑協議前綴是file://。
構造了RDD對象了,接下來就是如何操作RDD對象了,RDD的操作分為轉化操作(transformation)和行動操作(action),RDD之所以將操作分成這兩類這是和RDD惰性運算有關,當RDD執行轉化操作時候,實際計算並沒有被執行,只有當RDD執行行動操作時候才會促發計算任務提交,執行相應的計算操作。區別轉化操作和行動操作也非常簡單,轉化操作就是從一個RDD產生一個新的RDD操作,而行動操作就是進行實際的計算。
下面是RDD的基礎操作API介紹:
| 操作類型 |
函數名 |
作用 |
| 轉化操作 |
map() |
參數是函數,函數應用於RDD每一個元素,返回值是新的RDD |
| flatMap() |
參數是函數,函數應用於RDD每一個元素,將元素數據進行拆分,變成迭代器,返回值是新的RDD |
|
| filter() |
參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD |
|
| distinct() |
沒有參數,將RDD里的元素進行去重操作 |
|
| union() |
參數是RDD,生成包含兩個RDD所有元素的新RDD |
|
| intersection() |
參數是RDD,求出兩個RDD的共同元素 |
|
| subtract() |
參數是RDD,將原RDD里和參數RDD里相同的元素去掉 |
|
| cartesian() |
參數是RDD,求兩個RDD的笛卡兒積 |
|
| 行動操作 |
collect() |
返回RDD所有元素 |
| count() |
RDD里元素個數 |
|
| countByValue() |
各元素在RDD中出現次數 |
|
| reduce() |
並行整合所有RDD數據,例如求和操作 |
|
| fold(0)(func) |
和reduce功能一樣,不過fold帶有初始值 |
|
| aggregate(0)(seqOp,combop) |
和reduce功能一樣,但是返回的RDD數據類型和原RDD不一樣 |
|
| foreach(func) |
對RDD每個元素都是使用特定函數 |
下面是以上API操作的示例代碼,如下:
轉化操作:
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)
val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
/* map操作 */
println("======map操作======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map操作======")
/* filter操作 */
println("======filter操作======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter操作======")
/* flatMap操作 */
println("======flatMap操作======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap操作======")
/* distinct去重操作 */
println("======distinct去重======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct去重======")
/* union操作 */
println("======union操作======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union操作======")
/* intersection操作 */
println("======intersection操作======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection操作======")
/* subtract操作 */
println("======subtract操作======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract操作======")
/* cartesian操作 */
println("======cartesian操作======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian操作======")
行動操作代碼如下:
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
/* count操作 */
println("======count操作======")
println(rddInt.count())
println("======count操作======")
/* countByValue操作 */
println("======countByValue操作======")
println(rddInt.countByValue())
println("======countByValue操作======")
/* reduce操作 */
println("======countByValue操作======")
println(rddInt.reduce((x ,y) => x + y))
println("======countByValue操作======")
/* fold操作 */
println("======fold操作======")
println(rddInt.fold(0)((x ,y) => x + y))
println("======fold操作======")
/* aggregate操作 */
println("======aggregate操作======")
val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
println(res._1 + "," + res._2)
println("======aggregate操作======")
/* foeach操作 */
println("======foeach操作======")
println(rddStr.foreach { x => println(x) })
println("======foeach操作======")
RDD操作暫時先學習到這里,剩下的內容在下一篇里再談了,下面我要說說如何開發spark,安裝spark的內容我后面會使用專門的文章進行講解,這里我們假設已經安裝好了spark,那么我們就可以在已經裝好的spark服務器上使用spark-shell進行與spark交互的shell,這里我們直接可以敲打代碼編寫spark程序。但是spark-shell畢竟使用太麻煩,而且spark-shell一次只能使用一個用戶,當另外一個用戶要使用spark-shell就會把前一個用戶踢掉,而且shell也沒有IDE那種代碼補全,代碼校驗的功能,使用起來很是痛苦。
不過spark的確是一個神奇的框架,這里的神奇就是指spark本地開發調試非常簡單,本地開發調試不需要任何已經裝好的spark系統,我們只需要建立一個項目,這個項目可以是java的也可以是scala,然后我們將spark-assembly-1.6.1-hadoop2.6.0.jar這樣的jar放入項目的環境里,這個時候我們就可以在本地開發調試spark程序了。
大家請看我們裝有scala插件的eclipse里的完整代碼:
package cn.com.sparktest
import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object SparkTest {
val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
val sc:SparkContext = new SparkContext(conf)
/**
* 創建數據的方式--從內存里構造數據(基礎)
*/
def createDataMethod():Unit = {
/* 使用makeRDD創建RDD */
/* List */
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
val r01 = rdd01.map { x => x * x }
println("===================createDataMethod:makeRDD:List=====================")
println(r01.collect().mkString(","))
println("===================createDataMethod:makeRDD:List=====================")
/* Array */
val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
val r02 = rdd02.filter { x => x < 5}
println("===================createDataMethod:makeRDD:Array=====================")
println(r02.collect().mkString(","))
println("===================createDataMethod:makeRDD:Array=====================")
/* 使用parallelize創建RDD */
/* List */
val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r03 = rdd03.map { x => x + 1 }
println("===================createDataMethod:parallelize:List=====================")
println(r03.collect().mkString(","))
println("===================createDataMethod:parallelize:List=====================")
/* Array */
val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r04 = rdd04.filter { x => x > 3 }
println("===================createDataMethod:parallelize:Array=====================")
println(r04.collect().mkString(","))
println("===================createDataMethod:parallelize:Array=====================")
}
/**
* 創建Pair Map
*/
def createPairRDD():Unit = {
val rdd:RDD[(String,Int)] = sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))
val r:RDD[String] = rdd.keys
println("===========================createPairRDD=================================")
println(r.collect().mkString(","))
println("===========================createPairRDD=================================")
}
/**
* 通過文件創建RDD
* 文件數據:
* key01,1,2.3
key02,5,3.7
key03,23,4.8
key04,12,3.9
key05,7,1.3
*/
def createDataFromFile(path:String):Unit = {
val rdd:RDD[String] = sc.textFile(path, 1)
val r:RDD[String] = rdd.flatMap { x => x.split(",") }
println("=========================createDataFromFile==================================")
println(r.collect().mkString(","))
println("=========================createDataFromFile==================================")
}
/**
* 基本的RDD操作
*/
def basicTransformRDD(path:String):Unit = {
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)
val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
/* map操作 */
println("======map操作======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map操作======")
/* filter操作 */
println("======filter操作======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter操作======")
/* flatMap操作 */
println("======flatMap操作======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap操作======")
/* distinct去重操作 */
println("======distinct去重======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct去重======")
/* union操作 */
println("======union操作======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union操作======")
/* intersection操作 */
println("======intersection操作======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection操作======")
/* subtract操作 */
println("======subtract操作======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract操作======")
/* cartesian操作 */
println("======cartesian操作======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian操作======")
}
/**
* 基本的RDD行動操作
*/
def basicActionRDD():Unit = {
val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
/* count操作 */
println("======count操作======")
println(rddInt.count())
println("======count操作======")
/* countByValue操作 */
println("======countByValue操作======")
println(rddInt.countByValue())
println("======countByValue操作======")
/* reduce操作 */
println("======countByValue操作======")
println(rddInt.reduce((x ,y) => x + y))
println("======countByValue操作======")
/* fold操作 */
println("======fold操作======")
println(rddInt.fold(0)((x ,y) => x + y))
println("======fold操作======")
/* aggregate操作 */
println("======aggregate操作======")
val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
println(res._1 + "," + res._2)
println("======aggregate操作======")
/* foeach操作 */
println("======foeach操作======")
println(rddStr.foreach { x => println(x) })
println("======foeach操作======")
}
def main(args: Array[String]): Unit = {
println(System.getenv("HADOOP_HOME"))
createDataMethod()
createPairRDD()
createDataFromFile("file:///D:/sparkdata.txt")
basicTransformRDD("file:///D:/sparkdata.txt")
basicActionRDD()
/*打印結果*/
/*D://hadoop
===================createDataMethod:makeRDD:List=====================
1,4,9,16,25,36
===================createDataMethod:makeRDD:List=====================
===================createDataMethod:makeRDD:Array=====================
1,2,3,4
===================createDataMethod:makeRDD:Array=====================
===================createDataMethod:parallelize:List=====================
2,3,4,5,6,7
===================createDataMethod:parallelize:List=====================
===================createDataMethod:parallelize:Array=====================
4,5,6
===================createDataMethod:parallelize:Array=====================
===========================createPairRDD=================================
key01,key02,key03
===========================createPairRDD=================================
key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
=========================createDataFromFile==================================
2,3,4,5,6,7,3,6,2
======map操作======
======filter操作======
5,6,5
======filter操作======
======flatMap操作======
key01
======flatMap操作======
======distinct去重======
4,6,2,1,3,5
======distinct去重======
======union操作======
1,3,5,3,2,4,5,1
======union操作======
======intersection操作======
1,5
======intersection操作======
======subtract操作======
3,3
======subtract操作======
======cartesian操作======
(1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
======cartesian操作======
======count操作======
9
======count操作======
======countByValue操作======
Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
======countByValue操作======
======countByValue操作======
29
======countByValue操作======
======fold操作======
29
======fold操作======
======aggregate操作======
19,10
======aggregate操作======
======foeach操作======
a
b
c
d
b
a
======foeach操作======*/
}
}
Spark執行時候我們需要構造一個SparkContenxt的環境變量,構造環境變量時候需要構造一個SparkConf對象,例如代碼:setAppName("xtq").setMaster("local[2]")
appName就是spark任務名稱,master為local[2]是指使用本地模式,啟動2個線程完成spark任務。
在eclipse里運行spark程序時候,會報出如下錯誤:
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370) at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363) at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104) at org.apache.hadoop.security.Groups.<init>(Groups.java:86) at org.apache.hadoop.security.Groups.<init>(Groups.java:66) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248) at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763) at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748) at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160) at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160) at org.apache.spark.SparkContext.<init>(SparkContext.scala:322) at cn.com.sparktest.SparkTest$.<init>(SparkTest.scala:10) at cn.com.sparktest.SparkTest$.<clinit>(SparkTest.scala) at cn.com.sparktest.SparkTest.main(SparkTest.scala)
該錯誤不會影響程序的運算,但總是讓人覺得不舒服,這個問題是因為spark運行依賴於hadoop,可是在window下其實是無法安裝hadoop,只能使用cygwin模擬安裝,而新版本的hadoop在windows下使用需要使用winutils.exe,解決這個問題很簡單,就是下載一個winutils.exe,注意下自己操作系統是32位還是64位,找到對應版本,然后放置在這樣的目錄下:
D:\hadoop\bin\winutils.exe
然后再環境變量里定義HADOOP_HOME= D:\hadoop
環境變量的改變要重啟eclipse,這樣環境變量才會生效,這個時候程序運行就不會報出錯誤了。
