一:SparkSQL支持的外部數據源
1.支持情況
2.External LIbraries
不是內嵌的,看起來不支持。
但是現在已經有很多開源插件,可以進行支持。
3.參考材料
· 支持的格式:https://github.com/databricks
二:准備
1.啟動服務
RunJar是metastore服務,在hive那邊開啟。
只需要啟動三個服務就可以了,以后runjar都要啟動,因為這里使用hive與spark集成了,不啟動這個服務,就會總是報錯。
2.啟動spark-shell
三:測試檢驗程序
1.DataFrame的構成
2.結果
3.測試
4.結果
四:DataFrame的創建
1.創建SQLContext
val sqlContext=new SQLContext(sc)
2.創建DataFrame(兩種方式)
val df=sqlContext.#
val df=sqlContext.read.#
3.DataFrame數據轉換
val ndf=df.#.#
4.結果保存
ndf.#
ndf.write.#
五:DataFrame的保存
1.第一種方式
將DataFrame轉換為RDD,RDD數據保存
2.第二種方式
直接通過DataFrame的write屬性將數據寫出。
但是有限制,必須有定義類實現,默認情況:SparkSQL只支持parquet,json,jdbc
六:兩個常用的網站(數據源問題)
1.金磚公司提供的一些插件
2.package網址
https://spark-packages.org/
七:DataFrameReader編程模式
功能: 通過SQLContext提供的reader讀取器讀取外部數據源的數據,並形成DataFrame
1.源碼的主要方法
format:給定數據源數據格式類型,eg: json、parquet
schema:給定讀入數據的數據schema,可以不給定,不給定的情況下,進行數據類型推斷
option:添加參數,這些參數在數據解析的時候可能會用到
load:
有參數的指從參數給定的path路徑中加載數據,比如:JSON、Parquet...
無參數的指直接加載數據(根據option相關的參數)
jdbc:讀取關系型數據庫的數據
json:讀取json格式數據
parquet:讀取parquet格式數據
orc: 讀取orc格式數據
table:直接讀取關聯的Hive數據庫中的對應表數據
八:Reader的程序測試
1.新建文件夾
2.上傳數據
3.加載json數據
val df=sqlContext.read.format("json").load("spark/sql/people.json")
結果:
4.數據展示
df.show()
結果:
5.數據注冊成臨時表並操作展示
結果:
6.和上面的方法等效的方式
sqlContext.sql("select * from json.`spark/sql/people.json`").show()
結果:
7.讀取顯示parquet格式的數據
sqlContext.read.format("parquet").load("spark/sql/users.parquet").show()
結果:
8.加載mysql中的數據
這個是服務器上的mysql。
sqlContext.read.jdbc("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/mysql?user=root&password=123456", "user", new java.util.Properties()).show()
這個地方比較特殊。
第一次使用bin/spark-shell進入后,使用命令,效果如下:
然后使用這種方式進行啟動,加上jar
bin/spark-shell --jars /opt/softwares/mysql-connector-java-5.1.27-bin.jar --driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar
九:DataFrameWriter編程模式
功能:將DataFrame的數據寫出到外部數據源
1.源碼主要方法
mode: 給定數據輸出的模式
`overwrite`: overwrite the existing data.
`append`: append the data.
`ignore`: ignore the operation (i.e. no-op).
`error`: default option, throw an exception at runtime.
format:給定輸出文件所屬類型, eg: parquet、json
option: 給定參數
partitionBy:給定分區字段(要求輸出的文件類型支持數據分區)
save: 觸發數據保存操作 --> 當該API被調用后,數據已經寫出到具體的數據保存位置了
jdbc:將數據輸出到關系型數據庫
當mode為append的時候,數據追加方式是:
先將表中的所有索引刪除
再追加數據
沒法實現,數據不存在就添加,存在就更新的需求
十:writer的程序測試
1.讀取hive數據,形成DateFrame
2.結果保存為json格式
自動創建存儲目錄。
效果:
3.不再詳細粘貼結果了
1 讀取Hive表數據形成DataFrame 2 val df = sqlContext.read.table("common.emp") 3 4 結果保存json格式 5 df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json") 6 df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json") 7 df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json") 8 df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\ 9 上面雖然在追加的時候加上了sal,但是解析沒有問題 10 sqlContext.read.format("json").load("/beifeng/result/json").show() 11 12 結果保存parquet格式 13 df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01") 14 df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal導致解析失敗,讀取數據的時候 15 16 sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100) 17 sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100) 18 19 partitionBy按照給定的字段進行分區 20 df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02") 21 sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)