2.sparkSQL–DataFrames與RDDs的相互轉換


Spark SQL支持兩種RDDs轉換為DataFrames的方式
使用反射獲取RDD內的Schema
    當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。
通過編程接口指定Schema
    通過Spark SQL的接口創建RDD的Schema,這種方式會讓代碼比較冗長。
    這種方法的好處是,在運行時才知道數據的列以及列的類型的情況下,可以動態生成Schema。

原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6613755.html

微信:intsmaze

使用反射獲取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object InferringSchema { def main(args: Array[String]) { //創建SparkConf()並設置App名稱
    val conf = new SparkConf().setAppName("SQL-intsmaze") //SQLContext要依賴SparkContext
    val sc = new SparkContext(conf) //創建SQLContext
    val sqlContext = new SQLContext(sc) //指定的地址創建RDD
    val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(",")) //創建case class //將RDD和case class關聯
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //導入隱式轉換,如果不導入無法將RDD轉換成DataFrame //將RDD轉換成DataFrame
    import sqlContext.implicits._ val personDF = personRDD.toDF //注冊表
    personDF.registerTempTable("intsmaze") //傳入SQL
    val df = sqlContext.sql("select * from intsmaze order by age desc limit 2") //將結果以JSON的方式存儲到指定位置
    df.write.json("hdfs://192.168.19.131:9000/personresult") //停止Spark Context
 sc.stop() } } //case class一定要放到外面
case class Person(id: Int, name: String, age: Int)
spark shell中不需要導入sqlContext.implicits._是因為spark shell默認已經自動導入了。
打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar 

 

通過編程接口指定Schema(Programmatically Specifying the Schema)

當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:

從原來的RDD創建一個Row格式的RDD.

創建與RDD中Rows結構匹配的StructType,通過該StructType創建表示RDD的Schema.

通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD的Schema.

 

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}

object SpecifyingSchema {
  def main(args: Array[String]) {
    //創建SparkConf()並設置App名稱
    val conf = new SparkConf().setAppName("SQL-intsmaze")
    //SQLContext要依賴SparkContext
    val sc = new SparkContext(conf)
    //創建SQLContext
    val sqlContext = new SQLContext(sc)

    //指定的地址創建RDD
    val personRDD = sc.textFile(args(0)).map(_.split(","))

    //通過StructType直接指定每個字段的schema
    val schema = StructType(
      List(
        StructField("id", IntegerType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      )
    )

    //將RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

    //將schema信息應用到rowRDD上
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    //注冊表
    personDataFrame.registerTempTable("intsmaze")
    //執行SQL
    val df = sqlContext.sql("select * from intsmaze order by age desc ")
    //將結果以JSON的方式存儲到指定位置
    df.write.json(args(1))
    //停止Spark Context
    sc.stop()
  }
}
將程序打成jar包,上傳到spark集群,提交Spark任務

/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

 

/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

maven項目的pom.xml中添加Spark SQL的依賴

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.6.2</version>
</dependency>

 



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM