一、問題背景
考慮到 Hadoop 3.0.0 的新特性 EC 碼,HDFS 在存儲數據時能獲得很好的壓縮比,同時 Hadoop 2.6.0 集群 HDFS 存儲壓力較大,我們將 Hadoop 2.6.0 集群的數據冷備到 Hadoop 3.0.0,來緩解 HDFS 存儲的壓力,但在冷備操作進行了一段時間后,用戶反饋數據讀取存在異常報錯,先花了一些時間根據異常信息從集群層面去排查問題,但都於事無補。后續根據對比數據在冷備前和冷備后的區別,發現文件的本身屬性已經破壞的,也就是在 distcp 過程中文件被異常損壞了,但我們又沒有進行文件的完整性校驗(之前同版本集群冷備都是正常的),導致有一部分數據已經損壞,而且還補可修復,因此我們要做兩件事,一件事是針對已冷備的數據,找出所有異常的文件,剔除掉保證用戶任務正常執行,另一件事修改 distcp 拷貝數據的協議,不再依賴 hdfs 協議,而是改為 webhdfs 協議,擺脫不同集群版本之間差異的影響。
而本文主要是解決第一件事,即掃描出所有已冷備的異常文件。在集群數據中,目前發現出錯的文件主要是 orc、lzo 和 gz 三種格式的壓縮文件,因此開發相應的程序來找出所有的異常文件(具體如何恢復數據還有待研究。下面是訪問三類壓縮文件報錯的異常信息。
# 讀 orc 文件異常信息 Caused by: java.lang.IllegalArgumentException: Buffer size too small. size = 262144 needed = 3043758 # 讀 lzo 文件異常信息 Caused by: java.io.IOException: Invalid LZO header # 讀 gz 文件異常信息 Caused by: java.io.IOException: invalid stored block lengths
二、Spark-shell 腳本方式掃描
前期是嘗試開發 MapReduce 程序去實現,但在代碼編寫中發現並不太好實現,后續嘗試用 Spark 來實現,最直接的方式也就是通過 spark-shell 去讀取不同格式文件,如下也是通過 spark-shell 去讀取三類壓縮文件(包括正常文件和異常文件)的命令,對於異常文件,均能正常復現出上面的三種異常信息,也就是說通過這種方式訪問是可行的。
# 讀取 orc 文件(正常文件) sqlContext.read.orc("/user/11085245/orc/000124_0").count() # 讀取 orc 文件(異常文件) sqlContext.read.orc("/user/11085245/orc/000097_0").count() # 讀取 lzo 文件(正常文件) sc.newAPIHadoopFile("/user/11085245/lzo/part-00415.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]).map(_._2.toString).count() # 讀取 lzo 文件(異常文件) sc.newAPIHadoopFile("/user/11085245/lzo/part-00040.lzo", classOf[com.hadoop.mapreduce.LzoTextInputFormat],classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.Text]).map(_._2.toString).count() # 讀取 gz 文件(正常文件) sc.textFile("/user/11085245/gz/000096_0.gz").count() # 讀取 gz 文件(異常文件) sc.textFile("/user/11085245/gz/000100_0.gz").count()
三、Spark 項目方式掃描
3.1 代碼開發
在章節二中用 spark-shell 能夠復現出異常信息,對於要掃描給定 HDFS 目錄下所有文件的完整性,順理成章地也就想到用 Spark 工程化的代碼(這里選擇用 scala 語言)去實現這一邏輯。其實主要就是三個流程:一是創建 scala 工程,二是根據 HDFS 目錄列出目錄下所有文件,三個根據單個文件調用類似 spark-shell 的 api 去操作文件,並輸出異常壓縮的文件。
idea 工具創建 scala 工程代碼參考 https://blog.csdn.net/qq_32575047/article/details/103045641。
具體的實現代碼如下:
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext import java.net.URI import org.apache.hadoop.io.{LongWritable,Text} import com.hadoop.mapreduce.LzoTextInputFormat object FileScan { def main(args: Array[String]) { val sparkConf = new SparkConf() sparkConf.setAppName("SparkFileScan") val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) if (args.length != 1) { println("------Please type correct input-----------") sc.stop() sys.exit(-1) } // 獲取給定的 HDFS 目錄 val path = args.head // 針對單個文件依次掃描,並輸出異常文件 listFile(path).foreach( patha => try { if (patha.contains(".lzo")) { sc.newAPIHadoopFile[LongWritable, Text, LzoTextInputFormat](patha).map(_._2.toString).count() } else if (patha.contains(".gz")) { sc.textFile(patha).count() } else if (patha.contains("SUCCESS")) { // success 文件不作處理 } else { hiveContext.read.orc(patha).count() } } catch { case e: Exception => println(patha) e.printStackTrace() } ) sc.stop() } /** * 生成 FileSystem 對象 */ def getHdfs(path: String): FileSystem = { val conf = new Configuration(); FileSystem.newInstance(URI.create(path), conf) } /** * 獲取目錄下的一級文件 */ def listFile(path: String): Array[String] = { val hdfs = getHdfs(path) val fs = hdfs.listStatus(new Path(path)) FileUtil.stat2Paths(fs).filter(hdfs.getFileStatus(_).isFile()).map(_.toString) } }
對應的 xml 依賴文件:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.kwang.bigdata</groupId> <artifactId>SparkFileScan</artifactId> <version>1.0-SNAPSHOT</version> <name>${project.artifactId}</name> <properties> <spark.version>1.6.0</spark.version> <scala.version>2.10.5</scala.version> <hadoop.version>2.6.0-cdh5.14.0</hadoop.version> </properties> <dependencies> <!-- scala 依賴 --> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- spark依賴 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>${spark.version}</version> </dependency> <!-- Hadoop 依賴 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> </dependency> <!-- hadoop lzo 依賴 --> <dependency> <groupId>org.anarres.lzo</groupId> <artifactId>lzo-hadoop</artifactId> <version>1.0.6</version> </dependency> </dependencies> </project>
至此,代碼工作已開發完成,接下來就是如何編譯打包運行項目了。
3.2 項目運行
運行項目執行當然是要把工程打包呀,打包方式如下:
mvn clean package
打包好了當然就是在 Hadoop 集群上去運行 Spark 項目咯,運行方式如下。
spark-submit --master yarn --deploy-mode client --executor-memory 2g --queue root.exquery --class com.vivo.bigdata.FileScan SparkFileScan-1.0-SNAPSHOT.jar hdfs://nameservice/user/11085245/file/ >errfile 參數說明: hdfs://nameservice/user/11085245/file/:要掃描的HDFS目錄 errfile:輸出的異常文件列表
至此,整個項目開發的工作基本完成了。第一次寫項目開發代碼,從如何創建 scala 項目,到寫下 scala 的第一行代碼,到編譯運行項目,摸爬滾打,整個流程都打通了,功能不復雜,但還挺有意思也小有成就感的。^_^
【參考資料】
-
https://stackoverflow.com/questions/62565953/pyspark-read-orc-files-with-new-schema (Spark 讀取 orc 文件)
-
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-LZO-file-in-Spark-td29382.html (Spark 讀取 lzo 文件)
-
https://blog.csdn.net/dkl12/article/details/84312307 (Spark 獲取 HDFS 目錄下文件)