Big Data in Practice Series (6): SparkSQL in Action


SparkSQL's two most important classes are SQLContext and DataFrame. A DataFrame is powerful: it converts to and from RDDs, and it supports SQL-style operations such as sql(), where, orderBy, join, groupBy, and limit.
SparkSQL's query response times can be orders of magnitude better than Hive's, and it supports many data sources, including Hive, HDFS, RDDs, JSON, and MySQL. This article walks through the first four: Hive, HDFS, RDD, and JSON.
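As a taste of the API, here is a minimal sketch of the RDD-to-DataFrame round trip and the DataFrame DSL. It assumes a Spark 2.x shell where spark (a SparkSession) and sc are predefined; the Sale case class and its data are hypothetical.

import spark.implicits._

case class Sale(ordernumber: String, amount: Double)

// RDD -> DataFrame via toDF(); DataFrame -> RDD of Row via .rdd
val salesDF  = sc.parallelize(Seq(Sale("A001", 10.5), Sale("A002", 99.0))).toDF()
val salesRDD = salesDF.rdd

// SQL-style DSL: where / groupBy / orderBy / limit
salesDF.where($"amount" > 50)
       .groupBy($"ordernumber")
       .count()
       .orderBy($"ordernumber")
       .limit(10)
       .show()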

1 Base Environment

  • 1.1 Versions
CentOS 6.5 (installed)
Hadoop 2.8 (cluster installed)
Hive 2.3 (to be installed)
MySQL 5.6 (installed)
Spark 2.1.1 (installed)
  • 1.2 Hosts
192.168.0.251 slave
192.168.0.252 master
Passwordless SSH login is configured between the two Hadoop nodes.
  • 1.3 Working Paths
Hadoop: /home/data/app/hadoop/hadoop-2.8.0/etc/hadoop
Spark: /home/data/app/hadoop/spark-2.1.1-bin-hadoop2.7
Hive data path: /user/hive/warehouse/

2 Initial Configuration

  • 2.1 Connect Spark to Hive

Add a hive-site.xml under the Spark conf directory on every node:

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://shulaibao2:9083</value>
    <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
  </property>
</configuration>
  • 2.2 Start the Hive metastore service
nohup hive --service metastore > metastore.log 2>&1 &
  • 2.3 Restart the Spark cluster
./stop-all.sh
./start-all.sh
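After the restart it is worth confirming that Spark can actually see the Hive catalog. A quick check from the spark-shell (hive_data is the database used in the examples below):

spark.sql("show databases").show()
spark.sql("show tables in hive_data").show()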

3 SparkSQL with a Hive Data Source

  • 3.1 SparkSQL operations
./spark-sql --master spark://shulaibao2:7077 --executor-memory 1g
Count the number of orders and the total sales amount per year:

select c.theyear, count(distinct a.ordernumber), sum(b.amount)
from tbStock a
join tbStockDetail b on a.ordernumber = b.ordernumber
join tbDate c on a.dateid = c.dateid
group by c.theyear
order by c.theyear;

Find the largest single-order sales amount per year (note that sort by only orders rows within each partition; use order by for a total ordering):

select c.theyear, max(d.sumofamount)
from tbDate c
join (
    select a.dateid, a.ordernumber, sum(b.amount) as sumofamount
    from tbStock a
    join tbStockDetail b on a.ordernumber = b.ordernumber
    group by a.dateid, a.ordernumber
) d on c.dateid = d.dateid
group by c.theyear
sort by c.theyear;
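The first aggregation can equally be written with the DataFrame API. A sketch, assuming the three Hive tables are loaded through spark.table in a spark-shell:

import org.apache.spark.sql.functions.{countDistinct, sum}

val stock  = spark.table("tbStock")
val detail = spark.table("tbStockDetail")
val dates  = spark.table("tbDate")

stock.join(detail, "ordernumber")
     .join(dates, "dateid")
     .groupBy("theyear")
     .agg(countDistinct("ordernumber"), sum("amount"))
     .orderBy("theyear")
     .show()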
  • 3.2 Coding in the spark-shell
val hiveQuery = sql("select * from hive_data.tbstock limit 10")
hiveQuery.collect()

res14: Array[org.apache.spark.sql.Row] = Array([BYSL00000893,ZHAO,2007-8-23], [BYSL00000897,ZHAO,2007-8-24], [BYSL00000898,ZHAO,2007-8-25], [BYSL00000899,ZHAO,2007-8-26], [BYSL00000900,ZHAO,2007-8-26], [BYSL00000901,ZHAO,2007-8-27], [BYSL00000902,ZHAO,2007-8-27], [BYSL00000904,ZHAO,2007-8-28], [BYSL00000905,ZHAO,2007-8-28], [BYSL00000906,ZHAO,2007-8-28])
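Note that collect() materializes every row on the driver; for interactive inspection of larger tables, show() or take(n) are safer:

hiveQuery.show(10)   // prints a formatted table, fetching only 10 rows
hiveQuery.take(3)    // returns just the first three rows to the driver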

4 SparkSQL with RDD Data Sources

  • 4.1 HDFS data source

import spark.implicits._

case class Person(name: String, age: Int)

val peopleDF = spark.sparkContext
  .textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()

// createOrReplaceTempView is the Spark 2.x replacement for the deprecated registerTempTable
peopleDF.createOrReplaceTempView("people")

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 24 AND 40")
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
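If the schema is only known at runtime (or a case class is impractical), the same DataFrame can be built with an explicit StructType. A sketch against the same people.txt file; the people2 view name is only for illustration:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age",  IntegerType, nullable = true)
))

val rowRDD = spark.sparkContext
  .textFile("hdfs://shulaibao2:9010/home/hadoop/upload/test/people.txt")
  .map(_.split(","))
  .map(attrs => Row(attrs(0), attrs(1).trim.toInt))

// createDataFrame pairs an RDD[Row] with the programmatic schema
val peopleDF2 = spark.createDataFrame(rowRDD, schema)
peopleDF2.createOrReplaceTempView("people2")
spark.sql("SELECT name FROM people2 WHERE age BETWEEN 24 AND 40").show()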
  • 4.2 In-memory RDD data source

import spark.implicits._

case class Person(name: String, age: Int, state: String)

sc.parallelize(
  Person("Michael", 29, "CA") ::
  Person("Andy", 30, "NY") ::
  Person("Justin", 19, "CA") ::
  Person("Justin", 25, "CA") :: Nil
).toDF().registerTempTable("people")

val query = sql("select * from people")  // returns a DataFrame
query.printSchema                        // schema of the query result
query.collect()                          // returns Array[org.apache.spark.sql.Row]
query.queryExecution                     // dump the full query plan
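queryExecution exposes the whole pipeline of plans; explain() is the more common entry point:

query.explain()       // physical plan only
query.explain(true)   // parsed, analyzed, optimized and physical plans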

5 JSON Data Source

hadoop fs -put /data/software/sougou/jsonPerson.json /home/hadoop/upload/test/

spark.sqlContext.jsonFile("/home/hadoop/upload/test/jsonPerson.json").registerTempTable("jsonPerson")
val jsonQuery = sql("select * from jsonPerson")
jsonQuery.printSchema   // inspect the inferred schema
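jsonFile is deprecated in Spark 2.x; the same load through the DataFrameReader API looks like this (same path, new-style temp view):

val jsonDF = spark.read.json("/home/hadoop/upload/test/jsonPerson.json")
jsonDF.printSchema()
jsonDF.createOrReplaceTempView("jsonPerson")
spark.sql("select * from jsonPerson").show()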
 

