Spark筆記:RDD基本操作(下)


  上一篇里我提到可以把RDD當作一個數組,這樣我們在學習spark的API時候很多問題就能很好理解了。上篇文章里的API也都是基於RDD是數組的數據模型而進行操作的。

  Spark是一個計算框架,是對mapreduce計算框架的改進,mapreduce計算框架是基於鍵值對也就是map的形式,之所以使用鍵值對是人們發現世界上大部分計算都可以使用map這樣的簡單計算模型進行計算。但是Spark里的計算模型卻是數組形式,RDD如何處理Map的數據格式了?本篇文章就主要講解RDD是如何處理Map的數據格式。

  Pair RDD及鍵值對RDD,Spark里創建Pair RDD也是可以通過兩種途徑,一種是從內存里讀取,一種是從文件讀取。

  首先是從文件讀取,上篇里我們看到使用textFile方法讀取文件,讀取的文件是按行組織成一個數組,要讓其變成map格式就的進行轉化,代碼如下所示:

    /*
     * 測試文件數據:
     * x01,1,4
       x02,11,1
    x01,3,9
    x01,2,6
       x02,18,12
       x03,7,9
     * 
     * */
    val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) }
    val rFile:RDD[String] = rddFile.keys
    println("=========createPairMap File=========")
    println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03
    println("=========createPairMap File=========")

  我們由此可以看到以讀取文件方式構造RDD,我們需要使用map函數進行轉化,讓其變成map的形式。

  下面是通過內存方式進行創建,代碼如下:

    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
    val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)
    println("=========createPairMap=========")
    println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
    println("=========createPairMap=========")

  RDD任然是數組形式,只不過數組的元素是("k01",3)格式是scala里面特有的Tuple2及二元組,元組可以當作一個集合,這個集合可以是各種不同數據類型組合而成,二元組就是只包含兩個元素的元組。

  由此可見Pair RDD也是數組,只不過是一個元素為二元組的數組而已,上篇里對RDD的操作也是同樣適用於Pair RDD的。

  下面是Pair RDD的API講解,同樣我們先說轉化操作的API:

reduceByKey:合並具有相同鍵的值;
groupByKey:對具有相同鍵的值進行分組;
keys:返回一個僅包含鍵值的RDD;
values:返回一個僅包含值的RDD;
sortByKey:返回一個根據鍵值排序的RDD;
flatMapValues:針對Pair RDD中的每個值應用一個返回迭代器的函數,然后對返回的每個元素都生成一個對應原鍵的鍵值對記錄;
mapValues:對Pair RDD里每一個值應用一個函數,但是不會對鍵值進行操作;
combineByKey:使用不同的返回類型合並具有相同鍵的值;
subtractByKey:操作的RDD我們命名為RDD1,參數RDD命名為參數RDD,剔除掉RDD1里和參數RDD中鍵相同的元素;
join:對兩個RDD進行內連接;
rightOuterJoin:對兩個RDD進行連接操作,第一個RDD的鍵必須存在,第二個RDD的鍵不再第一個RDD里面有那么就會被剔除掉,相同鍵的值會被合並;
leftOuterJoin:對兩個RDD進行連接操作,第二個RDD的鍵必須存在,第一個RDD的鍵不再第二個RDD里面有那么就會被剔除掉,相同鍵的值會被合並;
cogroup:將兩個RDD里相同鍵的數據分組在一起

  下面就是行動操作的API了,具體如下:

countByKey:對每個鍵的元素進行分別計數;
collectAsMap:將結果變成一個map;
lookup:在RDD里使用鍵值查找數據

  接下來我再提提那些不是很常用的RDD操作,具體如下:

  轉化操作的:

sample:對RDD采樣;

  行動操作:

take(num):返回RDD里num個元素,隨機的;
top(num):返回RDD里最前面的num個元素,這個方法實用性還比較高;
takeSample:從RDD里返回任意一些元素;
sample:對RDD里的數據采樣;
takeOrdered:從RDD里按照提供的順序返回最前面的num個元素

  接下來就是示例代碼了,如下所示:

package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.collection.CompactBuffer

object SparkPairMap {
  
  val conf:SparkConf = new SparkConf().setAppName("spark pair map").setMaster("local[2]")
  val sc:SparkContext = new SparkContext(conf)
 
  /**
   * 構建Pair RDD
   */
  def createPairMap():Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
    val r:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)
    println("=========createPairMap=========")
    println(r.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
    println("=========createPairMap=========")
    
    /*
     * 測試文件數據:
     * x01,1,4
			 x02,11,1
			 x01,3,9
			 x01,2,6
       x02,18,12
       x03,7,9
     * 
     * */
    val rddFile:RDD[(String,String)] = sc.textFile("file:///F:/sparkdata01.txt", 1).map { x => (x.split(",")(0),x.split(",")(1) + "," + x.split(",")(2)) }
    val rFile:RDD[String] = rddFile.keys
    println("=========createPairMap File=========")
    println(rFile.collect().mkString(","))// x01,x02,x01,x01,x02,x03
    println("=========createPairMap File=========")
  }
  
  /**
   * 關於Pair RDD的轉化操作和行動操作
   */
  def pairMapRDD(path:String):Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
    val other:RDD[(String,Int)] = sc.parallelize(List(("k01",29)), 1)
    
    // 轉化操作
    val rddReduce:RDD[(String,Int)] = rdd.reduceByKey((x,y) => x + y)
    println("====reduceByKey===:" + rddReduce.collect().mkString(","))// (k01,29),(k03,2),(k02,6)
    val rddGroup:RDD[(String,Iterable[Int])] = rdd.groupByKey()
    println("====groupByKey===:" + rddGroup.collect().mkString(","))// (k01,CompactBuffer(3, 26)),(k03,CompactBuffer(2)),(k02,CompactBuffer(6))
    val rddKeys:RDD[String] = rdd.keys
    println("====keys=====:" + rddKeys.collect().mkString(","))// k01,k02,k03,k01
    val rddVals:RDD[Int] = rdd.values
    println("======values===:" + rddVals.collect().mkString(","))// 3,6,2,26
    val rddSortAsc:RDD[(String,Int)] = rdd.sortByKey(true, 1)
    val rddSortDes:RDD[(String,Int)] = rdd.sortByKey(false, 1)
    println("====rddSortAsc=====:" + rddSortAsc.collect().mkString(","))// (k01,3),(k01,26),(k02,6),(k03,2)
    println("======rddSortDes=====:" + rddSortDes.collect().mkString(","))// (k03,2),(k02,6),(k01,3),(k01,26)
    val rddFmVal:RDD[(String,Int)] = rdd.flatMapValues { x => List(x + 10) }
    println("====flatMapValues===:" + rddFmVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
    val rddMapVal:RDD[(String,Int)] = rdd.mapValues { x => x + 10 }
    println("====mapValues====:" + rddMapVal.collect().mkString(","))// (k01,13),(k02,16),(k03,12),(k01,36)
    val rddCombine:RDD[(String,(Int,Int))] = rdd.combineByKey(x => (x,1), (param:(Int,Int),x) => (param._1 + x,param._2 + 1), (p1:(Int,Int),p2:(Int,Int)) => (p1._1 + p2._1,p1._2 + p2._2))
    println("====combineByKey====:" + rddCombine.collect().mkString(","))//(k01,(29,2)),(k03,(2,1)),(k02,(6,1))
    val rddSubtract:RDD[(String,Int)] = rdd.subtractByKey(other);
    println("====subtractByKey====:" + rddSubtract.collect().mkString(","))// (k03,2),(k02,6)
    val rddJoin:RDD[(String,(Int,Int))] = rdd.join(other)
    println("=====rddJoin====:" + rddJoin.collect().mkString(","))// (k01,(3,29)),(k01,(26,29))
    val rddRight:RDD[(String,(Option[Int],Int))] = rdd.rightOuterJoin(other)
    println("====rightOuterJoin=====:" + rddRight.collect().mkString(","))// (k01,(Some(3),29)),(k01,(Some(26),29))
    val rddLeft:RDD[(String,(Int,Option[Int]))] = rdd.leftOuterJoin(other)
    println("=====rddLeft=====:" + rddLeft.collect().mkString(","))// (k01,(3,Some(29))),(k01,(26,Some(29))),(k03,(2,None)),(k02,(6,None))
    val rddCogroup: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd.cogroup(other)
    println("=====cogroup=====:" + rddCogroup.collect().mkString(","))// (k01,(CompactBuffer(3, 26),CompactBuffer(29))),(k03,(CompactBuffer(2),CompactBuffer())),(k02,(CompactBuffer(6),CompactBuffer()))
    
    // 行動操作
    val resCountByKey = rdd.countByKey()
    println("=====countByKey=====:" + resCountByKey)// Map(k01 -> 2, k03 -> 1, k02 -> 1)
    val resColMap = rdd.collectAsMap()
    println("=====resColMap=====:" + resColMap)//Map(k02 -> 6, k01 -> 26, k03 -> 2)
    val resLookup = rdd.lookup("k01")
    println("====lookup===:" + resLookup) // WrappedArray(3, 26)
  }
  
  /**
   * 其他一些不常用的RDD操作
   */
  def otherRDDOperate(){
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("k01",3),("k02",6),("k03",2),("k01",26)))
    
    println("=====first=====:" + rdd.first())//(k01,3)
    val resTop = rdd.top(2).map(x => x._1 + ";" + x._2)
    println("=====top=====:" + resTop.mkString(","))// k03;2,k02;6
    val resTake = rdd.take(2).map(x => x._1 + ";" + x._2)
    println("=======take====:" + resTake.mkString(","))// k01;3,k02;6
    val resTakeSample = rdd.takeSample(false, 2).map(x => x._1 + ";" + x._2)
    println("=====takeSample====:" + resTakeSample.mkString(","))// k01;26,k03;2
    val resSample1 = rdd.sample(false, 0.25)
    val resSample2 = rdd.sample(false, 0.75)
    val resSample3 = rdd.sample(false, 0.5)
    println("=====sample======:" + resSample1.collect().mkString(","))// 無
    println("=====sample======:" + resSample2.collect().mkString(","))// (k01,3),(k02,6),(k01,26)
    println("=====sample======:" + resSample3.collect().mkString(","))// (k01,3),(k01,26)
  }
  
  def main(args: Array[String]): Unit = {
    createPairMap()
    pairMapRDD("file:///F:/sparkdata01.txt")
    otherRDDOperate()
  }
  
}

  本篇到此就將我知道的spark的API全部講完了,兩篇文章里的示例代碼都是經過測試的,可以直接運行,大家在閱讀代碼時候最好注意這個特點:我在寫RDD轉化代碼時候都是很明確的寫上了轉化后的RDD的數據類型,這樣做的目的就是讓讀者更加清晰的認識不同RDD轉化后的數據類型,這點在實際開發里非常重要,在實際的計算里我們經常會不同的計算算法不停的轉化RDD的數據類型,而使用scala開發spark程序時候,我發現scala和javascript很類似,我們不去指定返回值數據類型,scala編譯器也會自動推算結果的數據類型,因此編碼時候我們可以不指定具體數據類型。這個特點就會讓我們在實際開發里碰到種種問題,因此我在示例代碼里明確了RDD轉化后的數據類型。

  在使用Pair RDD時候,我們要引入:

import org.apache.spark.SparkContext._

  否則代碼就有可能報錯,說找不到對應的方法,這個引入就是scala里導入的隱世類型轉化的功能,原理和上段文字說到的內容差不多。

      開發spark程序不僅僅只可以使用scala,還可以使用python,java,不過scala使用起來更加方便,spark的API簡單清晰,這樣的編程大大降低了原先使用mapreduce編程的難度,但是如果我們要深入掌握這些API那么就要更加深入的學習下scala。下一篇我就根據spark里RDD的API講解一些scala的語法,通過這些語法讓我們更好的掌握Spark的API。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM