RDD Explained: Creation and Operations


What is an RDD


An RDD (Resilient Distributed Dataset) is the most basic data abstraction in Spark. It represents a distributed collection and supports distributed operations.

Creating RDDs

The data in an RDD can come from two places: a local collection or an external data source.
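
A minimal sketch of both creation routes, assuming a local master (the object name Demo01Create is a placeholder; the file path matches the examples below):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo01Create {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo01Create").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // 1. from a local collection
    val localRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))

    // 2. from an external data source, e.g. a text file
    val fileRDD: RDD[String] = sc.textFile("spark/data/words.txt")

    println(localRDD.count())
    println(fileRDD.count())
  }
}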

RDD Operations

Categories

Transformation Operators

map

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo03Map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    conf.setAppName("Demo03Map").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    //read the file data
    val linesRDD: RDD[String] = sc.textFile("spark/data/words.txt")
    //flatten the data: split each line into individual words
    val flatRDD: RDD[String] = linesRDD.flatMap(_.split(","))


    //group by word
    val groupRDD: RDD[(String, Iterable[String])] = flatRDD.groupBy(w => w)
    //aggregate
    val wordsRDD: RDD[String] = groupRDD.map(kv => {
      val key: String = kv._1
      val words: Iterable[String] = kv._2
      key + "," + words.size
    })


    //group + aggregate using reduceByKey
    val mapRDD1: RDD[(String, Int)] = flatRDD.map((_, 1))
    val words1: RDD[(String, Int)] = mapRDD1.reduceByKey(_ + _)

    //group + aggregate: the same result using groupByKey + mapValues
    val mapRDD2: RDD[(String, Int)] = flatRDD.map((_, 1))
    val words2: RDD[(String, Iterable[Int])] = mapRDD2.groupByKey()
    val wordSum: RDD[(String, Int)] = words2.mapValues(_.size)
    wordSum.foreach(println)

    //output the results
    wordsRDD.foreach(println)
    words1.foreach(println)
  }
}

flatMap (flattens the data)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo04FlatMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo04FlatMap").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.parallelize(List("java,scala,python", "map,java,scala"))
    //flatten: split each line and merge all the pieces into one RDD
    val flatRDD: RDD[String] = linesRDD.flatMap(_.split(","))
    flatRDD.foreach(println)
  }
}
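
To make the difference from map concrete, the following sketch splits the same lines with both operators: map keeps one output element per input line (an RDD of arrays), while flatMap also flattens the pieces into a single RDD of words. The object name is hypothetical.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo04MapVsFlatMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo04MapVsFlatMap").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val linesRDD: RDD[String] = sc.parallelize(List("java,scala,python", "map,java,scala"))

    // map: one output per input -> RDD[Array[String]] with 2 elements
    val mapped: RDD[Array[String]] = linesRDD.map(_.split(","))
    println(mapped.count())

    // flatMap: split, then flatten -> RDD[String] with 6 elements
    val flattened: RDD[String] = linesRDD.flatMap(_.split(","))
    println(flattened.count())
  }
}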

mapPartitions

Difference between map and mapPartitions

1) map: processes one record at a time
2) mapPartitions: processes one partition of data at a time

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo05MapPartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val stuRDD: RDD[String] = sc.textFile("spark/data/words.txt",3)
    stuRDD.mapPartitions(iter => {
      println("map partition")
      // process the data one partition at a time; iter is an Iterator over this partition's records
      iter.map(line => line.split(",")(1))
    }).foreach(println)
  }
}
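
mapPartitions is usually chosen when some setup is expensive and should run once per partition rather than once per record (for example opening a database connection). A hedged sketch with the setup simulated by a println; the object name and data are made up for illustration.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo05PerPartitionSetup {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05PerPartitionSetup").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val numsRDD: RDD[Int] = sc.parallelize(1 to 10, 3)

    numsRDD.mapPartitions(iter => {
      // runs once per partition: put expensive setup (e.g. a connection) here
      println("setting up resources for one partition")
      // runs once per record, reusing the per-partition setup
      iter.map(_ * 10)
    }).foreach(println)
  }
}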

filter (filtering)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo06Filter {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
    //filter is a transformation operator: keep only the odd numbers
    linesRDD.filter(n => {
      n % 2 == 1
    }).foreach(println)
  }
}

sample (sampling)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo07Sample {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    /**
     * sample: draws a random sample from the data
     * withReplacement: whether the sampling is done with replacement;
     *   true means a record is put back after it is drawn,
     *   so the sample may contain duplicates
     * fraction: the expected fraction of data to sample, a Double between 0 and 1,
     *   e.g. 0.3 means roughly 30% of the data
     */
    val stuRDD: RDD[String] = sc.textFile("spark/data/students.txt",3)
    stuRDD.sample(withReplacement = true,0.1).foreach(println)
  }
}

union (concatenates two datasets with the same structure)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo08Union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    /**union
     * concatenates two RDDs with the same structure
     */
    val lineRDD1: RDD[String] = sc.parallelize(List("java,scala", "data,python"))
    val lineRDD2: RDD[String] = sc.parallelize(List("spark,scala", "java,python"))
    println(lineRDD1.getNumPartitions)
    val unionRDD: RDD[String] = lineRDD1.union(lineRDD2)
    println(unionRDD.getNumPartitions)
    unionRDD.foreach(println)
  }
}

mapPartitionsWithIndex

    //mapPartitionsWithIndex is also a transformation operator
    //it passes in the index of the partition currently being processed,
    //so you can choose which partitions to act on
    stuRDD.mapPartitionsWithIndex((index, iter) => {
      println("current partition: " + index)
      // process the data partition by partition
      iter.map(line => line.split(",")(1))
    }).foreach(println)
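
The snippet above reuses stuRDD from the mapPartitions example. For reference, a self-contained sketch that uses a small in-memory collection and keeps only the records of partition 0; the object name and data are made up for illustration.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoMapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("DemoMapPartitionsWithIndex").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val numsRDD: RDD[Int] = sc.parallelize(1 to 9, 3)

    // keep only the records that live in partition 0, drop the rest
    numsRDD.mapPartitionsWithIndex((index, iter) => {
      if (index == 0) iter else Iterator.empty
    }).foreach(println)
  }
}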

join (joins records that share the same key; the data must be in (K, V) format)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo09Join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo05MapPartition").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    // build K-V format RDDs
    val tuple2RDD1: RDD[(String, String)] = sc.parallelize(List(("001", "張三"), "002" -> "小紅", "003" -> "小明"))
    val tuple2RDD2: RDD[(String, Int)] = sc.parallelize(List(("001", 20), "002" -> 22, "003" -> 21))
    val tuple2RDD3: RDD[(String, String)] = sc.parallelize(List(("001", "男"), "002" -> "女"))
    //join the two RDDs on their common keys
    val joinRDD: RDD[(String, (String, Int))] = tuple2RDD1.join(tuple2RDD2)
    joinRDD.map(kv => {
      val id: String = kv._1
      val name: String = kv._2._1
      val age: Int = kv._2._2
      id + "," + name + "," + age
    }).foreach(println)

    //second way: pattern matching on the key and value
    joinRDD.map {
      case (id: String, (name: String, age: Int)) => id + "*" + name + "*" + age
    }.foreach(println)

    val leftJoinRDD: RDD[(String, (String, Option[String]))] = tuple2RDD1.leftOuterJoin(tuple2RDD3)
    leftJoinRDD.map {
      //the key has a match in the right RDD
      case (id: String, (name: String, Some(gender))) =>
        id + "*" + name + "*" + gender
      //the key has no match in the right RDD
      case (id: String, (name: String, None)) =>
        id + "*" + name + "*" + "_"
    }.foreach(println)
  }
}

groupByKey (groups K-V format data by key)

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo10GroupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo10GroupByKey").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    /**
     * groupBy: group by the specified field
     */

    // count the number of students in each class
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    linesRDD.groupBy(word => word.split(",")(4))
      .map(kv => {
        val key = kv._1
        val wordsCnt = kv._2.size
        key + "," + wordsCnt
      }).foreach(println)

    val linesMap: RDD[(String, String)] = linesRDD.map(lines => (lines.split(",")(4), lines))
    //group by key
    linesMap.groupByKey()
      .map(lines=>{
        val key = lines._1
        val wordsCnt: Int = lines._2.size
        key+","+wordsCnt
      }).foreach(println)

  }
}

reduceByKey
reduceByKey takes an aggregation function.
It first groups the data by key, then aggregates within each group (usually a sum, but max, min and similar operations also work; a max example follows the demo below).
It plays the same role as the combiner in MapReduce: it pre-aggregates on the map side, which reduces the amount of data transferred during the shuffle and therefore improves efficiency.
Compared with groupByKey it is more efficient but less flexible: the aggregation function must be associative (and commutative), so that combining partial results gives the same answer, e.g.
f(f(x, y), z) = f(x, f(y, z))
Differences between reduceByKey and groupByKey:
reduceByKey: performs map-side pre-aggregation
groupByKey: no pre-aggregation
When it does not affect the business logic, prefer reduceByKey.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo11ReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo11ReduceByKey").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    //count the number of students in each class using groupByKey
    linesRDD.map(lines => (lines.split(",")(4), lines))
      .groupByKey()
      .map(kv => {
        val key = kv._1
        val cnt = kv._2.size
        key + "," + cnt
      }).foreach(println)


    //the same count using reduceByKey
    /**
     * reduceByKey takes an aggregation function.
     * It first groups the data by key, then aggregates within each group
     * (usually a sum, but max, min and similar operations also work).
     * It plays the same role as the combiner in MapReduce:
     * it pre-aggregates on the map side, reducing the data shuffled and improving efficiency.
     * Compared with groupByKey it is more efficient but less flexible;
     * the aggregation function must be associative (and commutative),
     * e.g. f(f(x, y), z) = f(x, f(y, z))
     */
    linesRDD.map(lines=>(lines.split(",")(4),1))
      .reduceByKey(_+_)
      .foreach(println)
  }
}
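
As noted above, the combine function does not have to be a sum; any associative, commutative function works. A small sketch that uses max to find the oldest student per class, with in-memory (class, age) pairs made up for illustration and a hypothetical object name.

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoReduceByKeyMax {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("DemoReduceByKeyMax").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    // (class, age) pairs
    val agesRDD: RDD[(String, Int)] = sc.parallelize(List(
      ("class1", 22), ("class1", 24), ("class2", 21), ("class2", 23)
    ))

    // max is associative and commutative, so map-side pre-aggregation is safe
    agesRDD.reduceByKey((a, b) => math.max(a, b))
      .foreach(println)
  }
}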

sortBy / sortByKey (sorting, ascending by default)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo12Sort {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Demo12Sort").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")

    /**
     * sortBy: a transformation operator
     * sorts by the given key, ascending by default
     *
     * sortByKey: a transformation operator
     * must be applied to a K-V RDD; sorts by key, ascending by default
     */
    linesRDD.sortBy(lines => lines.split(",")(2), ascending = false) //sort by age in descending order (compared as a String here)
      .take(10) //take is an action: returns the first 10 rows as an Array
      .foreach(println)

    val mapRDD: RDD[(String, String)] = linesRDD.map(l => (l.split(",")(2), l))
    mapRDD.sortByKey(ascending = false)
      .take(10)
      .foreach(println)
  }
}
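
One caveat about the demo above: split(",")(2) yields a String, so the ordering is lexicographic (for example "9" sorts after "10"). If the age column should be compared numerically, a hedged variant is to convert it before sorting; this snippet reuses linesRDD from Demo12Sort.

    // sort by age as a number instead of as a String
    linesRDD.sortBy(lines => lines.split(",")(2).toInt, ascending = false)
      .take(10)
      .foreach(println)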

mapValues

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo13MapValue {
  def main(args: Array[String]): Unit = {
    /**
     * mapValues: a transformation operator
     * must be applied to a K-V format RDD
     * takes a function f and applies it to the value of each record;
     * the key stays unchanged and the number of records does not change
     */
    val conf: SparkConf = new SparkConf().setAppName("Demo13MapValue").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[(String, Int)] = sc.parallelize(List(("zs", 10), ("zzw", 34), ("lm", 18)))
    linesRDD.mapValues(v => v * 2)
      .foreach(println)
  }
}

Action Operators

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo14Action {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("****").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("spark/data/students.txt")
    linesRDD.take(10) // take: returns the first n records, similar to LIMIT
      .foreach(println) // this foreach is not an action operator; it is the foreach method of the returned Array

    // count
    // returns the number of records in the RDD
    println(linesRDD.count())

    // collect
    // brings the RDD back to the driver as a Scala Array
    // watch the data size: a large RDD can easily cause an OOM
    val collectRDD: Array[String] = linesRDD.collect()
    collectRDD.take(10)
      .foreach(println)

    // reduce: global aggregation
    // equivalent to: select sum(age) from student
    val i = linesRDD.map(lines => lines.split(",")(2).toInt)
      .reduce(_ + _)
    println(i)

    //saveAsTextFile: writes the sampled RDD out as text files (an action)
    linesRDD.sample(withReplacement = false,0.2)
      .saveAsTextFile("spark/data/save")

  }
}
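
To make the remark about foreach concrete: called directly on an RDD, foreach is an action that runs on the executors, while take(10).foreach iterates over a plain Array on the driver. A small sketch with a hypothetical object name:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DemoForeach {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("DemoForeach").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val numsRDD: RDD[Int] = sc.parallelize(1 to 5)

    // foreach on the RDD is an action, executed on the executors
    // (with a local master the output still shows up in this console)
    numsRDD.foreach(println)

    // here take is the action; the foreach belongs to the returned Array and runs on the driver
    numsRDD.take(3).foreach(println)
  }
}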

