一、DataFrame:有列名的RDD
首先,我們知道SparkSQL的目的是用sql語句去操作RDD,和Hive類似。SparkSQL的核心結構是DataFrame,如果我們知道RDD里面的字段,也知道里面的數據類型,就好比關系型數據庫里面的一張表。那么我們就可以寫SQL,所以其實這兒我們是不能用面向對象的思維去編程的。我們最好的方式就是把抽象成為一張表,然后去用SQL語句去操作它。
DataFrame的存儲方式:它采用的存儲是類似於數據庫的表的形式進行存儲的。一個數據表有幾部分組成:1、數據,這個數據是一行一行進行存儲的,一條記錄就是一行,2、數據表的數據字典,包括表的名稱,表的字段和字段的類型等元數據信息。那么DataFrame也是按照行進行存儲的,這個類是Row,一行一行的進行數據存儲。一般情況下處理粒度是行粒度的,不需要對其行內數據進行操作。
二、SparkSQL的程序入口:
在Spark2.0之前,是有sqlContext和hiveContext的概念的,因為這兩個概念難以區分,Spark2.0之后統一稱為SparkSession,除此之外SparkSession還封裝了SparkConf和SparkContext。
值得注意的一點是:Hive有很多依賴包,所以這些依賴包沒有包含在默認的Spark包里面。如果Hive依賴的包能在classpath找到,Spark將會自動加載它們。這些Hive依賴包必須復制到所有的工作節點上,因為它們為了能夠訪問存儲在Hive的數據,會調用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xml、core-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目錄下面。
當使用Hive時,必須初始化一個支持Hive的SparkSession,用戶即使沒有部署一個Hive的環境仍然可以使用Hive。當沒有配置hive-site.xml時,Spark會自動在當前應用目錄創建metastore_db和創建由spark.sql.warehouse.dir配置的目錄,如果沒有配置,默認是當前應用目錄下的spark-warehouse目錄。
注意:從Spark 2.0.0版本開始,hive-site.xml里面的hive.metastore.warehouse.dir屬性已經被spark.sql.warehouse.dir替代,用於指定warehouse的默認數據路徑(必須有寫權限)。
於是SparkSQL在與Hive有交互的情況下,需要指定支持Hive:
val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}") val spark = SparkSession.builder().config(conf).config("spark.sql.warehouse.dir", "hdfs://hadoop1:9000/user/hive/warehouse").enableHiveSupport().getOrCreate()
回到正題,程序入口:
1.6版本:
val conf=new SparkConf() conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local") val sc=new SparkContext(conf) val sqlContext = new SQLContext(sc)
2.0版本:
SparkSQL的程序入口縮減為一句
val sparkSession=SparkSession.builder().appName(s"${this.getClass.getSimpleName}").master("local").getOrCreate()
兩個版本一個獲得sqlContext(或者hiveContext),一個獲得sparkSession。
三、算了,還是放在一起寫吧。。
case class Person(var name:String,var age:Int) object Test { def main(args: Array[String]): Unit = { //1.6版本入口
val conf=new SparkConf() conf.setAppName(s"${this.getClass.getSimpleName}").setMaster("local") val sc=new SparkContext(conf) val sqlContext = new SQLContext(sc) //第一種創建DataFrame的方式:直接讀取列式存儲的格式,可以直接形成DataFrame(后續怎么操作呢?)
val df: DataFrame = sqlContext.read.json("")
//第二種創建DataFrame的方式:因為rdd沒有toDF()方法,需要進行隱式轉化,通過map后形成一個數組
import sqlContext.implicits._ val df: DataFrame = sc.textFile("C:\\Users\\wangyongxiang\\Desktop\\plan\\person.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
//第二種方法的另一種形態,用sqlContext或者sparkSession的createDataFrame(),其實和toDF()方法是雷同的
val rdd: RDD[Person] = sc.textFile("C:\\Users\\wangyongxiang\\Desktop\\plan\\person.txt") .map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)) val df: DataFrame = sqlContext.createDataFrame(rdd)
//第三種創建DataFrame:生成一個RowRDD,然后給出構造的描述
val rdd=sc.textFile("C:\\Users\\wangyongxiang\\Desktop\\plan\\person.txt") val rowRDD: RDD[Row] = rdd.map(_.split(",")).map(p=>Row(p(0),p(1).trim.toInt)) val schame=StructType( StructField("name",StringType,true):: StructField("age",IntegerType,true)::Nil ) val df: DataFrame = sqlContext.createDataFrame(rowRDD,schame)
//后續代碼,可以創建臨時視圖作為查詢,與mysql互操作要創建臨時視圖才能做查詢
//用hiveContext則直接在hive中創建表,然后將數據load到hive表中,可以直接進行條件查詢,無需創建臨時視圖,后面與hive集成會有說明
df.registerTempTable("person") sqlContext.sql("select * from person where age>21").show() //將處理后的數據用jdbc保存到mysql數據庫中成為一張表,注意這里要使用user而不能使用username,因為系統也有一個username,會覆蓋你的用戶名
val properties=new Properties() properties.put("user","root") properties.put("password","root") df.write.mode(SaveMode.Overwrite)jdbc("jdbc:mysql://localhost:3306/test","test",properties) } }
四、load和save操作。
object saveAndLoadTest { def main(args: Array[String]): Unit = { val conf =new SparkConf().setAppName("").setMaster("local") val sc=new SparkContext(conf) val sqlContext=new SQLContext(sc) //read,load:讀取
sqlContext.read.json("") // sqlContext.read.jdbc("url","table",properties)
sqlContext.read.load("parquet路徑") sqlContext.read.format("json").load("路徑") val df: DataFrame = sqlContext.read.format("parquet").load("路徑") //write,save保存
df.write.parquet("路徑.parquet") df.write.json("路徑.json") // df.write.jdbc("url","table",properties)
df.write.format("parquet").save("路徑.parquet") df.write.format(("json")).save("路徑.json") //保存模式可選擇覆蓋,追加等
df.write.mode(SaveMode.Overwrite).save("") } }
個人理解是read和load都是讀取的作用,write和save都是保存的作用,通過上述的代碼,我們可以完成文件格式轉換的工作,將效率低的一些格式轉化成parquet這種sparksql原生支持的文件類型
