Spark筆記:RDD基本操作(上)


  本文主要是講解spark里RDD的基礎操作。RDD是spark特有的數據模型,談到RDD就會提到什么彈性分布式數據集,什么有向無環圖,本文暫時不去展開這些高深概念,在閱讀本文時候,大家可以就把RDD當作一個數組,這樣的理解對我們學習RDD的API是非常有幫助的。本文所有示例代碼都是使用scala語言編寫的。

  Spark里的計算都是操作RDD進行,那么學習RDD的第一個問題就是如何構建RDD,構建RDD從數據來源角度分為兩類:第一類是從內存里直接讀取數據,第二類就是從文件系統里讀取,當然這里的文件系統種類很多常見的就是HDFS以及本地文件系統了。

  第一類方式從內存里構造RDD,使用的方法:makeRDD和parallelize方法,如下代碼所示:

 

    /* 使用makeRDD創建RDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println(r01.collect().mkString(","))
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println(r02.collect().mkString(","))

    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println(r03.collect().mkString(","))
    /* Array */
    val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println(r04.collect().mkString(","))

 

  大家看到了RDD本質就是一個數組,因此構造數據時候使用的是List(鏈表)和Array(數組)類型。

  第二類方式是通過文件系統構造RDD,代碼如下所示:

    val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println(r.collect().mkString(","))

  這里例子使用的是本地文件系統,所以文件路徑協議前綴是file://。

  構造了RDD對象了,接下來就是如何操作RDD對象了,RDD的操作分為轉化操作(transformation)和行動操作(action),RDD之所以將操作分成這兩類這是和RDD惰性運算有關,當RDD執行轉化操作時候,實際計算並沒有被執行,只有當RDD執行行動操作時候才會促發計算任務提交,執行相應的計算操作。區別轉化操作和行動操作也非常簡單,轉化操作就是從一個RDD產生一個新的RDD操作,而行動操作就是進行實際的計算。

  下面是RDD的基礎操作API介紹:

操作類型

函數名

作用

轉化操作

map()

參數是函數,函數應用於RDD每一個元素,返回值是新的RDD

flatMap()

參數是函數,函數應用於RDD每一個元素,將元素數據進行拆分,變成迭代器,返回值是新的RDD

filter()

參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD

distinct()

沒有參數,將RDD里的元素進行去重操作

union()

參數是RDD,生成包含兩個RDD所有元素的新RDD

intersection()

參數是RDD,求出兩個RDD的共同元素

subtract()

參數是RDD,將原RDD里和參數RDD里相同的元素去掉

cartesian()

參數是RDD,求兩個RDD的笛卡兒積

行動操作

collect()

返回RDD所有元素

count()

RDD里元素個數

countByValue()

各元素在RDD中出現次數

reduce()

並行整合所有RDD數據,例如求和操作

fold(0)(func)

和reduce功能一樣,不過fold帶有初始值

aggregate(0)(seqOp,combop)

和reduce功能一樣,但是返回的RDD數據類型和原RDD不一樣

foreach(func)

對RDD每個元素都是使用特定函數

  下面是以上API操作的示例代碼,如下:

  轉化操作:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
    
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map操作 */
    println("======map操作======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map操作======")
    /* filter操作 */
    println("======filter操作======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter操作======")
    /* flatMap操作 */
    println("======flatMap操作======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap操作======")
    /* distinct去重操作 */
    println("======distinct去重======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct去重======")
    /* union操作 */
    println("======union操作======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union操作======")
    /* intersection操作 */
    println("======intersection操作======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection操作======")
    /* subtract操作 */
    println("======subtract操作======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract操作======")
    /* cartesian操作 */
    println("======cartesian操作======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian操作======")

  行動操作代碼如下:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    
    /* count操作 */
    println("======count操作======")
    println(rddInt.count())
    println("======count操作======")   
    /* countByValue操作 */
    println("======countByValue操作======")
    println(rddInt.countByValue())
    println("======countByValue操作======")
    /* reduce操作 */
    println("======countByValue操作======")
    println(rddInt.reduce((x ,y) => x + y))
    println("======countByValue操作======")
    /* fold操作 */
    println("======fold操作======")
    println(rddInt.fold(0)((x ,y) => x + y))
    println("======fold操作======")
    /* aggregate操作 */
    println("======aggregate操作======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate操作======")
    /* foeach操作 */
    println("======foeach操作======")
    println(rddStr.foreach { x => println(x) })
    println("======foeach操作======") 

  RDD操作暫時先學習到這里,剩下的內容在下一篇里再談了,下面我要說說如何開發spark,安裝spark的內容我后面會使用專門的文章進行講解,這里我們假設已經安裝好了spark,那么我們就可以在已經裝好的spark服務器上使用spark-shell進行與spark交互的shell,這里我們直接可以敲打代碼編寫spark程序。但是spark-shell畢竟使用太麻煩,而且spark-shell一次只能使用一個用戶,當另外一個用戶要使用spark-shell就會把前一個用戶踢掉,而且shell也沒有IDE那種代碼補全,代碼校驗的功能,使用起來很是痛苦。

  不過spark的確是一個神奇的框架,這里的神奇就是指spark本地開發調試非常簡單,本地開發調試不需要任何已經裝好的spark系統,我們只需要建立一個項目,這個項目可以是java的也可以是scala,然后我們將spark-assembly-1.6.1-hadoop2.6.0.jar這樣的jar放入項目的環境里,這個時候我們就可以在本地開發調試spark程序了。

  大家請看我們裝有scala插件的eclipse里的完整代碼:

package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SparkTest {
  val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
  val sc:SparkContext = new SparkContext(conf)
  
  /**
   * 創建數據的方式--從內存里構造數據(基礎)
   */
  def createDataMethod():Unit = {
    /* 使用makeRDD創建RDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println("===================createDataMethod:makeRDD:List=====================")
    println(r01.collect().mkString(","))
    println("===================createDataMethod:makeRDD:List=====================")
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println("===================createDataMethod:makeRDD:Array=====================")
    println(r02.collect().mkString(","))
    println("===================createDataMethod:makeRDD:Array=====================")
    
    /* 使用parallelize創建RDD */
    /* List */
    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println("===================createDataMethod:parallelize:List=====================")
    println(r03.collect().mkString(","))
    println("===================createDataMethod:parallelize:List=====================")
    /* Array */
    val rdd04 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println("===================createDataMethod:parallelize:Array=====================")
    println(r04.collect().mkString(","))
    println("===================createDataMethod:parallelize:Array=====================")
  }
  
  /**
   * 創建Pair Map
   */
  def createPairRDD():Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))
    val r:RDD[String] = rdd.keys
    println("===========================createPairRDD=================================")
    println(r.collect().mkString(","))
    println("===========================createPairRDD=================================")
  }
  
  /**
   * 通過文件創建RDD
   * 文件數據:
   * 	key01,1,2.3
		  key02,5,3.7
      key03,23,4.8
      key04,12,3.9
      key05,7,1.3
   */
  def createDataFromFile(path:String):Unit = {
    val rdd:RDD[String] = sc.textFile(path, 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println("=========================createDataFromFile==================================")
    println(r.collect().mkString(","))
    println("=========================createDataFromFile==================================")
  }
  
  /**
   * 基本的RDD操作
   */
  def basicTransformRDD(path:String):Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
    
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map操作 */
    println("======map操作======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map操作======")
    /* filter操作 */
    println("======filter操作======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter操作======")
    /* flatMap操作 */
    println("======flatMap操作======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap操作======")
    /* distinct去重操作 */
    println("======distinct去重======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct去重======")
    /* union操作 */
    println("======union操作======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union操作======")
    /* intersection操作 */
    println("======intersection操作======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection操作======")
    /* subtract操作 */
    println("======subtract操作======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract操作======")
    /* cartesian操作 */
    println("======cartesian操作======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian操作======")    
  }
  
  /**
   * 基本的RDD行動操作
   */
  def basicActionRDD():Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    
    /* count操作 */
    println("======count操作======")
    println(rddInt.count())
    println("======count操作======")   
    /* countByValue操作 */
    println("======countByValue操作======")
    println(rddInt.countByValue())
    println("======countByValue操作======")
    /* reduce操作 */
    println("======countByValue操作======")
    println(rddInt.reduce((x ,y) => x + y))
    println("======countByValue操作======")
    /* fold操作 */
    println("======fold操作======")
    println(rddInt.fold(0)((x ,y) => x + y))
    println("======fold操作======")
    /* aggregate操作 */
    println("======aggregate操作======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate操作======")
    /* foeach操作 */
    println("======foeach操作======")
    println(rddStr.foreach { x => println(x) })
    println("======foeach操作======")    
  }
  
  def main(args: Array[String]): Unit = {
    println(System.getenv("HADOOP_HOME"))
    createDataMethod()
    createPairRDD()
    createDataFromFile("file:///D:/sparkdata.txt")
    basicTransformRDD("file:///D:/sparkdata.txt")
    basicActionRDD()
    /*打印結果*/
    /*D://hadoop
===================createDataMethod:makeRDD:List=====================
1,4,9,16,25,36
===================createDataMethod:makeRDD:List=====================
===================createDataMethod:makeRDD:Array=====================
1,2,3,4
===================createDataMethod:makeRDD:Array=====================
===================createDataMethod:parallelize:List=====================
2,3,4,5,6,7
===================createDataMethod:parallelize:List=====================
===================createDataMethod:parallelize:Array=====================
4,5,6
===================createDataMethod:parallelize:Array=====================
===========================createPairRDD=================================
key01,key02,key03
===========================createPairRDD=================================
key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
=========================createDataFromFile==================================
2,3,4,5,6,7,3,6,2
======map操作======
======filter操作======
5,6,5
======filter操作======
======flatMap操作======
key01
======flatMap操作======
======distinct去重======
4,6,2,1,3,5
======distinct去重======
======union操作======
1,3,5,3,2,4,5,1
======union操作======
======intersection操作======
1,5
======intersection操作======
======subtract操作======
3,3
======subtract操作======
======cartesian操作======
(1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
======cartesian操作======
======count操作======
9
======count操作======
======countByValue操作======
Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
======countByValue操作======
======countByValue操作======
29
======countByValue操作======
======fold操作======
29
======fold操作======
======aggregate操作======
19,10
======aggregate操作======
======foeach操作======
a
b
c
d
b
a
======foeach操作======*/
  }
}

  Spark執行時候我們需要構造一個SparkContenxt的環境變量,構造環境變量時候需要構造一個SparkConf對象,例如代碼:setAppName("xtq").setMaster("local[2]")

  appName就是spark任務名稱,master為local[2]是指使用本地模式,啟動2個線程完成spark任務。

  在eclipse里運行spark程序時候,會報出如下錯誤:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
	at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
	at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
	at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
	at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
	at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
	at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
	at cn.com.sparktest.SparkTest$.<init>(SparkTest.scala:10)
	at cn.com.sparktest.SparkTest$.<clinit>(SparkTest.scala)
	at cn.com.sparktest.SparkTest.main(SparkTest.scala)

  該錯誤不會影響程序的運算,但總是讓人覺得不舒服,這個問題是因為spark運行依賴於hadoop,可是在window下其實是無法安裝hadoop,只能使用cygwin模擬安裝,而新版本的hadoop在windows下使用需要使用winutils.exe,解決這個問題很簡單,就是下載一個winutils.exe,注意下自己操作系統是32位還是64位,找到對應版本,然后放置在這樣的目錄下:

  D:\hadoop\bin\winutils.exe

  然后再環境變量里定義HADOOP_HOME= D:\hadoop

  環境變量的改變要重啟eclipse,這樣環境變量才會生效,這個時候程序運行就不會報出錯誤了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM