Spark 小文件合併


package spark99

import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.Logger
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

/**
 * Compacts the many small ORC files of one directory (e.g. a Hive partition directory)
 * into fewer, larger files, using a per-batch working directory under `mergePath`.
 */
object FilesManage {

  // NonFatal lets fatal JVM errors (OOM, interrupts) propagate instead of being swallowed.
  import scala.util.control.NonFatal

  private val logger = Logger.getLogger(getClass.getName.stripSuffix("$"))

  /**
   * Entry point. Optional positional arguments override the built-in demo values:
   * `srcDataPath mergePath mergeTime partitionSizeMB`.
   */
  def main(args: Array[String]): Unit = {
    def arg(i: Int, default: String): String = if (args.length > i) args(i) else default

    // setIfMissing keeps the local default for IDE runs but lets `spark-submit --master` win.
    val sparkConf = new SparkConf().setIfMissing("spark.master", "local[2]").setAppName("mergefile")
    val sc = new SparkContext(sparkConf)
    try {
      val sqlContext = new HiveContext(sc)
      val fileSystem = FileSystem.get(sc.hadoopConfiguration)

      val srcDataPath = arg(0, "/tmp/sparkdf2/dayid=20180322/hourid=20")
      val mergePath = arg(1, "/hadoop/merge")
      val mergeTime = arg(2, "20180331-20")
      val partitionSize = arg(3, "128").toInt

      val result = mergeFiles(sqlContext, fileSystem, mergeTime, srcDataPath, mergePath, partitionSize)
      logger.info("result: " + result)
    } finally {
      sc.stop()
    }
  }

  /**
   * Merges the files of `srcDataPath` into fewer, larger ORC files.
   *
   * Steps:
   *  1. Move the files of `srcDataPath` into `mergePath/mergeTime/src` (cleared first).
   *  2. Measure that directory and derive the number of output partitions.
   *  3. Read it as ORC, coalesce, and write the result to `mergePath/mergeTime/data`.
   *  4. Move the merged files from the data directory back into `srcDataPath`.
   *  5. Delete the working directory `mergePath/mergeTime`.
   *
   * If a failure happens after step 1 but before step 4 starts, the original files are moved
   * back into `srcDataPath`. After step 4 starts, nothing is rolled back; the error log names
   * the directories that still hold data.
   *
   * NOTE(review): step 1 clears any existing `mergePath/mergeTime/src`. If an earlier run with
   * the same mergeTime failed to roll back, reusing that mergeTime deletes the originals that
   * run left there.
   *
   * @param sqlContext    context used to read and write ORC
   * @param fileSystem    file system holding all paths
   * @param mergeTime     batch identifier; names the working directory and the output files
   * @param srcDataPath   directory whose small files are merged in place
   * @param mergePath     root of the temporary working directories
   * @param partitionSize target size of each merged file, in MB
   * @return "merge success" or "merge failed"
   */
  def mergeFiles(sqlContext: SQLContext, fileSystem: FileSystem, mergeTime: String,
                 srcDataPath: String, mergePath: String, partitionSize: Int): String = {
    val mergeRoot = mergePath + "/" + mergeTime
    val mergeSrcPath = mergeRoot + "/src"
    val mergeDataPath = mergeRoot + "/data"

    // Progress markers that decide which recovery is safe if something throws.
    var sourceMoved = false
    var publishing = false

    try {
      // 1. Move the files to be merged into the working src directory.
      moveFiles(fileSystem, mergeTime, srcDataPath, mergeSrcPath, ifTruncDestDir = true)
      sourceMoved = true

      // 2. Size-based partition count.
      val partitionNum = computePartitionNum(fileSystem, mergeSrcPath, partitionSize)

      // 3. Rewrite into fewer files; coalesce reduces partitions without a shuffle.
      sqlContext.read.format("orc").load(mergeSrcPath + "/")
        .coalesce(partitionNum)
        .write.format("orc").mode(SaveMode.Overwrite).save(mergeDataPath)

      // 4. Publish the merged files into the original directory.
      publishing = true
      moveFiles(fileSystem, mergeTime, mergeDataPath, srcDataPath, ifTruncDestDir = false)

      // 5. Drop the working directory (both src and data).
      fileSystem.delete(new Path(mergeRoot), true)
      "merge success"
    } catch {
      case NonFatal(e) =>
        logger.error(s"merge of $srcDataPath failed", e)
        if (!sourceMoved) {
          logger.error(s"some source files may already have been moved to $mergeSrcPath")
        } else if (publishing) {
          logger.error(s"merged files may be partially moved into $srcDataPath; originals remain " +
            s"under $mergeSrcPath and unmoved merged files under $mergeDataPath")
        } else {
          restoreSource(fileSystem, mergeTime, mergeSrcPath, srcDataPath, mergeRoot)
        }
        "merge failed"
    }
  }

  /**
   * Rollback for [[mergeFiles]]. It moves the originals back into `srcDataPath`, then deletes the
   * working directory. If the move fails, it logs where the remaining files are and does not delete.
   */
  private def restoreSource(fileSystem: FileSystem, mergeTime: String, mergeSrcPath: String,
                            srcDataPath: String, mergeRoot: String): Unit = {
    try {
      moveFiles(fileSystem, mergeTime, mergeSrcPath, srcDataPath, ifTruncDestDir = false)
      fileSystem.delete(new Path(mergeRoot), true)
      logger.warn(s"source files restored to $srcDataPath")
    } catch {
      case NonFatal(e) =>
        logger.error(s"could not restore source files; remaining ones are under $mergeSrcPath", e)
    }
  }

  /**
   * Moves every visible entry of `fromDir` into `destDir`, renaming each one to
   * `<mergeTime>_<index><extension>`. The extension is the part of the original name from its
   * last '.', if there is one. Entries whose names start with '_' or '.' (e.g. `_SUCCESS`,
   * `_temporary`, `.hive-staging…`) are left in place. Renaming them would make them visible,
   * and Spark would then read them as data.
   *
   * A missing `fromDir` is treated as "nothing to move".
   *
   * @param fileSystem     file system holding both directories
   * @param mergeTime      prefix of the new file names
   * @param fromDir        directory to move entries out of
   * @param destDir        directory to move entries into; created if absent
   * @param ifTruncDestDir if true, everything already in `destDir` is deleted first
   * @throws IOException if a rename fails (the FileSystem reports this via a `false` return)
   */
  def moveFiles(fileSystem: FileSystem, mergeTime: String, fromDir: String,
                destDir: String, ifTruncDestDir: Boolean): Unit = {
    val fromDirPath = new Path(fromDir)
    val destDirPath = new Path(destDir)

    // Create the destination itself, not only its parent.
    if (!fileSystem.exists(destDirPath)) {
      fileSystem.mkdirs(destDirPath)
    }

    if (ifTruncDestDir) {
      fileSystem.listStatus(destDirPath).foreach(status => fileSystem.delete(status.getPath, true))
    }

    if (fileSystem.exists(fromDirPath)) {
      fileSystem.listStatus(fromDirPath)
        .filterNot(status => isHidden(status.getPath.getName))
        .zipWithIndex
        .foreach { case (status, index) =>
          val fileName = status.getPath.getName
          val dot = fileName.lastIndexOf('.')
          val fileSuffix = if (dot >= 0) fileName.substring(dot) else ""
          // Build the target from the destination Path rather than by string-replacing the
          // qualified source URI, which breaks on trailing slashes or differing authorities.
          val destPath = new Path(destDirPath, mergeTime + "_" + index + fileSuffix)
          if (!fileSystem.rename(status.getPath, destPath)) { // hdfs dfs -mv
            throw new IOException(s"Failed to move ${status.getPath} to $destPath")
          }
        }
    }
  }

  /** Hadoop/Spark convention: names starting with '_' or '.' are metadata, not data files. */
  private def isHidden(name: String): Boolean = name.startsWith("_") || name.startsWith(".")

  /**
   * Computes the number of output partitions: the total size of `filePath` divided by
   * `partitionSize` MB, rounded up, and never less than 1.
   *
   * @param fileSystem    file system holding `filePath`
   * @param filePath      directory (or file) to measure
   * @param partitionSize target partition size in MB; must be positive
   * @return the partition count. It is also 1 when the size cannot be read (e.g. the path does not exist).
   * @throws IllegalArgumentException if `partitionSize` is not positive
   */
  def computePartitionNum(fileSystem: FileSystem, filePath: String, partitionSize: Int): Int = {
    require(partitionSize > 0, s"partitionSize must be positive, got $partitionSize")
    try {
      val totalBytes = fileSystem.getContentSummary(new Path(filePath)).getLength
      val partitions = math.ceil(totalBytes.toDouble / 1024 / 1024 / partitionSize).toInt
      // Spark rejects coalesce(0), so an empty directory still gets one partition.
      math.max(1, partitions)
    } catch {
      case e: IOException =>
        logger.warn(s"could not size $filePath; falling back to 1 partition", e)
        1
    }
  }

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM