引言:
spark項目中通常我們需要將我們處理之后數據保存到文件中,比如將處理之后的RDD保存到hdfs上指定的目錄中,亦或是保存在本地
spark保存文件:
1、rdd.saveAsTextFile("file:///E:/dataFile/result")
2、rdd.saveAsHadoopFile("file:///E:/dataFile/result",classOf[T],classOf[T],classOf[outputFormat.class])
3、df.write.format("csv").save("file:///E:/dataFile/result")
以上都簡單的,最普遍的保存文件的方式,有時候是不能夠滿足我們的需求,上述的文件保存方式中,保存之后,文件名通常是part-00000的方式保存在result文件夾中,但是,我希望能夠根據需求自己來定義這個文件名,並且指定的保存的文件夾必須事先不能存在,如果存在的話保存文件會報錯。
此時就需要我們自定義文件保存名。
自定義保存文件名:
需要自定義保存的文件名的話,就需要我們重新對輸出的文件的方式進行一個格式化,也就是說不能夠使用系統默認的輸出文件的方式,需要我們自定義輸出格式,需要重寫outputFormat類。
示例:
需求:需要將數據庫中的數據通過sparksql讀取之后進行計算,然后進行計算,最終以指定的文件名寫入到指定的目錄下面:
數據庫內容:
保存之后的文件:
保存路徑:本地“E:/dataFile/result”,該目錄下,文件名為person.txt
保存之后文件名:
保存后文件內容:
代碼實現:
需要自定一個一個類重寫outputFormat類中的方法
這里我使用saveAsHadoopFile的方式進行保存文件,如果是使用saveAsTextFile的方式的話,因為只有能傳入一個參數,
saveAsHadoopFile的形式保存文件,該方式是針對<k,v>對的RDD進行保存,保存的文件中內容是key和value,以空格分開,相同的key或保存在同一個文件中
上代碼:
第一步:重寫FileoutputFormat類
package cn.com.xxx.audit import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class CustomOutputFormat extends MultipleTextOutputFormat[Any, Any] {
//重寫generateFileNameForKeyValue方法,該方法是負責自定義生成文件的文件名 override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
//這里的key和value指的就是要寫入文件的rdd對,再此,我定義文件名以key.txt來命名,當然也可以根據其他的需求來進行生成文件名 val fileName = key.asInstanceOf[String] + ".txt" fileName } /**
*因為saveAsHadoopFile是以key,value的形式保存文件,寫入文件之后的內容也是,按照key value的形式寫入,k,v之間用空格隔開,這里我只需要寫入value的值,不需要將key的值寫入到文件中個,所以我需要重寫
*該方法,讓輸入到文件中的key為空即可,當然也可以進行領過的變通,也可以重寫generateActuralValue(key:Any,value:Any),根據自己的需求來實現
*/ override def generateActualKey(key: Any, value: Any): String = { null } //對生成的value進行轉換為字符串,當然源碼中默認也是直接返回value值,如果對value沒有特殊處理的話,不需要重寫該方法
override def generateAcutalValue(key: Any, value: Any): String = {
return value.asInstance[String]
}
/**
* 該方法使用來檢查我們輸出的文件目錄是否存在,源碼中,是這樣判斷的,如果寫入的父目錄已經存在的話,則拋出異常
* 在這里我們沖寫這個方法,修改文件目錄的判斷方式,如果傳入的文件寫入目錄已存在的話,直接將其設置為輸出目錄即可,
* 不會拋出異常
*/ override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = { var outDir: Path = FileOutputFormat.getOutputPath(job) if (outDir != null) {
//注意下面的這兩句,如果說你要是寫入文件的路徑是hdfs的話,下面的兩句不要寫,或是注釋掉,它倆的作用是標准化文件輸出目錄,根據我的理解是,他們是標准化本地路徑,寫入本地的話,可以加上,本地路徑記得要用file:///開頭,比如file:///E:/a.txt //val fs: FileSystem = ignored //outDir = fs.makeQualified(outDir) FileOutputFormat.setOutputPath(job, outDir) } } }
第二步:
package scala.spark._sql import java.util.Properties import mysqlUtils.OperatorMySql import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.{SparkConf, SparkContext, TaskContext} object DataFrameToMySql { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) //配置輸出文件不生成success文件 sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") //配置一些參數 //如果設置為true,sparkSql將會根據數據統計信息,自動為每一列選擇單獨的壓縮編碼方式 sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true") //控制列式緩存批量的大小。增大批量大小可以提高內存的利用率和壓縮率,但同時也會帶來OOM的風險 sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000") sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10485760") //設為true,則啟用優化的Tungsten物理執行后端。Tungsten會顯示的管理內存,並動態生成表達式求值得字節碼 sqlContext.setConf("spark.sql.tungsten.enabled", "true") //配置shuffle是的使用的分區數 sqlContext.setConf("spark.sql.shuffle.partitions", "200") sc.setLogLevel("WARN") val pro = new Properties() pro.put("user", "root") pro.put("password", "123456") pro.put("driver", "com.mysql.jdbc.Driver") val url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC" val rdf = sqlContext.read /*.jdbc(url,"person1",pro)*/ .format("jdbc") .options(Map( "url" -> url, "dbtable" -> "person", "driver" -> "com.mysql.jdbc.Driver", "user" -> "root", "password" -> "123456", "fetchSize" -> "10", "partitionColumn" -> "age", "lowerBound" -> "0", "upperBound" -> "1000", "numPartitions" -> "2" )).load()
//將讀取的文件盡心個計算,並且以pairRDD的形式寫入文件中,這里在寫入文件的時候,會將key當做文件名來進行寫入,也就是說相同的key對應的value都會寫入到相同的文件中 val x = rdf.groupBy(substring(col("score"), 0, 5) as ("score")).agg(max("age") as ("max"), avg("age") as ("avg")) .rdd.map(x => ("person", x(0) + "," + x(1) + "," + x(2)))
//這里partitionBy,只是來增加文件文件寫入的並行度,可以根據需求進行設置,影響的是文件寫入的性能,我個人是這么理解的,如果有不對的還請指正 .partitionBy(new HashPartitioner(10))
//這里寫入的時候,要指定我們自定義的PairRDDMultipleTextOutputFormat類 .saveAsHadoopFile("file:///E:/dataFile/res", classOf[String], classOf[String], classOf[PairRDDMultipleTextOutputFormat])
sc.stop() }
寫入結果:
文件內容:
文件名稱:
文件夾名稱:
E:\dataFile\res
改文件夾事先已經存在,因為重寫了checkOutputSpecs方法,做了處理,所以不會拋出異常,如果改文件夾目錄實現不存在的話,程序會自動去創建一個該文件夾
跟蹤FileOutput源碼
主要來看下我們重寫的這幾個方法,源碼中都做了些什么:
類名:MultipleOutputFormat
從源碼中可以很容易的看到各個類的實現。
這樣我們就可以根據我們的需求,將spark計算之后的數據寫入到我們指定的文件夾下面,並且指定生成的文件名。
這個問題搞了我兩三天了,網上各種找,都說是要重寫什么getRecordWriter方法,理清了思路之后,才發現,不是我需要的,在此記錄一下