Spark:DataFrame 寫入文本文件


將DataFrame寫成文件方法有很多
最簡單的將DataFrame轉換成RDD,通過saveASTextFile進行保存但是這個方法存在一些局限性:
1.將DataFrame轉換成RDD或導致數據結構的改變
2.RDD的saveASTextFile如果文件存在則無法寫入,也就意味着數據只能覆蓋無法追加,對於有數據追加需求的人很不友好
3.如果數據需要二次處理,RDD指定分隔符比較繁瑣

基於以上原因,在研讀了Spark的官方文檔后,決定采取DataFrame的自帶方法 write 來實現。
此處采用mysql的數據作為數據源,讀取mysql的方法在 Spark:讀取mysql數據作為DataFrame 有詳細介紹。

1.mysql的信息

mysql的信息我保存在了外部的配置文件,這樣方便后續的配置添加。

1 //配置文件示例:
2 [hdfs@iptve2e03 tmp_lillcol]$ cat job.properties 
3 #mysql數據庫配置
4 mysql.driver=com.mysql.jdbc.Driver
5 mysql.url=jdbc:mysql://127.0.0.1:3306/database1?useSSL=false&autoReconnect=true&failOverReadOnly=false&rewriteBatchedStatements=true
6 mysql.username=user
7 mysql.password=123456

 2.需要的jar依賴

sbt版本,maven的對應修改即可

 1 libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
 2 libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
 3 libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
 4 libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
 5 libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
 6 libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
 7 libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
 8 libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
 9 libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
10 libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

 3.完整實現代碼

 1 import java.io.FileInputStream
 2 import java.util.Properties
 3 
 4 import org.apache.spark.sql.hive.HiveContext
 5 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 6 import org.apache.spark.{SparkConf, SparkContext}
 7 
 8 /**
 9   * @author Administrator
10   *         2018/10/16-14:35
11   *
12   */
13 object TestSaveFile {
14   var hdfsPath: String = ""
15   var proPath: String = ""
16   var DATE: String = ""
17 
18   val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
19   val sc: SparkContext = new SparkContext(sparkConf)
20   val sqlContext: SQLContext = new HiveContext(sc)
21 
22   def main(args: Array[String]): Unit = {
23     hdfsPath = args(0)
24     proPath = args(1)
25     //不過濾讀取
26     val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
27     saveAsFileAbsPath(dim_sys_city_dict, hdfsPath + "TestSaveFile", "|", SaveMode.Overwrite)
28   }
29 
30   /**
31     * 獲取 Mysql 表的數據
32     *
33     * @param sqlContext
34     * @param tableName 讀取Mysql表的名字
35     * @param proPath   配置文件的路徑
36     * @return 返回 Mysql 表的 DataFrame
37     */
38   def readMysqlTable(sqlContext: SQLContext, tableName: String, proPath: String): DataFrame = {
39     val properties: Properties = getProPerties(proPath)
40     sqlContext
41       .read
42       .format("jdbc")
43       .option("url", properties.getProperty("mysql.url"))
44       .option("driver", properties.getProperty("mysql.driver"))
45       .option("user", properties.getProperty("mysql.username"))
46       .option("password", properties.getProperty("mysql.password"))
47       .option("dbtable", tableName)
48       .load()
49   }
50 
51   /**
52     * 將 DataFrame 保存為 hdfs 文件 同時指定保存絕對路徑 與 分隔符
53     *
54     * @param dataFrame  需要保存的 DataFrame
55     * @param absSaveDir 保存保存的路徑 (據對路徑)
56     * @param splitRex   指定分割分隔符
57     * @param saveMode   保存的模式:Append、Overwrite、ErrorIfExists、Ignore
58     */
59   def saveAsFileAbsPath(dataFrame: DataFrame, absSaveDir: String, splitRex: String, saveMode: SaveMode): Unit = {
60     dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("mapred.output.compress", "false")
61     //為了方便觀看結果去掉壓縮格式
62     val allClumnName: String = dataFrame.columns.mkString(",")
63     val result: DataFrame = dataFrame.selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")
64     result.write.mode(saveMode).text(absSaveDir)
65   }
66 
67   /**
68     * 獲取配置文件
69     *
70     * @param proPath
71     * @return
72     */
73   def getProPerties(proPath: String): Properties = {
74     val properties: Properties = new Properties()
75     properties.load(new FileInputStream(proPath))
76     properties
77   }
78 }

 4.測試

1 def main(args: Array[String]): Unit = {
2     hdfsPath = args(0)
3     proPath = args(1)
4     //不過濾讀取
5     val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "TestMysqlTble1", proPath)
6     saveAsFileAbsPath(dim_sys_city_dict, hdfsPath + "TestSaveFile", "|", SaveMode.Overwrite)
7   }

5.執行命令

 1 nohup spark-submit --master yarn \
 2 --driver-memory 4G \
 3 --num-executors 2 \
 4 --executor-cores 4 \
 5 --executor-memory 8G \
 6 --class com.iptv.job.basedata.TestSaveFile \
 7 --jars /var/lib/hadoop-hdfs/tmp_lillcol/mysql-connector-java-5.1.38.jar \
 8 test.jar \
 9 hdfs://ns1/user/hive/../ \
10 /var/.../job.properties > ./TestSaveFile.log 2>&1 &

 6.運行結果

 1 [hdfs@iptve4e03 tmp_lillcol]$ hadoop fs -du -h hdfs://ns1/user/hive/warehouse/TestSaveFile
 2 0      0      hdfs://ns1/user/hive/warehouse/TestSaveFile/_SUCCESS
 3 4.1 K  4.1 K  hdfs://ns1/user/hive/warehouse/TestSaveFile/part-r-123412340-ec83e1f1-4bd9-4b4a-89a3-8489c1f908dc
 4 
 5 [hdfs@iptve4e03 tmp_lillcol]$ hadoop fs -cat hdfs://ns1/user/hive/warehouse/TestSaveFile/part-r-123412340-ec83e1f1-4bd9-4b4a-89a3-8489c1f908dc
 6 1234|12349|張三|韓服_G|11234|張三艾歐尼亞|韓服-G|1234D5A3434|3|張三天庭
 7 12343|1234|1234|韓服_M|31234|李四艾歐尼亞|韓服-M|5F4EE4345|8|1234天庭
 8 1234|12340|石中劍山|韓服_s8|11234|張三艾歐尼亞|韓服-s8|59B403434|5|石中劍山天庭
 9 12344|12344|靈山|韓服_J|31234|李四艾歐尼亞|韓服-J|CF19F434B|40|靈山天庭
10 1234|1234|他家|韓服_H|11234|張三艾歐尼亞|韓服-Z|51234EB1434|9|他家天庭
11 12345|12340|雲浮|韓服_F|31234|李四艾歐尼亞|韓服-Y|9C9C04344|41|浮天庭
12 1234|12348|潮邊疆|韓服_Z|41234|佛山艾歐尼亞|韓服-Z|5B034340F|15|邊疆天庭
13 12340|12344|河姆渡人源|韓服_HY|41234|深圳艾歐尼亞|韓服-HY434123490808|18|河姆渡人源天庭
14 1234|1234|佛山|韓服_S|41234|佛山艾歐尼亞|韓服-FS|EEA981434|4|佛祖天庭
15 12340|12343|揭陽|韓服_J|41234|深圳艾歐尼亞|韓服-JY|9FF084349|10|天庭
16 1234|1234|石中劍邊疆|韓服_|41234|佛山艾歐尼亞|韓服-HZ|440A434FC|0|石中劍邊疆天庭
17 12348|1234|梅邊疆|韓服_Z|41234|深圳艾歐尼亞|韓服-MZ|E9B434F09|14|梅邊疆天庭
18 1234|12348|石中劍名|韓服_M|41234|佛山艾歐尼亞|韓服-MM|5D0A94434|14|石中劍名天庭
19 12349|1234|日本|韓服_|41234|深圳艾歐尼亞|韓服-SG|BD0F34349|19|日本天庭
20 1234|1234|石中劍石中劍|韓服_ST|41234|佛山艾歐尼亞|韓服-ST|18D0D0434|0|石中劍石中劍天庭
21 12340|1234|深圳|韓服_Z|41234|深圳艾歐尼亞|韓服-Z|31E4C4344|4|深天庭
22 12340|12340|石中劍尾|韓服_SW|41234|佛山艾歐尼亞|韓服-SW|1BA1234434B|10|石中劍尾天庭
23 12341|1234|美國|韓服_Z|41234|深圳艾歐尼亞|韓服-Q|3C09D434B|13|美國天庭
24 12341|1234|湛江|韓服_Z|41234|佛山艾歐尼亞|韓服-Z|3A49A4340|11|我家天庭
25 1234|12343|清詩和遠方|韓服_Y|11234|張三艾歐尼亞|韓服-Y|4344E0F31|10|清詩和遠方天庭
26 1234|41234|李四|韓服_AZ|31234|李四艾歐尼亞|韓服-Z|13F1D4344|1|李四天庭

 7.總結

在整個過程中有幾個需要注意的點

  • 只能存一個列
 1 /**
 2    * Saves the content of the [[DataFrame]] in a text file at the specified path.
 3    * The DataFrame must have only one column that is of string type.
 4    * Each row becomes a new line in the output file. For example:
 5    * {{{
 6    *   // Scala:
 7    *   df.write.text("/path/to/output")
 8    *
 9    *   // Java:
10    *   df.write().text("/path/to/output")
11    * }}}
12    *
13    * @since 1.6.0
14    */
15   def text(path: String): Unit = format("text").save(path)

這段代碼已經說明了一切,是的,只能保存只有一列的DataFrame.

但是比起RDD,DataFrame能夠比較輕易的處理這種情況

1 def saveAsFileAbsPath(dataFrame: DataFrame, absSaveDir: String, splitRex: String, saveMode: SaveMode): Unit = {
2     dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("mapred.output.compress", "false")
3     //為了方便觀看結果去掉壓縮格式
4     val allClumnName: String = dataFrame.columns.mkString(",")
5     val result: DataFrame = dataFrame.selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")
6     result.write.mode(saveMode).text(absSaveDir)
7   }

 

上述代碼中 我們通過columns.mkString(",")獲取 dataFrame 的所有列名並用","分隔,然后通過selectExpr(s"concat_ws('$splitRex',$allClumnName) as allclumn")將所有數據拼接當成一列,完美解決只能保存一列的問題

  • DataFrame 某個字段為空

如果 DataFrame 中某個字段為null,那么在你最中生成的文件中不會有該字段,所以,如果對結果字段的個數有要求的,最好在數據處理的時候將有可能為null的數據賦值空串"",特別是還有將數據load進Hive需求的,否則數據會出現錯位

至此DataFrame 寫文件功能實現

此文為本人工作總結,轉載請標明出處!!!!!!!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM