如果用戶希望在spark sql 中,執行某個sql 后,將其結果集保存到本地,並且指定csv 或者 json 格式,在 beeline 中,實現起來很麻煩。通常的做法是將其create table tempTable as *** ,通過將結果集寫入到新的臨時表中,進行保存,然后再通過其他方式export 到本地。
這種方式,對於 HDFS 是可行到,但是如果數據是保存在像SequoiaDB 中,就比較難辦了。因為spark 向 SequoiaDB 寫入記錄時,可能部分task 會失敗重試,這樣就容易造成SequoiaDB 目標表中寫入了重復記錄,從而造成數據不准確的問題。
因此,需要尋找一種的新的方式,將其結果集准確地讀取出來,並且寫入本地文件。
在網上有很多替代方案,無外乎是通過beeline 或者 spark-sql ,執行 SQL 命令,通過重定向的方式,將結果集保存到指定文件中。
這樣的方式,首先不討論其輸出格式的問題,最無法讓人接受的是,spark-sql 需要將所有的結果數據收集到一個 Driver 進程中后,才會開始輸出終端。這個過程有以下 3 個問題
- 時間久,如果數據量大了,Driver 收集的過程會很久,並且通過top 可以查看到進程CPU 飆升
- 容易OOM,當數據量增大后,因為需要將所有結果數據存儲在內存中,一旦數據量用超了,就拋出 OOM 的錯誤,一切前功盡棄
- 輸出格式,因為保存本地文件的內容就是輸出終端的數據,CSV 格式不友好,有時候甚至會因為不可見字符而導致整個本地文件格式錯亂,最終導致數據無法恢復
所以本文主要是向讀者們介紹一種新的方式,直接使用 scala / python 語言開發的程序,利用 RDD 將其結果數據保存本地,輸出格式支持 CSV 和 JSON。
- scala 版本
scala 版本作者沒有直接編寫程序,但是通過 spark-shell 進行了驗證
import org.apache.spark.sql.hive.HiveContext // sc - existing spark context val sqlContext = new HiveContext(sc) val df = sqlContext.sql("SELECT * FROM test_sdb") df.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite").option("header", "true").save("/opt/sequoiadb/chenfool")
如果有用戶喜歡這個方式,可以考慮將程序打包成jar 包來執行。
導出格式的更多參數,請參考 python 版本
- python 版本
在執行python 的腳本前,首先需要設置一下環境變量
export SPARK_HOME=/root/software/spark-2.1.1-bin-hadoop2.7 export PYTHONPATH=${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip;
注意:py4j-0.10.4-src.zip 文件名可能隨不同的spark 版本有所變化
然后准備以下腳本程序, spark_sql_export.py
import atexit import os import platform import pyspark from pyspark.context import SparkContext from pyspark.sql import SparkSession, SQLContext spark = SparkSession \ .builder \ .enableHiveSupport() \ .getOrCreate() df = spark.sql("SELECT * FROM test_sdb limit 100") #df.coalesce(1).write.format("org.apache.spark.sql.json").mode("overwrite") \ # .save("/opt/sequoiadb/chenfool") df.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite") \ .option("enforceSchema", "false") \ .option("quoteAll", "true") \ .option("escapeQuotes", "false") \ .option("header", "true") \ .option("delimiter", "|") \ .option("charToEscapeQuoteEscaping", "\"") \ .option("inferSchema", "true") \ .option("ignoreLeadingWhiteSpace", "false") \ .option("ignoreTrailingWhiteSpace", "false") \ .save("/opt/sequoiadb/chenfool")
執行方式
python spark_sql_export.py
結果數據就會被保存在 /opt/sequoiadb/chenfool/part-00000* 文件中。
結果數據只會被保存在一個文件中,因為設置了 coalesce 參數。
JSON 格式請參考 spark_sql_export.py 注釋部分。
CSV 的詳細參數,可以參考spark 源碼:${SPARK_HOME}/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
注意:
在spark 2.1.1 版本中,ignoreLeadingWhiteSpace 和 ignoreTrailingWhiteSpace 參數無法生效,默認值為:true。在 spark 2.4.0 版本中,經過測試,這兩個參數才能夠生效。如果要求保存的數據中不做 trim 操作,只能夠將spark 升級為2.4.0 版本。
本博客參考了之前 spark 學習(二) 的內容,里面有介紹如果利用python 來執行spark 的程序的說明,感興趣的讀者們可以移步查閱