spark將計算結果寫入到hdfs的兩種方法


spark將計算結果寫入到hdfs的兩種方法
第一種方法:

rdd.saveAsTextFile(path, classOf[com.hadoop.compression.lzo.LzopCodec])

這種方法有這么幾個問題

1、生成的lzo文件不會創建index文件,需要手動進行創建。

2、每個文件的名稱不能自定義。


第二種方法是直接調用LzopOutputstream的接口和hdfs的api,直接操作hdfs文件。可以規避以上幾個問題。

 def main(args: Array[String]) {
    //保存的路徑
    val basePath = "/tmp/kuan2"
    //設置日志級別
    //    Example.setStreamingLogLevels()
    //創建sparkConf
    val sparkConf = new SparkConf().setAppName("runJob")
    //設置master,此處設置本地執行
    //    sparkConf.setMaster("local[*]")
    //創建SparkContext
    val sc = new SparkContext(sparkConf)
    /**
    val hadoopRDD = sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat]("/tmp/mls.nginx201601281705",
      classOf[LzoTextInputFormat],
      classOf[LongWritable],
      classOf[Text])
    val rdd:RDD[String] = hadoopRDD.map(x=>x._2.toString)
      * */
    val rdd = sc.makeRDD(List("1", "2", "會議", "1", "2", "會議", "1", "2", "會議", "1", "2", "會議"), 2)

    //在每個executor上執行的函數
    //此處定義的是,針對每個分區,我們把計算好的結果寫入到本地目錄中
    val func = (tc: TaskContext, it: Iterator[String]) => {
      //輸出文件路徑
      val outFilePath: String =
        s"""$basePath/${tc.partitionId()}.lzo"""
      val outIndexFilePath = s"""${basePath}/${tc.partitionId()}.index"""

      //****************開始往文件中寫數據********************//
      //得到文件系統
      val fs: FileSystem = FileSystem.get(new Configuration)
      //目標路徑
      val dstPath = new Path(outFilePath)
      val dstIndexPath = new Path(outIndexFilePath)

      //打開一個輸出流
      val lzopCodec = new LzopCodec()
      lzopCodec.setConf(new Configuration())
      val lzoIndexOuputStream = lzopCodec.createIndexedOutputStream(fs.create(dstPath),fs.create(dstIndexPath))
      val pw:PrintWriter = new PrintWriter(new OutputStreamWriter(lzoIndexOuputStream,Charset.forName("UTF-8")));

      try {
        var count = 0
        while (it.hasNext) {
          //寫數據
          pw.println(it.next())

          //增加計數
          count = count + 1
          //判斷是否需要將數據寫到硬盤中
          if (count >= 1000) {

            //強制寫入到存儲中
            pw.flush()
            //數量重新計算
            count = 0
          }

        }
      } finally {
        //關閉數據流
        pw.close()
        fs.close()
      }

      //此處單機測試,所有的輸出本機文件,如果分布式運行,那么輸出文件還是放到hdfs吧
      s"I Am Partition ${tc.partitionId()}"
    }

    //開始執行函數
    val res = sc.runJob(rdd, func)
    //輸出各個partition的執行結果.如果返回結果比較小,直接返回到driver
    res.foreach(println)
    sc.stop()
  }

每個task輸出的文件的文件名可以自定義,同時可以生成索引文件

輸出的目錄如果不存在,可以在執行job之前進行創建。如果存在也不影響運行,生成的文件會進行覆蓋。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM