040 DataFrame中的write與read編程


一:SparkSQL支持的外部數據源

1.支持情況

  

 

2.External LIbraries

  不是內嵌的,看起來不支持。

  但是現在已經有很多開源插件,可以進行支持。

 

3.參考材料

·  支持的格式:https://github.com/databricks

 

二:准備

1.啟動服務

  RunJar是metastore服務,在hive那邊開啟。

  只需要啟動三個服務就可以了,以后runjar都要啟動,因為這里使用hive與spark集成了,不啟動這個服務,就會總是報錯。

  

 

2.啟動spark-shell

  

 

三:測試檢驗程序

1.DataFrame的構成

  

 

2.結果

  

 

3.測試

   

 

4.結果

  

 

四:DataFrame的創建

1.創建SQLContext

  val sqlContext=new SQLContext(sc)

2.創建DataFrame(兩種方式)

  val df=sqlContext.#

  val df=sqlContext.read.#

3.DataFrame數據轉換

  val ndf=df.#.#

4.結果保存

  ndf.#

  ndf.write.#

 

五:DataFrame的保存

1.第一種方式

  將DataFrame轉換為RDD,RDD數據保存

 

2.第二種方式

  直接通過DataFrame的write屬性將數據寫出。

  但是有限制,必須有定義類實現,默認情況:SparkSQL只支持parquet,json,jdbc

 

六:兩個常用的網站(數據源問題)

1.金磚公司提供的一些插件

  

 

2.package網址

  https://spark-packages.org/

  

 

七:DataFrameReader編程模式

功能: 通過SQLContext提供的reader讀取器讀取外部數據源的數據,並形成DataFrame

1.源碼的主要方法

  format:給定數據源數據格式類型,eg: json、parquet
  schema:給定讀入數據的數據schema,可以不給定,不給定的情況下,進行數據類型推斷
  option:添加參數,這些參數在數據解析的時候可能會用到
  load:
    有參數的指從參數給定的path路徑中加載數據,比如:JSON、Parquet...
    無參數的指直接加載數據(根據option相關的參數)
  jdbc:讀取關系型數據庫的數據
  json:讀取json格式數據
  parquet:讀取parquet格式數據
  orc: 讀取orc格式數據
  table:直接讀取關聯的Hive數據庫中的對應表數據

 

八:Reader的程序測試

1.新建文件夾

  

 

2.上傳數據

  

 

3.加載json數據

  val df=sqlContext.read.format("json").load("spark/sql/people.json")

  結果:

  

 

4.數據展示

  df.show()

  結果:

  

 

5.數據注冊成臨時表並操作展示

  

  結果:

  

 

6.和上面的方法等效的方式

  sqlContext.sql("select * from json.`spark/sql/people.json`").show()

  結果:

  

 

7.讀取顯示parquet格式的數據

  sqlContext.read.format("parquet").load("spark/sql/users.parquet").show()

  結果:

  

 

8.加載mysql中的數據

  這個是服務器上的mysql。

  sqlContext.read.jdbc("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/mysql?user=root&password=123456", "user", new java.util.Properties()).show()

  這個地方比較特殊。

  第一次使用bin/spark-shell進入后,使用命令,效果如下:

  

  然后使用這種方式進行啟動,加上jar

   bin/spark-shell --jars /opt/softwares/mysql-connector-java-5.1.27-bin.jar --driver-class-path /opt/softwares/mysql-connector-java-5.1.27-bin.jar

  

 

九:DataFrameWriter編程模式

功能:將DataFrame的數據寫出到外部數據源

 

1.源碼主要方法

mode: 給定數據輸出的模式
  `overwrite`: overwrite the existing data.
  `append`: append the data.
  `ignore`: ignore the operation (i.e. no-op).
  `error`: default option, throw an exception at runtime.
format:給定輸出文件所屬類型, eg: parquet、json
option: 給定參數
partitionBy:給定分區字段(要求輸出的文件類型支持數據分區)
save: 觸發數據保存操作 --> 當該API被調用后,數據已經寫出到具體的數據保存位置了
jdbc:將數據輸出到關系型數據庫
  當mode為append的時候,數據追加方式是:
    先將表中的所有索引刪除
    再追加數據

  沒法實現,數據不存在就添加,存在就更新的需求

 

十:writer的程序測試

 1.讀取hive數據,形成DateFrame

  

 

2.結果保存為json格式

  自動創建存儲目錄。

  

  效果:

  

 

3.不再詳細粘貼結果了

 1 讀取Hive表數據形成DataFrame
 2 val df = sqlContext.read.table("common.emp")
 3 
 4 結果保存json格式
 5 df.select("empno","ename").write.mode("ignore").format("json").save("/beifeng/result/json")
 6 df.select("empno","ename").write.mode("error").format("json").save("/beifeng/result/json")
 7 df.select("empno","ename", "sal").write.mode("overwrite").format("json").save("/beifeng/result/json")
 8 df.select("empno","ename").write.mode("append").format("json").save("/beifeng/result/json")\
 9 上面雖然在追加的時候加上了sal,但是解析沒有問題
10 sqlContext.read.format("json").load("/beifeng/result/json").show()
11 
12 結果保存parquet格式
13 df.select("empno", "ename", "deptno").write.format("parquet").save("/beifeng/result/parquet01")
14 df.select("empno", "ename","sal", "deptno").write.mode("append").format("parquet").save("/beifeng/result/parquet01") ## 加上sal導致解析失敗,讀取數據的時候
15 
16 sqlContext.read.format("parquet").load("/beifeng/result/parquet01").show(100)
17 sqlContext.read.format("parquet").load("/beifeng/result/parquet01/part*").show(100)
18 
19 partitionBy按照給定的字段進行分區
20 df.select("empno", "ename", "deptno").write.format("parquet").partitionBy("deptno").save("/beifeng/result/parquet02")
21 sqlContext.read.format("parquet").load("/beifeng/result/parquet02").show(100)

 

 

 

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM