spark sql 導出數據


如果用戶希望在spark sql 中,執行某個sql 后,將其結果集保存到本地,並且指定csv 或者 json 格式,在 beeline 中,實現起來很麻煩。通常的做法是將其create table tempTable as *** ,通過將結果集寫入到新的臨時表中,進行保存,然后再通過其他方式export 到本地。

這種方式,對於 HDFS 是可行到,但是如果數據是保存在像SequoiaDB 中,就比較難辦了。因為spark 向 SequoiaDB 寫入記錄時,可能部分task 會失敗重試,這樣就容易造成SequoiaDB 目標表中寫入了重復記錄,從而造成數據不准確的問題。

 

因此,需要尋找一種的新的方式,將其結果集准確地讀取出來,並且寫入本地文件。

 

在網上有很多替代方案,無外乎是通過beeline 或者 spark-sql ,執行 SQL 命令,通過重定向的方式,將結果集保存到指定文件中。

這樣的方式,首先不討論其輸出格式的問題,最無法讓人接受的是,spark-sql 需要將所有的結果數據收集到一個 Driver 進程中后,才會開始輸出終端。這個過程有以下 3 個問題

  1. 時間久,如果數據量大了,Driver 收集的過程會很久,並且通過top 可以查看到進程CPU 飆升
  2. 容易OOM,當數據量增大后,因為需要將所有結果數據存儲在內存中,一旦數據量用超了,就拋出 OOM 的錯誤,一切前功盡棄
  3. 輸出格式,因為保存本地文件的內容就是輸出終端的數據,CSV 格式不友好,有時候甚至會因為不可見字符而導致整個本地文件格式錯亂,最終導致數據無法恢復

 

所以本文主要是向讀者們介紹一種新的方式,直接使用 scala / python 語言開發的程序,利用 RDD 將其結果數據保存本地,輸出格式支持 CSV 和 JSON。

 

  • scala 版本

scala 版本作者沒有直接編寫程序,但是通過 spark-shell 進行了驗證

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM test_sdb")
df.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite").option("header", "true").save("/opt/sequoiadb/chenfool")

如果有用戶喜歡這個方式,可以考慮將程序打包成jar 包來執行。

導出格式的更多參數,請參考 python 版本

  • python 版本

在執行python 的腳本前,首先需要設置一下環境變量

export SPARK_HOME=/root/software/spark-2.1.1-bin-hadoop2.7
export PYTHONPATH=${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip;

注意:py4j-0.10.4-src.zip 文件名可能隨不同的spark 版本有所變化

然后准備以下腳本程序, spark_sql_export.py

import atexit
import os
import platform


import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession \
    .builder \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("SELECT * FROM test_sdb limit 100")

#df.coalesce(1).write.format("org.apache.spark.sql.json").mode("overwrite") \
#  .save("/opt/sequoiadb/chenfool")


df.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite") \
  .option("enforceSchema", "false") \
  .option("quoteAll", "true") \
  .option("escapeQuotes", "false") \
  .option("header", "true") \
  .option("delimiter", "|") \
  .option("charToEscapeQuoteEscaping", "\"") \
  .option("inferSchema", "true") \
  .option("ignoreLeadingWhiteSpace", "false") \
  .option("ignoreTrailingWhiteSpace", "false") \
  .save("/opt/sequoiadb/chenfool")

執行方式

python spark_sql_export.py

結果數據就會被保存在 /opt/sequoiadb/chenfool/part-00000* 文件中。

結果數據只會被保存在一個文件中,因為設置了 coalesce 參數。

JSON 格式請參考 spark_sql_export.py 注釋部分。

CSV 的詳細參數,可以參考spark 源碼:${SPARK_HOME}/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala 

 

注意:

在spark 2.1.1 版本中,ignoreLeadingWhiteSpace 和 ignoreTrailingWhiteSpace 參數無法生效,默認值為:true。在 spark 2.4.0 版本中,經過測試,這兩個參數才能夠生效。如果要求保存的數據中不做 trim 操作,只能夠將spark 升級為2.4.0 版本。

 

本博客參考了之前 spark 學習(二) 的內容,里面有介紹如果利用python 來執行spark 的程序的說明,感興趣的讀者們可以移步查閱


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM