SparkSQL數據讀寫
DataFrameReader
DataFrameWriter
DataFrameReader 對應的組件 SCHEMA OPTION FORMAT
DataFrameReader 有兩種訪問方式,
一種是使用 load 方法加載, 使用 format 指定加載格式,
還有一種是使用封裝方法, 類似 csv, json, jdbc 等
//.第一種形式 READ + FORMAT +load讀取
spark.read
.format("csv")
.option("header",value=true)
.option("inferSchema",value = true)
.load("dataset/1231.csv")
//2.第二種形式 使用具體文件類型讀取 READ
spark.read
.option("header",value=true)
.option("inferSchema",value = true)
.csv("dataset/1231.csv")
DataFrameWriter 也有兩種使用方式, 一種是使用 format 配合 save,
默認的 format 是 Parquet
還有一種是使用封裝方法, 例如 csv, json, saveAsTable 等
def parquet(path: String): Unit = { format("parquet").save(path)}
def csv (path: String): Unit = { format("csv").save(path)}
DataFrameWriter
增量操作
使用spark做增量操作的時候,
insertInto 和 mode(SaveMode.Append).saveAsTable()
insertInto
insertInto 使用基於位置
It requires that the schema of the `DataFrame` is the same as the schema of the table.
Unlike `saveAsTable`, `insertInto` ignores the column names and just uses position-based resolution
saveAsTable
基於列名 column_names-based
有不同的模式,
如果是 SaveMode.Overwrite
the schema of the `DataFrame` does not need to be the same as that of the existing table.
如果是 `Append`,
if there is an existing table, we will use the format and options OF the existing table.
The column order in the schema of the `DataFrame` doesn't need to be same as that of the existing table.
Unlike `insertInto`,
`saveAsTable` will use the column names to find the correct column positions
####覆蓋
report_info.write.insertInto(tableName="dwd.t_dwd_report_info_d",overwrite=True)
DataFrameWriter 可以將數據保存到 Hive 表中, 所以也可以指定分區和分桶信息
對存儲的
SCHEMA
文件的覆蓋和追加
Spark的讀寫
Spark can create distributed datasets from any storage source supported by Hadoop,
including your local file system, HDFS, Cassandra, HBase, Amazon S3,
etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
寫
saveAsTextFile saveAsSequenceFile saveAsSequenceFile
存儲問題
存儲的載體
存儲的數據格式
存儲所使用的命令或函數
存數遇到的問題
小文件問題
spark.default.parallelism 在處理RDD時才會起作用,對SparkSql無效。
spark.sql.shuffle.partitions 則是對sparks SQL專用的設置。
源碼
org.apache.spark.sql
Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,key-value stores, etc).
Use `Dataset.write` to access this.
使用了 // Builder pattern config options 構建者模式
本地數據寫入到Hive表
在Hive中創建表,從外部數據源以txt的形式導入數據
方案一: 利用系統ftp上傳數據,並使用系統的數據交換任務將任務寫入到Hive表中
方案二: 利用程序 WriteToHive 將數據打包到程序中,程序中讀取數據,然后解析寫入到Hive表中
01.方案一步驟:
01.查看外部給的數據的情況,確認數據格式正確等情況
做一些必要的查詢。修正以及確認工作
02. 創建表
03. 修改數據文件的編碼格式以及文件的類型,放到系統的位置-上傳到服務器的位置要記錄下來
04. 創建或者使用已存在的System_任務,導入數據任務配置和執行
05. 任務執行成功后,確認數據表的情況
02.具體過程
04任務配置和執行
任務流: 數據導出專用
任務 System_數據_ting_no_repeat_f
需要完成的任務:
001.源數據源配置
選擇數據源: ftp
基本信息: 文件路徑 /test 文件名 test_import_date_info_td.txt
高級配置:
編碼 UTF-8
002.目標數據源配置
數據源類型:Hive
數據源名稱:
表名: expbase.import_date_info_td
導入配置:
SQL: use expbase; truncate table import_date_info_td ;
Hive文件
行分割符 \n 列分隔符 \t
05.任務執行成功后 -查看數據量以及示例數據情況
select count(*) from expbase.import_date_info_td
select * from expbase.import_date_info_td
03.注意事項
01.數據確認工作:
數據有無重復數據
Excel的數據中的空格,tab的個數,引號等符號問題
異常數據
最終確認數據量,以及示例數據情況
02.創建表的形式
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS INPUTFORMAT
'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
03.修改文件的編碼方式和格式,並上傳到Unix系統中
編碼 UTF-8
文件類型 Unix
04.任務的配置
配置工作
Unix系統的文件 行分割符 \n
數據文件的列分隔符保持一致,在這里是 \t
05.確認工作
確認數據量是否准確
查看示例數據各個字段是否對應,字段的值是否正確
參考
https://github.com/apache/spark/pull/13013
Spark寫入hive表時saveAsTable和insertInto的區別 https://blog.csdn.net/huihuixia123/article/details/107658299
Spark(三)-- SparkSQL擴展(數據讀寫) -- 讀寫 Parquet、Json 格式文件(二) https://blog.csdn.net/qq_18800463/article/details/101421490
Spark SQL, DataFrames and Datasets Guide http://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html
RDD Programming Guide http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-programming-guide