spark創建DataFrame的幾種方式


package com.hollysys.spark

import java.util

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext, SparkSession}

/**
  * Created by shirukai on 2018/7/17
  * 創建DataFrame的幾種方式
  */
object CreateDataFrameTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName(this.getClass.getSimpleName).master("local")
      .getOrCreate()

    //第一種:通過Seq生成
    val df = spark.createDataFrame(Seq(
      ("ming", 20, 15552211521L),
      ("hong", 19, 13287994007L),
      ("zhi", 21, 15552211523L)
    )) toDF("name", "age", "phone")

    df.show()

    //第二種:通過讀取Json文件生成
    val dfJson = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/student.json")
    dfJson.show()

    //第三種:通過讀取Csv文件生成
    val dfCsv = spark.read.format("csv").option("header", true).load("/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/students.csv")
    dfCsv.show()

    //第四種:通過Json格式的RDD生成(棄用)
    val sc = spark.sparkContext
    import spark.implicits._
    val jsonRDD = sc.makeRDD(Array(
      "{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}",
      "{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}",
      "{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}"
    ))

    val jsonRddDf = spark.read.json(jsonRDD)
    jsonRddDf.show()

    //第五種:通過Json格式的DataSet生成
    val jsonDataSet = spark.createDataset(Array(
      "{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}",
      "{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}",
      "{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}"
    ))
    val jsonDataSetDf = spark.read.json(jsonDataSet)

    jsonDataSetDf.show()

    //第六種: 通過csv格式的DataSet生成
    val scvDataSet = spark.createDataset(Array(
      "ming,20,15552211521",
      "hong,19,13287994007",
      "zhi,21,15552211523"
    ))
    spark.read.csv(scvDataSet).toDF("name","age","phone").show()

    //第七種:動態創建schema
    val schema = StructType(List(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("phone", LongType, true)
    ))
    val dataList = new util.ArrayList[Row]()
    dataList.add(Row("ming",20,15552211521L))
    dataList.add(Row("hong",19,13287994007L))
    dataList.add(Row("zhi",21,15552211523L))
    spark.createDataFrame(dataList,schema).show()

    //第八種:讀取數據庫(mysql)
    val options = new util.HashMap[String,String]()
    options.put("url", "jdbc:mysql://localhost:3306/spark")
    options.put("driver","com.mysql.jdbc.Driver")
    options.put("user","root")
    options.put("password","hollysys")
    options.put("dbtable","user")

    spark.read.format("jdbc").options(options).load().show()

  }
}

轉載:https://blog.csdn.net/shirukai/article/details/81085642


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM