Spark SQL支持兩種RDDs轉換為DataFrames的方式
使用反射獲取RDD內的Schema
當已知類的Schema的時候,使用這種基於反射的方法會讓代碼更加簡潔而且效果也很好。
通過編程接口指定Schema
這種方法的好處是,在運行時才知道數據的列以及列的類型的情況下,可以動態生成Schema。
原文和作者一起討論:http://www.cnblogs.com/intsmaze/p/6613755.html
微信:intsmaze
使用反射獲取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext} import org.apache.spark.{SparkConf, SparkContext} object InferringSchema { def main(args: Array[String]) { //創建SparkConf()並設置App名稱
val conf = new SparkConf().setAppName("SQL-intsmaze") //SQLContext要依賴SparkContext
val sc = new SparkContext(conf) //創建SQLContext
val sqlContext = new SQLContext(sc) //從指定的地址創建RDD
val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(",")) //創建case class //將RDD和case class關聯
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //導入隱式轉換,如果不導入無法將RDD轉換成DataFrame //將RDD轉換成DataFrame
import sqlContext.implicits._ val personDF = personRDD.toDF //注冊表
personDF.registerTempTable("intsmaze") //傳入SQL
val df = sqlContext.sql("select * from intsmaze order by age desc limit 2") //將結果以JSON的方式存儲到指定位置
df.write.json("hdfs://192.168.19.131:9000/personresult") //停止Spark Context
sc.stop() } } //case class一定要放到外面
case class Person(id: Int, name: String, age: Int)

打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar
通過編程接口指定Schema(Programmatically Specifying the Schema)
當JavaBean不能被預先定義的時候,編程創建DataFrame分為三步:
從原來的RDD創建一個Row格式的RDD.
創建與RDD中Rows結構匹配的StructType,通過該StructType創建表示RDD的Schema.
通過SQLContext提供的createDataFrame方法創建DataFrame,方法參數為RDD的Schema.
import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.types._ import org.apache.spark.{SparkContext, SparkConf} object SpecifyingSchema { def main(args: Array[String]) { //創建SparkConf()並設置App名稱 val conf = new SparkConf().setAppName("SQL-intsmaze") //SQLContext要依賴SparkContext val sc = new SparkContext(conf) //創建SQLContext val sqlContext = new SQLContext(sc) //從指定的地址創建RDD val personRDD = sc.textFile(args(0)).map(_.split(",")) //通過StructType直接指定每個字段的schema val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true) ) ) //將RDD映射到rowRDD val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt)) //將schema信息應用到rowRDD上 val personDataFrame = sqlContext.createDataFrame(rowRDD, schema) //注冊表 personDataFrame.registerTempTable("intsmaze") //執行SQL val df = sqlContext.sql("select * from intsmaze order by age desc ") //將結果以JSON的方式存儲到指定位置 df.write.json(args(1)) //停止Spark Context sc.stop() } }
將程序打成jar包,上傳到spark集群,提交Spark任務
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode cluster \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 2 \
--queue default \
/home/hadoop/sparksql-1.0-SNAPSHOT.jar \
hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult
在maven項目的pom.xml中添加Spark SQL的依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.2</version>
</dependency>
以上就是
2.sparkSQL–DataFrames與RDDs的相互轉換的全部內容了,更多內容請關注:
CPP學習網_CPP大學
本文固定鏈接: CPP學習網_CPP大學- 2.sparkSQL–DataFrames與RDDs的相互轉換
本文固定鏈接: CPP學習網_CPP大學- 2.sparkSQL–DataFrames與RDDs的相互轉換