關於spark寫入文件至文件系統並制定文件名之自定義outputFormat


引言:

  spark項目中通常我們需要將我們處理之后數據保存到文件中,比如將處理之后的RDD保存到hdfs上指定的目錄中,亦或是保存在本地

spark保存文件:

  1、rdd.saveAsTextFile("file:///E:/dataFile/result")

  2、rdd.saveAsHadoopFile("file:///E:/dataFile/result",classOf[T],classOf[T],classOf[outputFormat.class])

  3、df.write.format("csv").save("file:///E:/dataFile/result")

  以上都簡單的,最普遍的保存文件的方式,有時候是不能夠滿足我們的需求,上述的文件保存方式中,保存之后,文件名通常是part-00000的方式保存在result文件夾中,但是,我希望能夠根據需求自己來定義這個文件名,並且指定的保存的文件夾必須事先不能存在,如果存在的話保存文件會報錯。

  此時就需要我們自定義文件保存名。

自定義保存文件名:

  需要自定義保存的文件名的話,就需要我們重新對輸出的文件的方式進行一個格式化,也就是說不能夠使用系統默認的輸出文件的方式,需要我們自定義輸出格式,需要重寫outputFormat類。

示例:

  需求:需要將數據庫中的數據通過sparksql讀取之后進行計算,然后進行計算,最終以指定的文件名寫入到指定的目錄下面:

  數據庫內容:

      

  保存之后的文件:

    保存路徑:本地“E:/dataFile/result”,該目錄下,文件名為person.txt

  保存之后文件名:

    

  保存后文件內容:

    

代碼實現:

  需要自定一個一個類重寫outputFormat類中的方法

  這里我使用saveAsHadoopFile的方式進行保存文件,如果是使用saveAsTextFile的方式的話,因為只有能傳入一個參數,

  saveAsHadoopFile的形式保存文件,該方式是針對<k,v>對的RDD進行保存,保存的文件中內容是key和value,以空格分開,相同的key或保存在同一個文件中

上代碼:

第一步:重寫FileoutputFormat類

package cn.com.xxx.audit

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class CustomOutputFormat extends MultipleTextOutputFormat[Any, Any] {

 //重寫generateFileNameForKeyValue方法,該方法是負責自定義生成文件的文件名 override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
  //這里的key和value指的就是要寫入文件的rdd對,再此,我定義文件名以key.txt來命名,當然也可以根據其他的需求來進行生成文件名 val fileName = key.asInstanceOf[String] + ".txt" fileName }  /**
*因為saveAsHadoopFile是以key,value的形式保存文件,寫入文件之后的內容也是,按照key value的形式寫入,k,v之間用空格隔開,這里我只需要寫入value的值,不需要將key的值寫入到文件中個,所以我需要重寫
*該方法,讓輸入到文件中的key為空即可,當然也可以進行領過的變通,也可以重寫generateActuralValue(key:Any,value:Any),根據自己的需求來實現
*/ override def generateActualKey(key: Any, value: Any): String = { null }  //對生成的value進行轉換為字符串,當然源碼中默認也是直接返回value值,如果對value沒有特殊處理的話,不需要重寫該方法
override def generateAcutalValue(key: Any, value: Any): String = {
return value.asInstance[String]
}
 /**
* 該方法使用來檢查我們輸出的文件目錄是否存在,源碼中,是這樣判斷的,如果寫入的父目錄已經存在的話,則拋出異常
* 在這里我們沖寫這個方法,修改文件目錄的判斷方式,如果傳入的文件寫入目錄已存在的話,直接將其設置為輸出目錄即可,
* 不會拋出異常
*/ override def checkOutputSpecs(ignored: FileSystem, job: JobConf): Unit = { var outDir: Path = FileOutputFormat.getOutputPath(job) if (outDir != null) {
    //注意下面的這兩句,如果說你要是寫入文件的路徑是hdfs的話,下面的兩句不要寫,或是注釋掉,它倆的作用是標准化文件輸出目錄,根據我的理解是,他們是標准化本地路徑,寫入本地的話,可以加上,本地路徑記得要用file:///開頭,比如file:///E:/a.txt //val fs: FileSystem = ignored //outDir = fs.makeQualified(outDir) FileOutputFormat.setOutputPath(job, outDir) } } }

第二步:

package scala.spark._sql

import java.util.Properties

import mysqlUtils.OperatorMySql
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
object DataFrameToMySql {	
	def main(args: Array[String]): Unit = {
		val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[2]")
		val sc = new SparkContext(conf)
		val sqlContext = new SQLContext(sc)
		//配置輸出文件不生成success文件
		sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
		//配置一些參數
		//如果設置為true,sparkSql將會根據數據統計信息,自動為每一列選擇單獨的壓縮編碼方式
		sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")
		//控制列式緩存批量的大小。增大批量大小可以提高內存的利用率和壓縮率,但同時也會帶來OOM的風險
		sqlContext.setConf("spark.sql.inMemoryColumnarStorage.batchSize", "1000")
		sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10485760")
		//設為true,則啟用優化的Tungsten物理執行后端。Tungsten會顯示的管理內存,並動態生成表達式求值得字節碼
		sqlContext.setConf("spark.sql.tungsten.enabled", "true")
		//配置shuffle是的使用的分區數
		sqlContext.setConf("spark.sql.shuffle.partitions", "200")

		sc.setLogLevel("WARN")
		val pro = new Properties()
		pro.put("user", "root")
		pro.put("password", "123456")
		pro.put("driver", "com.mysql.jdbc.Driver")
		val url = "jdbc:mysql://localhost:3306/test?serverTimezone=UTC"
		val rdf = sqlContext.read /*.jdbc(url,"person1",pro)*/
		  .format("jdbc")
		  .options(Map(
			"url" -> url,
			"dbtable" -> "person",
			"driver" -> "com.mysql.jdbc.Driver",
			"user" -> "root",
			"password" -> "123456",
			"fetchSize" -> "10",
			"partitionColumn" -> "age",
			"lowerBound" -> "0",
			"upperBound" -> "1000",
			"numPartitions" -> "2"
		  )).load()
         //將讀取的文件盡心個計算,並且以pairRDD的形式寫入文件中,這里在寫入文件的時候,會將key當做文件名來進行寫入,也就是說相同的key對應的value都會寫入到相同的文件中 val x = rdf.groupBy(substring(col("score"), 0, 5) as ("score")).agg(max("age") as ("max"), avg("age") as ("avg")) .rdd.map(x => ("person", x(0) + "," + x(1) + "," + x(2)))
           //這里partitionBy,只是來增加文件文件寫入的並行度,可以根據需求進行設置,影響的是文件寫入的性能,我個人是這么理解的,如果有不對的還請指正 .partitionBy(new HashPartitioner(10))
        //這里寫入的時候,要指定我們自定義的PairRDDMultipleTextOutputFormat類 .saveAsHadoopFile("file:///E:/dataFile/res", classOf[String], classOf[String], classOf[PairRDDMultipleTextOutputFormat])
    sc.stop() }

寫入結果:

  文件內容:

    

  文件名稱:

    

  文件夾名稱:

    E:\dataFile\res

    改文件夾事先已經存在,因為重寫了checkOutputSpecs方法,做了處理,所以不會拋出異常,如果改文件夾目錄實現不存在的話,程序會自動去創建一個該文件夾

跟蹤FileOutput源碼

  主要來看下我們重寫的這幾個方法,源碼中都做了些什么:

  類名:MultipleOutputFormat

      

        

   

   

  從源碼中可以很容易的看到各個類的實現。

  這樣我們就可以根據我們的需求,將spark計算之后的數據寫入到我們指定的文件夾下面,並且指定生成的文件名。

 

這個問題搞了我兩三天了,網上各種找,都說是要重寫什么getRecordWriter方法,理清了思路之后,才發現,不是我需要的,在此記錄一下

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM