1. RDD to DataFrame
Method 1: Create a DataFrame from a case class
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object TestDataFrame {
  def main(args: Array[String]): Unit = {
    // 1. Initialize the Spark config
    val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local")
    // 2. Initialize the SparkContext
    val sc = new SparkContext(conf)
    // 3. Initialize the Spark SQLContext
    val ssc = new SQLContext(sc)
    // 4. Build the DataFrame: map each line of the text file to a People instance
    val peopleRDD = sc.textFile("F:\\input.txt")
      .map(line => People(line.split(" ")(0), line.split(" ")(1).trim.toInt))
    import ssc.implicits._
    val df = peopleRDD.toDF()
    // Register the DataFrame as a temporary table. The table exists only in memory as a
    // logical view and is never materialized to disk. This approach is the most common one.
    df.registerTempTable("people")
    val df2 = ssc.sql("select * from people where age > 23")
    df2.show()
    // 5. Stop the SparkContext
    sc.stop()
  }
}

case class People(var name: String, var age: Int)
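The code above assumes F:\input.txt holds one record per line, with the name and age separated by a single space (that is what line.split(" ") relies on). A hypothetical input.txt could look like the following; the names and ages are made up purely for illustration:

zhangsan 25
lisi 30
wangwu 18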
Method 2: Create a DataFrame with a StructType
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TestDataFrame2 {
  def test2(): Unit = {
    // 1. Initialize the Spark config
    val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local")
    // 2. Initialize the SparkContext
    val sc = new SparkContext(conf)
    // 3. Initialize the Spark SQLContext
    val ssc = new SQLContext(sc)
    // 4. Build the DataFrame: map each line of the text file to a Row
    val peopleRDD = sc.textFile("F:\\input.txt")
      .map(line => Row(line.split(" ")(0), line.split(" ")(1).trim.toInt))
    // Define the schema with a StructType
    val structType: StructType = StructType(
      StructField("name", StringType, true) ::
      StructField("age", IntegerType, true) :: Nil
    )
    val df: DataFrame = ssc.createDataFrame(peopleRDD, structType)
    df.registerTempTable("people")
    ssc.sql("select * from people").show()
    // 5. Stop the SparkContext
    sc.stop()
  }
}
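The main advantage of the StructType route is that the schema does not have to be known at compile time. Below is a minimal sketch of assembling the same schema from a runtime sequence of column names and types; the columns value is a made-up example, not part of the original code:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical: column names and types arrive at runtime (e.g. read from a config file)
val columns = Seq("name" -> StringType, "age" -> IntegerType)
// Build the StructType programmatically instead of writing each StructField by hand
val dynamicSchema = StructType(columns.map { case (colName, colType) =>
  StructField(colName, colType, nullable = true)
})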
Method 3: Create a DataFrame from JSON
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext}

object TestDataFrame3 {
  def test3(): Unit = {
    // 1. Initialize the Spark config
    val conf = new SparkConf().setAppName("TestDataFrame").setMaster("local")
    // 2. Initialize the SparkContext
    val sc = new SparkContext(conf)
    // 3. Initialize the Spark SQLContext
    val ssc = new SQLContext(sc)
    // 4. Read the JSON file directly into a DataFrame
    val df: DataFrame = ssc.read.json("F:\\json.json")
    df.registerTempTable("people")
    ssc.sql("select * from people").show()
    // 5. Stop the SparkContext
    sc.stop()
  }
}
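Note that read.json expects JSON Lines input: one complete JSON object per line, not a single pretty-printed array. A hypothetical F:\json.json matching the people schema used above could look like this (values are made up for illustration):

{"name": "zhangsan", "age": 25}
{"name": "lisi", "age": 30}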
2. DataFrame to RDD