1.將DataFrame數據如何寫入到Hive表中?
2.通過那個API實現創建spark臨時表?
3.如何將DataFrame數據寫入hive指定數據表的分區中?
從spark1.2 到spark1.3,spark SQL中的SchemaRDD變為了DataFrame,DataFrame相對於SchemaRDD有了較大改變,同時提供了更多好用且方便的API。
DataFrame將數據寫入hive中時,默認的是hive默認數據庫,insertInto沒有指定數據庫的參數,本文使用了下面方式將數據寫入hive表或者hive表的分區中,僅供參考。
1、將DataFrame數據寫入到Hive表中
從DataFrame類中可以看到與hive表有關的寫入Api有以下幾個:
registerTempTable(tableName: String): Unit,
insertInto(tableName: String): Unit
insertInto(tableName: String, overwrite: Boolean): Unit
saveAsTable(tableName: String, source: String, mode: SaveMode, options: Map[String, String]): Unit
有很多重載函數,不一一列舉
registerTempTable函數是創建spark臨時表
insertInto函數是向表中寫入數據,可以看出此函數不能指定數據庫和分區等信息,不可以直接進行寫入。
向hive數據倉庫寫入數據必須指定數據庫,hive數據表建立可以在hive上建立,或者使用hiveContext.sql(“create table ....")
下面語句是向指定數據庫數據表中寫入數據:
- case class Person(name:String,col1:Int,col2:String)
- val sc = new org.apache.spark.SparkContext
- val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
- import hiveContext.implicits._
- hiveContext.sql("use DataBaseName")
- val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
- data.toDF()insertInto("tableName")
創建一個case類將RDD中數據類型轉為case類類型,然后通過toDF轉換為DataFrame,調用insertInto函數時,首先指定數據庫,使用的是hiveContext.sql("use DataBaseName")語句,就可以將DataFrame數據寫入hive數據表中了
2、將DataFrame數據寫入hive指定數據表的分區中
hive數據表建立可以在hive上建立,或者使用hiveContext.sql(“create table ...."),使用saveAsTable時數據存儲格式有限,默認格式為parquet,可以指定為json,如果有其他格式指定,盡量使用語句來建立hive表。
將數據寫入分區表的思路是:首先將DataFrame數據寫入臨時表,之后是由hiveContext.sql語句將數據寫入hive分區表中。具體操作如下:
- case class Person(name:String,col1:Int,col2:String)
- val sc = new org.apache.spark.SparkContext
- val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
- import hiveContext.implicits._
- hiveContext.sql("use DataBaseName")
- val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
- data.toDF().registerTempTable("table1")
- hiveContext.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")
使用以上方式就可以將dataframe數據寫入hive分區表了