spark將計算結果寫入到hdfs的兩種方法
第一種方法:
rdd.saveAsTextFile(path, classOf[com.hadoop.compression.lzo.LzopCodec])
這種方法有這么幾個問題
1、生成的lzo文件不會創建index文件,需要手動進行創建。
2、每個文件的名稱不能自定義。
第二種方法是直接調用LzopOutputstream的接口和hdfs的api,直接操作hdfs文件。可以規避以上幾個問題。
def main(args: Array[String]) { //保存的路徑 val basePath = "/tmp/kuan2" //設置日志級別 // Example.setStreamingLogLevels() //創建sparkConf val sparkConf = new SparkConf().setAppName("runJob") //設置master,此處設置本地執行 // sparkConf.setMaster("local[*]") //創建SparkContext val sc = new SparkContext(sparkConf) /** val hadoopRDD = sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat]("/tmp/mls.nginx201601281705", classOf[LzoTextInputFormat], classOf[LongWritable], classOf[Text]) val rdd:RDD[String] = hadoopRDD.map(x=>x._2.toString) * */ val rdd = sc.makeRDD(List("1", "2", "會議", "1", "2", "會議", "1", "2", "會議", "1", "2", "會議"), 2) //在每個executor上執行的函數 //此處定義的是,針對每個分區,我們把計算好的結果寫入到本地目錄中 val func = (tc: TaskContext, it: Iterator[String]) => { //輸出文件路徑 val outFilePath: String = s"""$basePath/${tc.partitionId()}.lzo""" val outIndexFilePath = s"""${basePath}/${tc.partitionId()}.index""" //****************開始往文件中寫數據********************// //得到文件系統 val fs: FileSystem = FileSystem.get(new Configuration) //目標路徑 val dstPath = new Path(outFilePath) val dstIndexPath = new Path(outIndexFilePath) //打開一個輸出流 val lzopCodec = new LzopCodec() lzopCodec.setConf(new Configuration()) val lzoIndexOuputStream = lzopCodec.createIndexedOutputStream(fs.create(dstPath),fs.create(dstIndexPath)) val pw:PrintWriter = new PrintWriter(new OutputStreamWriter(lzoIndexOuputStream,Charset.forName("UTF-8"))); try { var count = 0 while (it.hasNext) { //寫數據 pw.println(it.next()) //增加計數 count = count + 1 //判斷是否需要將數據寫到硬盤中 if (count >= 1000) { //強制寫入到存儲中 pw.flush() //數量重新計算 count = 0 } } } finally { //關閉數據流 pw.close() fs.close() } //此處單機測試,所有的輸出本機文件,如果分布式運行,那么輸出文件還是放到hdfs吧 s"I Am Partition ${tc.partitionId()}" } //開始執行函數 val res = sc.runJob(rdd, func) //輸出各個partition的執行結果.如果返回結果比較小,直接返回到driver res.foreach(println) sc.stop() }
每個task輸出的文件的文件名可以自定義,同時可以生成索引文件
輸出的目錄如果不存在,可以在執行job之前進行創建。如果存在也不影響運行,生成的文件會進行覆蓋。