Spark SQL 1.3
參考官方文檔:Spark SQL and DataFrame Guide
概覽介紹參考:平易近人、兼容並蓄——Spark SQL 1.3.0概覽
DataFrame提供了一條聯結所有主流數據源並自動轉化為可並行處理格式的渠道,通過它Spark能取悅大數據生態鏈上的所有玩家,無論是善用R的數據科學家,慣用SQL的商業分析師,還是在意效率和實時性的統計工程師。
以一個常見的場景 -- 日志解析為例,有時我們需要用到一些額外的結構化數據(比如做IP和地址的映射),通常這樣的數據會存在MySQL,而訪問的方式有兩種:一是每個worker遠程去檢索數據庫,弊端是耗費額外的網絡I/O資源;二是使用JdbcRDD的API轉化為RDD格式,然后編寫繁復的函數去實現檢索,顯然要寫更多的代碼。而現在Spark一行代碼就能實現從MySQL到DataFrame的轉化,並且支持SQL查詢。
在上一篇已經對文本格式進行測試,現在對hive hbase mysql oracle 以及臨時表之間join查詢做測試

1.訪問mysql
除了JSON之外,DataFrame現在已經能支持MySQL、Hive、HDFS、PostgreSQL等外部數據源,而對關系數據庫的讀取,是通過jdbc實現的。
bin/spark-shell --driver-class-path ./lib/mysql-connector-java-5.1.24-bin.jar
val sc = new org.apache.spark.SparkContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456", "dbtable" -> "loadinfo"))
bin/spark-sql --driver-class-path ./lib/mysql-connector-java-5.1.24-bin.jar
spark-sql> create temporary table jdbcmysql using org.apache.spark.sql.jdbc options(url "jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456",dbtable "loadinfo")
spark-sql>select * from jdbcmysql;
//注意src是hive本來就存在的表,在spark sql中不用建立臨時表,直接可以進行操作
//實現hive和mysql中表的聯合查詢
select * from src join jdbcmysql on (src.key=jdbcmysql.id);
2.訪問Oracle
同理,但注意連接的URL不一樣,也是試了好久
bin/spark-shell --driver-class-path ./lib/ojdbc6.jar
val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:kang/123456@192.168.0.110:1521:orcl", "dbtable" -> "TEST"))
Spark十八般武藝又可以派上用場了。
錯誤的URL:
val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:@192.168.0.110:1521:orcl&user=kang&password=123456", "dbtable" -> "TEST"))
val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:@192.168.0.110:1521/orcl&user=kang&password=123456", "dbtable" -> "TEST"))
報錯類型:看起來最像的解決辦法,留着以后用
java.sql.SQLException: Io : NL Exception was generated錯誤解決(jdbc數據源問題)
解決Oracle ORA-12505, TNS:listener does not currently know of SID given in connect
第一種方式,會告知無法識別SID,其實在連接時將orcl&user=kang&password=123456都當做其SID,其實就接近了。一般平時用jdbc連接數據庫,url user password都分開,學習一下這種方式^^
Oracle的JDBC url三種方式:這
1.普通SID方式 jdbc:oracle:thin:username/password@x.x.x.1:1521:SID 2.普通ServerName方式 jdbc:oracle:thin:username/password@//x.x.x.1:1522/ABCD 3.RAC方式 jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.1)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.2)(PORT=1521)))(LOAD_BALANCE=yes)(CONNECT_DATA=(SERVER=DEDICATED)(SERVICE_NAME=xxrac)))
具體參看這里
3.訪問hive
hive和spark sql的關系,參見
其實spark sql從一開始就支持hive。Spark提供了一個HiveContext的上下文,其實是SQLContext的一個子類,但從作用上來說,sqlContext也支持Hive數據源。只要在部署Spark的時候加入Hive選項,並把已有的hive-site.xml文件挪到$SPARK_HOME/conf路徑下,我們就可以直接用Spark查詢包含已有元數據的Hive表了。
1.Spark-sql方式
spark-sql是Spark bin目錄下的一個可執行腳本,它的目的是通過這個腳本執行Hive的命令,即原來通過
hive>輸入的指令可以通過spark-sql>輸入的指令來完成。
spark-sql可以使用內置的Hive metadata-store,也可以使用已經獨立安裝的Hive的metadata store
配置步驟:
1. 將Hive的conf目錄的hive-site.xml拷貝到Spark的conf目錄
2. 將hive-site.xml中關於時間的配置的時間單位,比如ms,s全部刪除掉
錯誤信息:Exception in thread "main" java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "5s" 一直以為是輸入格式的問題。。
3. 將mysql jdbc的驅動添加到Spark的Classpath上
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/home/hadoop/software/spark-1.2.0-bin-hadoop2.4/lib/mysql-connector-java-5.1.34.jar
[hadoop@hadoop bin]$ ./spark-sql Spark assembly has been built with Hive, including Datanucleus jars on classpath SET spark.sql.hive.version=0.13.1
提示編譯的時候要帶2個參數
重新編譯:./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -DskipTests -Dhadoop.version=2.4.1 -Phive -Phive-thriftserver
在Spark-default中已經指定
創建表
spark-sql> create table word6 (id int,word string) row format delimited fields terminated by ',' stored as textfile ; OK Time taken: 10.852 seconds
導入數據
spark-sql> load data local inpath '/home/hadoop/word.txt' into table word6; Copying data from file:/home/hadoop/word.txt Copying file: file:/home/hadoop/word.txt Loading data to table default.word6 Table default.word6 stats: [numFiles=1, numRows=0, totalSize=31, rawDataSize=0] OK Time taken: 2.307 seconds
與其他數據源聯合查詢
select * from src join jdbcmysql on (src.key=jdbcmysql.id);
2.Spark-shell方式
sqlContext.sql("select count(*) from hive_people").show()
4.將dataframe數據寫入Hive分區表
DataFrame將數據寫入hive中時,默認的是hive默認數據庫,insertInto沒有指定數據庫的參數,使用下面方式將數據寫入hive表或者hive表的分區中。這
1、將DataFrame數據寫入到Hive表中
從DataFrame類中可以看到與hive表有關的寫入Api有以下幾個:
registerTempTable(tableName: String): Unit, insertInto(tableName: String): Unit insertInto(tableName: String, overwrite: Boolean): Unit saveAsTable(tableName: String, source: String, mode: [size=13.3333320617676px]SaveMode, options: Map[String, String]): Unit
還有很多重載函數,不一一列舉
registerTempTable函數是創建spark臨時表
insertInto函數是向表中寫入數據,可以看出此函數不能指定數據庫和分區等信息,不可以直接進行寫入。
向hive數據倉庫寫入數據必須指定數據庫,hive數據表建立可以在hive上建立,或者使用hiveContext.sql(“create table ....")
下面語句是向指定數據庫數據表中寫入數據:
case class Person(name:String,col1:Int,col2:String)
val sc = new org.apache.spark.SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
hiveContext.sql("use DataBaseName")
val data=sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF()
insertInto("tableName")
創建一個case類將RDD中數據類型轉為case類型,然后通過toDF轉換為DataFrame,調用insertInto函數時,首先指定數據庫,使用的是hiveContext.sql("use DataBaseName")語句,就可以將DataFrame數據寫入hive數據表中了
2、將DataFrame數據寫入hive指定數據表的分區中
hive數據表建立可以在hive上建立,或者使用hiveContext.sql(“create table ...."),使用saveAsTable時數據存儲格式有限,默認格式為parquet,可以指定為json,如果有其他格式指定,盡量使用語句來建立hive表。
將數據寫入分區表的思路是:首先將DataFrame數據寫入臨時表,之后是由hiveContext.sql語句將數據寫入hive分區表中。具體操作如下:
case class Person(name:String,col1:Int,col2:String)
val sc = new org.apache.spark.SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext.implicits._
hiveContext.sql("use DataBaseName")
val data = sc.textFile("path").map(x=>x.split("\\s+")).map(x=>Person(x(0),x(1).toInt,x(2)))
data.toDF().registerTempTable("table1")
hiveContext.sql("insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1")
使用以上方式就可以將dataframe數據寫入hive分區表了。
