Spark定期合並Hive表小文件


一、需求背景

App端的埋點日志通過LogerServer收集到Kafka,再用Flink寫入到HDFS,按天或天加小時分區,文件格式為text 或者Parquet,Checkpoint間隔為5分鍾,Sink 並行度為10,每個小時產生600個小文件,由於數據量大,每天幾十億的數據,產生的小文件很多,Namenode壓力大,影響Hive Sql & Spark Sql的查詢性能。定期對HDFS小文件合並成為迫切的問題,也是數據治理的重點。開始嘗試使用Hive Job定期合並小文件,帶來的問題是占用資源多,執行時間長,后面改用Spark Job定期合並,效率有明顯提升。

 

二、Spark定期合並Hive表小文件Spark代碼實現

object MergeFile {
  def main(args: Array[String]): Unit = {

    val jobName = args(0)   // 任務名
    val tableName = args(1)  // hive表名
    val format = args(2).toInt   // 1 text格式 && 2 parquet格式
    val pa = args(3).toInt // 並發
    val dt_str = args(4)
    val dt = args(5)       // 分區天 開始dt
    val last = args(6)     // 截止dt
    val hour_str = args(7)
    val hour = args(8)     // 分區小時

    val spark = SparkSession
      .builder()
      .config("spark.seriailzer", "org.apache.spark.serializer.KryoSerializer")
      .appName(jobName + "_MergeFile" + dt)
      .master("yarn")
      .enableHiveSupport
      .getOrCreate

    val db = tableName.split("[.]")(0) + ".db"
    val orgTableName = tableName.split("[.]")(1)

    // 天+小時分區
    if (!hour_str.equals("null")) {
      // 原表導入到文件
      val df: DataFrame = spark.sql(s"select * from ${tableName} where ${dt_str}=${dt} and `${hour_str}`= ${hour} ")
      val origin_table_path = s"hdfs://emr-cluster/user/hive/warehouse/${db}/${orgTableName}/ ${dt_str}=$dt/hour=$hour"

      if (format == 1) {
        // text格式文件
        val text_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}/${hour_str}=${hour}"
        df.rdd.map(_.mkString("\001")).coalesce(pa).saveAsTextFile(text_path)

        // 文件導入覆蓋原表
        spark.read.textFile(text_path).write.mode(SaveMode.Overwrite).save(origin_table_path)

      } else {
        // parquet格式文件
        val parquet_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}/${hour_str}=${hour}"
        df.coalesce(pa).write.mode(SaveMode.Overwrite).parquet(parquet_path)

        // 文件導入覆蓋原表
        spark.read.parquet(parquet_path).write.mode(SaveMode.Overwrite).save(origin_table_path)
      }

    } else {
      // 原表導入到文件
      val df: DataFrame = spark.sql(s"select * from ${tableName} where dt=${dt} ")
      val origin_table_path = s"hdfs://emr-cluster/user/hive/warehouse/${db}/${orgTableName}/${dt_str}=${dt}"

      if (format == 1) {
        // text格式文件
        val text_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}"
        df.rdd.map(_.mkString("\001")).coalesce(pa).saveAsTextFile(text_path)

        // 文件導入覆蓋原表
        spark.read.textFile(text_path).write.mode(SaveMode.Overwrite).save(origin_table_path)

      } else {
        // parquet格式文件
        val parquet_path = s"hdfs://emr-cluster/user/hive/warehouse/temp.db/${jobName}/${dt_str}=${dt}"
        df.coalesce(pa).write.mode(SaveMode.Overwrite).parquet(parquet_path)

        val aa = spark.read.parquet(parquet_path)
        aa.show(10)

        // 文件導入覆蓋原表
        spark.read.parquet(parquet_path).write.mode(SaveMode.Overwrite).save(origin_table_path)
      }
    }

    spark.close()
  }
}

  

三、定期執行合並Job

寫個shell腳本傳入所需參數,可設定任意的分區開始日期和結束日期,靈活合並Hive表的分區文件。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM