Spark之 RDD转换成DataFrame的Scala实现


Maven 依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.3</version>
</dependency>

RDD 转换成 DataFrame：通过 StructType 指定 schema

package com.zy.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

/**
  * Converts an RDD to a DataFrame by supplying the schema explicitly as a [[StructType]].
  *
  * Reads comma-separated lines of the form `id,name,age`, turns each one into a [[Row]],
  * pairs the rows with a hand-built schema, and then queries the result both through the
  * DataFrame API and through SQL on a temporary view.
  */
object StructTypeSchema {

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultInputPath = "D:\\person.txt"

  /**
    * Program entry point.
    *
    * @param args optional; `args(0)` overrides the input path (default `D:\person.txt`)
    */
  def main(args: Array[String]): Unit = {
    // Local session with 2 threads, suitable for a local demo run.
    val sparkSession: SparkSession = SparkSession.builder().appName("StructTypeSchema").master("local[2]").getOrCreate()
    // Always stop the session, even if reading, parsing or a query throws.
    try {
      val sc: SparkContext = sparkSession.sparkContext
      sc.setLogLevel("WARN")

      val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
      val textFile: RDD[String] = sc.textFile(inputPath)

      // Skip blank lines: they would otherwise fail on toInt / index access.
      val lineArrayRDD: RDD[Array[String]] = textFile.filter(_.trim.nonEmpty).map(_.split(","))

      // Numeric fields are trimmed because String.toInt rejects surrounding whitespace;
      // the name is kept exactly as read.
      val rowRDD: RDD[Row] = lineArrayRDD.map(x => Row(x(0).trim.toInt, x(1), x(2).trim.toInt))

      // Field order must match the Row layout above: (id, name, age).
      // The fourth argument of add is the column comment.
      val schema: StructType = (new StructType)
        .add("id", IntegerType, true, "id")
        .add("name", StringType, false, "姓名")
        .add("age", IntegerType, true, "年龄")

      val personDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

      // Print the DataFrame contents.
      personDF.show()

      // Register the DataFrame for SQL access; unlike createTempView this does not
      // throw when a view named "person" already exists.
      personDF.createOrReplaceTempView("person")
      sparkSession.sql("select * from person where id =3").show()
    } finally {
      sparkSession.stop()
    }
  }
}

 

RDD 转换成 DataFrame：利用反射机制推断 schema

package com.zy.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


/**
  * One record of the person input file: `id,name,age`.
  *
  * Declared as a case class so that Spark can infer the DataFrame schema
  * (columns `id`, `name`, `age`) from its fields via reflection.
  * NOTE(review): kept at the top level rather than inside `main`; presumably required
  * for Spark's reflection-based encoder derivation — confirm before moving it.
  *
  * @param id   numeric identifier (first column)
  * @param name person's name (second column)
  * @param age  age (third column)
  */
case class Person(
  id: Int,
  name: String,
  age: Int
)

/**
  * Converts an RDD to a DataFrame by letting Spark infer the schema from the
  * [[Person]] case class via reflection.
  *
  * Reads comma-separated lines of the form `id,name,age`, maps each one to a [[Person]],
  * converts the RDD with `toDF`, and then queries it through the DataFrame API and SQL.
  */
object CaseClassSchema {

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultInputPath = "D:\\person.txt"

  /**
    * Program entry point.
    *
    * @param args optional; `args(0)` overrides the input path (default `D:\person.txt`)
    */
  def main(args: Array[String]): Unit = {
    // Build the session with an app name and a local master (2 threads) for local testing.
    val sparkSession: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()
    // Always stop the session, even if reading, parsing or a query throws.
    try {
      val sc: SparkContext = sparkSession.sparkContext
      sc.setLogLevel("WARN")

      val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
      val dataRDD: RDD[String] = sc.textFile(inputPath)

      // Skip blank lines: they would otherwise fail on toInt / index access.
      val lineArrayRDD: RDD[Array[String]] = dataRDD.filter(_.trim.nonEmpty).map(_.split(","))

      // Numeric fields are trimmed because String.toInt rejects surrounding whitespace;
      // the name is kept exactly as read.
      val personRDD: RDD[Person] = lineArrayRDD.map(x => Person(x(0).trim.toInt, x(1), x(2).trim.toInt))

      // Session-scoped implicits provide toDF on RDDs and the $"column" syntax used below.
      import sparkSession.implicits._
      val personDF: DataFrame = personRDD.toDF

      // DataFrame API queries.
      personDF.show()
      personDF.printSchema()
      personDF.select("name").show()
      personDF.filter($"age" > 30).show()

      println("---------------------------------------------")

      // SQL queries need a view; unlike createTempView, this does not throw
      // when a view named "person" already exists.
      personDF.createOrReplaceTempView("person")
      sparkSession.sql("select * from person where id>1").show()
    } finally {
      sparkSession.stop()
    }
  }
}

 


免责声明!

本站转载的文章仅供个人学习借鉴使用，本站对版权不负任何法律责任。如果侵犯了您的隐私权益，请联系本站邮箱 yoyou2525@163.com 删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM