spark中的scalaAPI之RDDAPI常用操作


package com.XXX
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
//spark中的RDD測試
object RddTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("rdd api test")
    val sc = SparkContext.getOrCreate(conf)
//    mapTest(sc)
//    distinctTest(sc)
//    filterTest(sc)
//    keyByTest(sc)
//    sortByTest(sc)
//    topNTest(sc)
//    repartitionTest(sc)
//    groupByTest(sc)
    aggSumTest(sc)
    sc.stop()
  }

  def mapTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val mapResult = file.map(x =>{//map的特點是一個輸入對應一條輸出,沒有返回值,對應的返回值會是() NIL
      val info = x.split("\\t")
      (info(0),info(1))//轉換成了元組
    })
    //take是一個action,作用是取出前n條數據發送到driver,一般用於開發測試
    mapResult.take(10).foreach(println)

    //map和mapPartition的區別:map是一條記錄一條記錄的轉換,mapPartition是
    //一個partition(分區)轉換一次
    val mapPartitionResult = file.mapPartitions(x => {//一個分區對應一個分區
    var info = new Array[String](3)
     for(line <- x) yield{//yield:作用:有返回值,所有的記錄返回之后是一個集合
        info = line.split("\\t")
        (info(0),info(1))
      }
    })
    mapPartitionResult.take(10).foreach(println)
    // 把一行轉為多行記錄,使用flatMap展平,把一條new_tweet記錄轉成兩條login記錄
    val flatMapTest = file.flatMap(x=>{
      val info = x.split("\\t")
      info(1) match {
        case "new_tweet"=> for (i <- 1 to 2) yield s"${info(0)} login ${info(2)}"
        case _ => Array(x)
      }
    })
    flatMapTest.take(10).foreach(println)
    println(file.count())
    println(flatMapTest.count())
  }
  //distinct:排重,把重復的數據去掉,不是數據的轉換,屬於數據的聚合
  def distinctTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val userRdd = file.map(x=>x.split("\\t")(0)).distinct()
    userRdd.foreach(println)
  }
  //filter:過濾
  def filterTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val loginFilter = file.filter(x=>x.split("\\t")(1)=="login")
    loginFilter.take(10).foreach(println)
    println(loginFilter.count())
  }

  //keyBy,輸入作為value,key由算計計算而來
  def keyByTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt",3)
    val userActionType = file.keyBy(x=>{
      val info = x.split("\\t")
      s"${info(0)}--${info(1)}"
    })
    userActionType.take(10).foreach(println)
  }
  //sortBy排序
  def sortByTest(sc:SparkContext) = {
    val file = sc.textFile("file:///C:\\Users\\zuizui\\Desktop\\README.txt")
    //數據量小的話,想進行群排序,吧numPartitions設置成1
    //默認為聖墟,姜旭吧第二個參數設置為false
//    val sortBy = file.sortBy(x=>x.split("\\s+")(1).toInt,numPartitions = 1)//后面有不同數量的空格時,使用\\s+來split
    val sortBy = file.sortBy(x=>x.split("\\s+")(1).toInt,false,numPartitions = 1)//后面有不同數量的空格時,使用\\s+來split
    sortBy.foreach(println)
  }

  def topNTest(sc:SparkContext) = {
    val list = List(1,23,34,54,56,100)//把集合轉化為RDD使用parallelize,或者mkRDD
    val rdd = sc.parallelize(list,2)
//添加飲食准換,使takeOrdered,和top的排序順序變反
    implicit  val tonordered = new Ordering[Int]{
      override def compare(x: Int, y: Int): Int = y.compareTo(x)
    }
    val takeOrdered = rdd.takeOrdered(3)//從小到大取出前三條
    takeOrdered.foreach(println)
    val topN = rdd.top(3)//從大到小取出前三條
    topN.foreach(println)
  }
  //重新分區
  def repartitionTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt")
    val result  = file.repartition(5)//repartition是寬依賴,所謂寬依賴就是
    //原來RDD的每一個分區中的數據都會分別吧部分數據寫入到新的RDD的每個分區中
    //窄依賴:就是原來RDD的分區中的一個分區數據完全寫入到新的RDD中的一個分區中
    //窄依賴減少網絡間的傳輸
    file.foreachPartition(x=>{
      var sum = 0
      x.foreach(x=>sum+=1)
      println(s"該分區的數據有${sum}")
    })

    result.foreachPartition(x=>{
      var sum = 0
      x.foreach(x=>sum+=1)
      println(s"該分區的數據有${sum}")
    })

    val coalesce = result.coalesce(3)//使用窄依賴,原來有五個分區,現在變成三個的話,
    //其中的一個不變,另外四個分區中的兩兩分別通過窄依賴添加到另外兩個新的分區中
    coalesce.foreachPartition(x=>{
      var sum = 0
      x.foreach(x=>sum+=1)
      println(s"coalesce該分區的數據有${sum}")
    })
  }

  def groupByTest(sc:SparkContext)= {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt")
    val groupedBy = file.groupBy(x=>x.split("\\t")(0))
    //group by 容易發生數傾斜
    groupedBy.foreachPartition(x=>{
      println(s"groupByRDD分區,該分區共有:${x.size}條記錄")
    })
    groupedBy.foreach(x=>{
      println(s"groupByRDD的一條記錄,key為${x._1},value上集合記錄條數是:${x._2.size}")
    })
    groupedBy.foreach(x => {
      var sum = 0
      x._2.foreach(line => {
        line.split("\\t")(1) match {
          case "login" => sum += 1
          case _ =>
        }
      })
      println(s"用戶:${x._1}的登錄次數是:$sum")
    })
  }

  def aggSumTest(sc:SparkContext) = {
    val list = List(1,2,4,5)
    val rdd = sc.parallelize(list,3)
      //reduce 計算sum
    val reduceResult = rdd.reduce((v1,v2)=>v1+v2)
    //fold計算sum
    val flodResult = rdd.fold(0)((v1,v2)=>v1+v2)
    //aggregate把元素連接成一個字符串
    val aggResult = rdd.aggregate("")((c,v)=>{
      c match {
        case "" => v.toString
        case _ => s"$c,$v"
      }
    },(c1,c2)=>{
      c1 match {
        case ""=> c2
        case _=>s"$c1,$c2"
      }
    })

    println(s"reduceResult:$reduceResult")
    println(s"flodResult:$flodResult")
    println(s"aggResult:$aggResult")
  }

  def persistTest(sc:SparkContext) = {
    val file = sc.textFile("file:///G:\\bd14\\user-logs-large.txt")
//    file.cache()
    file.persist(StorageLevel.MEMORY_ONLY)//相當於cache(),智加載在內存中
    //計算用戶數量
    //計算ip數量
    //計算每個用戶在每一個ip上的數量
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM