Spark 掃描 HDFS lzo/gz/orc異常壓縮文件


一、問題背景

考慮到 Hadoop 3.0.0 的新特性 EC 碼,HDFS 在存儲數據時能獲得很好的壓縮比,同時 Hadoop 2.6.0 集群 HDFS 存儲壓力較大,我們將 Hadoop 2.6.0 集群的數據冷備到 Hadoop 3.0.0,來緩解 HDFS 存儲的壓力,但在冷備操作進行了一段時間后,用戶反饋數據讀取存在異常報錯,先花了一些時間根據異常信息從集群層面去排查問題,但都於事無補。后續根據對比數據在冷備前和冷備后的區別,發現文件的本身屬性已經破壞的,也就是在 distcp 過程中文件被異常損壞了,但我們又沒有進行文件的完整性校驗(之前同版本集群冷備都是正常的),導致有一部分數據已經損壞,而且還補可修復,因此我們要做兩件事,一件事是針對已冷備的數據,找出所有異常的文件,剔除掉保證用戶任務正常執行,另一件事修改 distcp 拷貝數據的協議,不再依賴 hdfs 協議,而是改為 webhdfs 協議,擺脫不同集群版本之間差異的影響。

而本文主要是解決第一件事,即掃描出所有已冷備的異常文件。在集群數據中,目前發現出錯的文件主要是 orc、lzo 和 gz 三種格式的壓縮文件,因此開發相應的程序來找出所有的異常文件(具體如何恢復數據還有待研究。下面是訪問三類壓縮文件報錯的異常信息。

# 讀 orc 文件異常信息
Caused by: java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 3043758

# 讀 lzo 文件異常信息
Caused by: java.io.IOException: Invalid LZO header

# 讀 gz 文件異常信息
Caused by: java.io.IOException: invalid stored block lengths

二、Spark-shell 腳本方式掃描

前期是嘗試開發 MapReduce 程序去實現,但在代碼編寫中發現並不太好實現,后續嘗試用 Spark 來實現,最直接的方式也就是通過 spark-shell 去讀取不同格式文件,如下也是通過 spark-shell 去讀取三類壓縮文件(包括正常文件和異常文件)的命令,對於異常文件,均能正常復現出上面的三種異常信息,也就是說通過這種方式訪問是可行的。

# 讀取 orc 文件(正常文件)
sqlContext.read.orc("/user/11085245/orc/000124_0").count()

# 讀取 orc 文件(異常文件)
sqlContext.read.orc("/user/11085245/orc/000097_0").count()


# 讀取 lzo 文件(正常文件)
sc.newAPIHadoopFile("/user/11085245/lzo/part-00415.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]).map(_._2.toString).count()

# 讀取 lzo 文件(異常文件)
sc.newAPIHadoopFile("/user/11085245/lzo/part-00040.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]).map(_._2.toString).count()


# 讀取 gz 文件(正常文件)
sc.textFile("/user/11085245/gz/000096_0.gz").count()

# 讀取 gz 文件(異常文件)
sc.textFile("/user/11085245/gz/000100_0.gz").count()

三、Spark 項目方式掃描

3.1 代碼開發

在章節二中用 spark-shell 能夠復現出異常信息,對於要掃描給定 HDFS 目錄下所有文件的完整性,順理成章地也就想到用 Spark 工程化的代碼(這里選擇用 scala 語言)去實現這一邏輯。其實主要就是三個流程:一是創建 scala 工程,二是根據 HDFS 目錄列出目錄下所有文件,三個根據單個文件調用類似 spark-shell 的 api 去操作文件,並輸出異常壓縮的文件。

idea 工具創建 scala 工程代碼參考 https://blog.csdn.net/qq_32575047/article/details/103045641

具體的實現代碼如下:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import java.net.URI
import org.apache.hadoop.io.{LongWritable,Text}
import com.hadoop.mapreduce.LzoTextInputFormat

object FileScan {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
    sparkConf.setAppName("SparkFileScan")
    val sc =  new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)
    if (args.length != 1) {
      println("------Please type correct input-----------")
      sc.stop()
      sys.exit(-1)
    }
    // 獲取給定的 HDFS 目錄
    val path = args.head

    // 針對單個文件依次掃描,並輸出異常文件
    listFile(path).foreach( patha =>
      try {
        if (patha.contains(".lzo")) {
          sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat](patha).map(_._2.toString).count()
        } else if (patha.contains(".gz")) {
          sc.textFile(patha).count()
        } else if (patha.contains("SUCCESS")) {
          // success 文件不作處理
        } else {
          hiveContext.read.orc(patha).count()
        }
      } catch {
        case e: Exception =>
          println(patha)
          e.printStackTrace()
      }
    )
    sc.stop()
  }

  /**
    * 生成 FileSystem 對象
    */
  def getHdfs(path: String): FileSystem = {
    val conf = new Configuration();
    FileSystem.newInstance(URI.create(path), conf)
  }

  /**
    * 獲取目錄下的一級文件
    */
  def listFile(path: String): Array[String] = {
    val hdfs = getHdfs(path)
    val fs = hdfs.listStatus(new Path(path))
    FileUtil.stat2Paths(fs).filter(hdfs.getFileStatus(_).isFile()).map(_.toString)
  }
}

 

對應的 xml 依賴文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.kwang.bigdata</groupId>
  <artifactId>SparkFileScan</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>

  <properties>
    <spark.version>1.6.0</spark.version>
    <scala.version>2.10.5</scala.version>
    <hadoop.version>2.6.0-cdh5.14.0</hadoop.version>
  </properties>

  <dependencies>
      <!-- scala 依賴 -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- spark依賴 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Hadoop 依賴 -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- hadoop lzo 依賴 -->
    <dependency>
      <groupId>org.anarres.lzo</groupId>
      <artifactId>lzo-hadoop</artifactId>
      <version>1.0.6</version>
    </dependency>

  </dependencies>
</project>

至此,代碼工作已開發完成,接下來就是如何編譯打包運行項目了。

3.2 項目運行

運行項目執行當然是要把工程打包呀,打包方式如下:

mvn clean package

打包好了當然就是在 Hadoop 集群上去運行 Spark 項目咯,運行方式如下。

spark-submit --master yarn --deploy-mode client --executor-memory 2g --queue root.exquery --class com.vivo.bigdata.FileScan SparkFileScan-1.0-SNAPSHOT.jar hdfs://nameservice/user/11085245/file/ >errfile
 
參數說明:
hdfs://nameservice/user/11085245/file/:要掃描的HDFS目錄
errfile:輸出的異常文件列表

 

至此,整個項目開發的工作基本完成了。第一次寫項目開發代碼,從如何創建 scala 項目,到寫下 scala 的第一行代碼,到編譯運行項目,摸爬滾打,整個流程都打通了,功能不復雜,但還挺有意思也小有成就感的。^_^

【參考資料】

  1. https://blog.csdn.net/qq_32575047/article/details/103045641

  2. https://stackoverflow.com/questions/62565953/pyspark-read-orc-files-with-new-schema (Spark 讀取 orc 文件)

  3. http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-LZO-file-in-Spark-td29382.html  (Spark 讀取 lzo 文件)

  4. https://blog.csdn.net/dkl12/article/details/84312307 (Spark 獲取 HDFS 目錄下文件)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM