Spark開發-SparkSQL讀寫數據


SparkSQL數據讀寫

 DataFrameReader
 DataFrameWriter
   DataFrameReader 對應的組件 SCHEMA  OPTION   FORMAT
    DataFrameReader 有兩種訪問方式, 
	      一種是使用 load 方法加載, 使用 format 指定加載格式, 
	       還有一種是使用封裝方法, 類似 csv, json, jdbc 等

        //.第一種形式 READ + FORMAT +load讀取 
           spark.read
             .format("csv")
             .option("header",value=true)
             .option("inferSchema",value = true)
             .load("dataset/1231.csv")
        
           //2.第二種形式 使用具體文件類型讀取 READ 
           spark.read
             .option("header",value=true)
             .option("inferSchema",value = true)
             .csv("dataset/1231.csv")
	DataFrameWriter 也有兩種使用方式, 一種是使用 format 配合 save, 
	   默認的 format 是 Parquet
	  還有一種是使用封裝方法, 例如 csv, json, saveAsTable 等
	    def parquet(path: String): Unit = { format("parquet").save(path)}
	    def csv    (path: String): Unit = { format("csv").save(path)}

DataFrameWriter

增量操作

使用spark做增量操作的時候, 
   insertInto 和 mode(SaveMode.Append).saveAsTable()

insertInto 
    insertInto 使用基於位置
     It requires that the schema of the `DataFrame` is the same as the schema of the table.
     Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based resolution
    
saveAsTable 
     基於列名 column_names-based
      有不同的模式,
	  如果是  SaveMode.Overwrite
       the schema of the `DataFrame` does not need to be the same as that of the existing table.
      如果是 `Append`, 
     if there is an existing table, we will use the format and options OF  the existing table. 
	 The column order in the schema of the `DataFrame` doesn't need to be same as that of the existing table. 
	 Unlike `insertInto`,
     `saveAsTable` will use the column names to find the correct column positions
  ####覆蓋
   report_info.write.insertInto(tableName="dwd.t_dwd_report_info_d",overwrite=True)
   DataFrameWriter 可以將數據保存到 Hive 表中, 所以也可以指定分區和分桶信息
	 
對存儲的
    SCHEMA
	文件的覆蓋和追加

Spark的讀寫

  Spark can create distributed datasets from any storage source supported by Hadoop, 
        including your local file system, HDFS, Cassandra, HBase, Amazon S3, 
		        etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
寫
  saveAsTextFile  saveAsSequenceFile  saveAsSequenceFile

存儲問題

  存儲的載體
  存儲的數據格式
  存儲所使用的命令或函數
  存數遇到的問題
    小文件問題
     spark.default.parallelism      在處理RDD時才會起作用,對SparkSql無效。
     spark.sql.shuffle.partitions   則是對sparks SQL專用的設置。

源碼

org.apache.spark.sql
  Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,key-value stores, etc). 
  Use `Dataset.write` to access this.
  使用了  // Builder pattern config options 構建者模式

本地數據寫入到Hive表

在Hive中創建表,從外部數據源以txt的形式導入數據
 方案一: 利用系統ftp上傳數據,並使用系統的數據交換任務將任務寫入到Hive表中
 方案二: 利用程序 WriteToHive 將數據打包到程序中,程序中讀取數據,然后解析寫入到Hive表中

01.方案一步驟:

01.查看外部給的數據的情況,確認數據格式正確等情況
  做一些必要的查詢。修正以及確認工作
02. 創建表
03. 修改數據文件的編碼格式以及文件的類型,放到系統的位置-上傳到服務器的位置要記錄下來
04. 創建或者使用已存在的System_任務,導入數據任務配置和執行
05. 任務執行成功后,確認數據表的情況

02.具體過程

04任務配置和執行
    任務流:  數據導出專用
    任務  System_數據_ting_no_repeat_f
    需要完成的任務:
        001.源數據源配置
        	  選擇數據源: ftp
        	  基本信息:  文件路徑   /test  文件名   test_import_date_info_td.txt
        	高級配置:
        	  編碼 UTF-8
        002.目標數據源配置
        		 數據源類型:Hive
        		 數據源名稱:
        		 表名:  expbase.import_date_info_td       
        	導入配置:
    		SQL: use expbase; truncate table  import_date_info_td ;
    		Hive文件
    		  行分割符 \n  列分隔符 \t
05.任務執行成功后  -查看數據量以及示例數據情況
  select count(*) from expbase.import_date_info_td
  select * from expbase.import_date_info_td

03.注意事項

 01.數據確認工作:
  數據有無重復數據
  Excel的數據中的空格,tab的個數,引號等符號問題
  異常數據
  最終確認數據量,以及示例數據情況
 02.創建表的形式
  ROW FORMAT DELIMITED  FIELDS TERMINATED BY '\t' 
  STORED AS INPUTFORMAT 
    'org.apache.hadoop.mapred.TextInputFormat' 
  OUTPUTFORMAT 
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
  ;
 03.修改文件的編碼方式和格式,並上傳到Unix系統中
    編碼 UTF-8
    文件類型 Unix
 04.任務的配置
  配置工作
         Unix系統的文件 行分割符 \n
    	 數據文件的列分隔符保持一致,在這里是  \t
 05.確認工作
    確認數據量是否准確
	查看示例數據各個字段是否對應,字段的值是否正確

參考

	https://github.com/apache/spark/pull/13013
	Spark寫入hive表時saveAsTable和insertInto的區別 https://blog.csdn.net/huihuixia123/article/details/107658299
	Spark(三)-- SparkSQL擴展(數據讀寫) -- 讀寫 Parquet、Json 格式文件(二) https://blog.csdn.net/qq_18800463/article/details/101421490
	Spark SQL, DataFrames and Datasets Guide http://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
	RDD Programming Guide http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM