spark 讀取hive中的數據
scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext
scala> val hiveContext = new HiveContext(sc)
//hive中的feigu數據庫中表stud_info
scala> val stud_infoRDD = hiveContext.sql("select * from feigu.stud_info").rdd
scala> stud_infoRDD.take(5).foreach(line => println("code:"+line(0)+";name:"+line(1)))
code:stud_code;name:stud_name
code:2015101000;name:王進
code:2015101001;name:劉海
code:2015101002;name:張飛
code:2015101003;name:劉婷
spark載入數據到hive
兩個文件
hadoop@master:~/wujiadong$ cat spark_stud_info.txt
wujiadong,26
ji,24
sun,27
xu,25
hadoop@master:~/wujiadong$ cat spark_stud_score.txt
wujiadong,90
ji,100
sun,99
xu,99
scala代碼
scala> import org.apache.spark.sql.hive.HiveContext
scala> val hiveContext = new HiveContext(sc)
scala> hiveContext.sql("drop table if exists wujiadong.spark_stud_info")
scala> hiveContext.sql("create table if not exists wujiadong.spark_stud_info(name string,age int) row format delimited fields terminated by ','")
scala> hiveContext.sql("load data local inpath '/home/hadoop/wujiadong/spark_stud_info.txt' into table wujiadong.spark_stud_info");
scala> hiveContext.sql("drop table if exists wujiadong.spark_stud_score")
scala> hiveContext.sql("create table if not exists wujiadong.spark_stud_score(name string,score int) row format delimited fields terminated by ','")
scala> hiveContext.sql("load data local inpath '/home/hadoop/wujiadong/spark_stud_score.txt' into table wujiadong.spark_stud_score");
然后到hive中查詢是否導入成功
hive> select * from spark_stud_info;
OK
wujiadong 26
ji 24
sun 27
xu 25
Time taken: 0.178 seconds, Fetched: 4 row(s)
hive> select * from spark_stud_score;
OK
wujiadong 90
ji 100
sun 99
xu 99
Time taken: 0.212 seconds, Fetched: 4 row(s)
//將兩張表進行連接查詢大於99分的
scala> val df = hiveContext.sql("select sss.name,sss.score from wujiadong.spark_stud_info ssi join wujiadong.spark_stud_score sss on ssi.name=sss.name where sss.score > 99")
scala> df.show()
17/03/06 22:30:37 INFO FileInputFormat: Total input paths to process : 1
17/03/06 22:30:38 INFO FileInputFormat: Total input paths to process : 1
+----+-----+
|name|score|
+----+-----+
| ji| 100|
+----+-----+
//將df中數據保存到表result_stu表中
scala> hiveContext.sql("drop table if exists wujiadong.result_stud")
scala> df.saveAsTable("wujiadong.result_stu")
//然后針對表result_stu直接創建dataframe
//Hive中查看
hive> select * from result_stu;
OK
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
ji 100
Time taken: 0.252 seconds, Fetched: 1 row(s)
參考資料
http://dblab.xmu.edu.cn/blog/1086-2/
參考資料
http://blog.csdn.net/ggz631047367/article/details/50445877