For the preface of 《深入理解Spark:核心思想與源碼分析》 (Deep Understanding of Spark: Core Ideas and Source Code Analysis), see the post 《深入理解SPARK:核心思想與源碼分析》一書正式出版上市.
For Chapter 1, see 《第1章 環境准備》 (Chapter 1: Preparing the Environment).
For Chapter 2, see 《第2章 SPARK設計理念與基本架構》 (Chapter 2: Spark Design Philosophy and Basic Architecture).
For Chapter 3, see the four posts on SparkContext initialization (Parts 1 through 4).
Utils is one of the most frequently used utility classes in Spark. Skipping its implementation will not hurt your understanding of Spark much, but for newcomers to Scala or Spark, walking through the Utils class is a good way to get started. The commonly used methods of Utils are introduced below, one by one.
1. localHostName
Functionality: Gets the name of the local machine.
def localHostName(): String = {
customHostname.getOrElse(localIpAddressHostname)
}
2. getDefaultPropertiesFile
Functionality: Gets the path of the default Spark properties file (spark-defaults.conf), resolved via SPARK_CONF_DIR or, failing that, SPARK_HOME/conf.
def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
  env.get("SPARK_CONF_DIR")
    .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
    .map { t => new File(s"$t${File.separator}spark-defaults.conf") }
    .filter(_.isFile)
    .map(_.getAbsolutePath)
    .orNull
}
3. loadDefaultSparkProperties
Functionality: Loads Spark properties from the specified file; when no file is specified, the properties of the default Spark properties file are loaded.
def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
  val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
  Option(path).foreach { confFile =>
    getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      conf.setIfMissing(k, v)
      sys.props.getOrElseUpdate(k, v)
    }
  }
  path
}
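As a minimal, hedged usage sketch (the properties file path below is hypothetical), a caller could populate a SparkConf like this:
// Hypothetical path; pass null to fall back to the default spark-defaults.conf.
val conf = new SparkConf()
val usedPath = Utils.loadDefaultSparkProperties(conf, "/opt/spark/conf/my-defaults.conf")
// Only keys starting with "spark." are applied, and only when not already set on conf.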
4. getCallSite
Functionality: Obtains the call site of the current SparkContext from the current thread's stack trace. The last frame that belongs to Spark or Scala core (before user code is reached) is placed at the top of callStack, and its method name is recorded in lastSparkMethod; the first user-code frame is appended to callStack, with its line number stored in firstUserLine and its file name stored in firstUserFile. The method returns a CallSite case class holding a short form and a long form (the long form keeps at most 20 frames by default). For the JavaWordCount example, the data obtained is:
Short form: JavaSparkContext at JavaWordCount.java:44
Long form:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:44)
def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
  val trace = Thread.currentThread.getStackTrace().filterNot { ste: StackTraceElement =>
    ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")
  }
  var lastSparkMethod = "<unknown>"
  var firstUserFile = "<unknown>"
  var firstUserLine = 0
  var insideSpark = true
  var callStack = new ArrayBuffer[String]() :+ "<unknown>"

  for (el <- trace) {
    if (insideSpark) {
      if (skipClass(el.getClassName)) {
        lastSparkMethod = if (el.getMethodName == "<init>") {
          el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
        } else {
          el.getMethodName
        }
        callStack(0) = el.toString // Put last Spark method on top of the stack trace.
      } else {
        firstUserLine = el.getLineNumber
        firstUserFile = el.getFileName
        callStack += el.toString
        insideSpark = false
      }
    } else {
      callStack += el.toString
    }
  }
  val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
  CallSite(
    shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine",
    longForm = callStack.take(callStackDepth).mkString("\n"))
}
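As a rough sketch of how the result might be consumed (not the exact SparkContext code; logInfo and logDebug assume a Logging mixin is in scope):
val callSite = Utils.getCallSite()
logInfo(s"Job call site: ${callSite.shortForm}")   // e.g. "JavaSparkContext at JavaWordCount.java:44"
logDebug(s"Full call site:\n${callSite.longForm}")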
5. startServiceOnPort
Functionality: In Scala, as in other scripting languages, functions can be passed as values. This method starts a service by calling back the startService function, and finally returns the service and port that startService returned. If an exception occurs during startup, it retries until the maximum number of attempts indicated by maxRetries is reached.
def startServiceOnPort[T](
    startPort: Int,
    startService: Int => (T, Int),
    conf: SparkConf,
    serviceName: String = ""): (T, Int) = {
  require(startPort == 0 || (1024 <= startPort && startPort < 65536),
    "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
  val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
  val maxRetries = portMaxRetries(conf)
  for (offset <- 0 to maxRetries) {
    val tryPort = if (startPort == 0) {
      startPort
    } else {
      ((startPort + offset - 1024) % (65536 - 1024)) + 1024
    }
    try {
      val (service, port) = startService(tryPort)
      logInfo(s"Successfully started service$serviceString on port $port.")
      return (service, port)
    } catch {
      case e: Exception if isBindCollision(e) =>
        if (offset >= maxRetries) {
          val exceptionMessage =
            s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
          val exception = new BindException(exceptionMessage)
          exception.setStackTrace(e.getStackTrace)
          throw exception
        }
        logWarning(s"Service$serviceString could not bind on port $tryPort. " +
          s"Attempting port ${tryPort + 1}.")
    }
  }
  throw new SparkException(s"Failed to start service$serviceString on port $startPort")
}
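A minimal usage sketch, assuming we want to bind a plain java.net.ServerSocket starting from port 4040; the port number and service name here are only illustrative:
import java.net.ServerSocket

def startSocket(port: Int): (ServerSocket, Int) = {
  val socket = new ServerSocket(port)   // throws BindException if the port is taken
  (socket, socket.getLocalPort)
}

// conf is an existing SparkConf; bind collisions are retried up to portMaxRetries(conf) times.
val (socket, boundPort) = Utils.startServiceOnPort(4040, startSocket, conf, "example-service")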
6. createDirectory
Functionality: Creates a temporary directory named spark-<UUID> under root; if creation fails, it retries, at most 10 times (MAX_DIR_CREATION_ATTEMPTS).
def createDirectory(root: String, namePrefix: String = "spark"): File = {
  var attempts = 0
  val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
  var dir: File = null
  while (dir == null) {
    attempts += 1
    if (attempts > maxAttempts) {
      throw new IOException("Failed to create a temp directory (under " + root + ") after " +
        maxAttempts + " attempts!")
    }
    try {
      dir = new File(root, "spark-" + UUID.randomUUID.toString)
      if (dir.exists() || !dir.mkdirs()) {
        dir = null
      }
    } catch {
      case e: SecurityException => dir = null
    }
  }
  dir
}
7. getOrCreateLocalRootDirs
Functionality: Uses the spark.local.dir configuration (or SPARK_LOCAL_DIRS, or java.io.tmpdir as a fallback) as the root of Spark's local files. Before the first- and second-level directories can be created, the root directory must exist; createDirectory is then called to create the first-level directory.
private[spark] def getOrCreateLocalRootDirs(conf: SparkConf): Array[String] = {
  if (isRunningInYarnContainer(conf)) {
    getYarnLocalDirs(conf).split(",")
  } else {
    Option(conf.getenv("SPARK_LOCAL_DIRS"))
      .getOrElse(conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")))
      .split(",")
      .flatMap { root =>
        try {
          val rootDir = new File(root)
          if (rootDir.exists || rootDir.mkdirs()) {
            val dir = createDirectory(root)
            chmod700(dir)
            Some(dir.getAbsolutePath)
          } else {
            logError(s"Failed to create dir in $root. Ignoring this directory.")
            None
          }
        } catch {
          case e: IOException =>
            logError(s"Failed to create local root dir in $root. Ignoring this directory.")
            None
        }
      }
      .toArray
  }
}
8. getLocalDir
Functionality: Returns the first of Spark's local first-level directories.
def getLocalDir(conf: SparkConf): String = {
  getOrCreateLocalRootDirs(conf)(0)
}
9. createTempDir
Functionality: Creates a temporary directory under a Spark first-level directory and registers it in shutdownDeletePaths: scala.collection.mutable.HashSet[String].
def createTempDir(
    root: String = System.getProperty("java.io.tmpdir"),
    namePrefix: String = "spark"): File = {
  val dir = createDirectory(root, namePrefix)
  registerShutdownDeleteDir(dir)
  dir
}
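A small, hedged example of the behavior; the actual path contains a random UUID:
val dir = Utils.createTempDir()   // e.g. /tmp/spark-<uuid>
assert(dir.isDirectory)
// dir is now registered in shutdownDeletePaths and will be deleted when the process exits.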
10. registerShutdownDeleteDir
Functionality: Registers a directory in shutdownDeletePaths: scala.collection.mutable.HashSet[String] so that it will be deleted when the process exits.
def registerShutdownDeleteDir(file: File) {
  val absolutePath = file.getAbsolutePath()
  shutdownDeletePaths.synchronized {
    shutdownDeletePaths += absolutePath
  }
}
11. hasRootAsShutdownDeleteDir
Functionality: Checks whether the file lies under one of the files or directories scheduled for deletion at shutdown; shutdownDeletePaths: scala.collection.mutable.HashSet[String] holds the files and directories to delete when the process shuts down.
def hasRootAsShutdownDeleteDir(file: File): Boolean = {
  val absolutePath = file.getAbsolutePath()
  val retval = shutdownDeletePaths.synchronized {
    shutdownDeletePaths.exists { path =>
      !absolutePath.equals(path) && absolutePath.startsWith(path)
    }
  }
  if (retval) {
    logInfo("path = " + file + ", already present as root for deletion.")
  }
  retval
}
12. deleteRecursively
Functionality: Deletes a file, or a directory together with its subdirectories and files, and removes the path from shutdownDeletePaths: scala.collection.mutable.HashSet[String].
def deleteRecursively(file: File) {
  if (file != null) {
    try {
      if (file.isDirectory && !isSymlink(file)) {
        var savedIOException: IOException = null
        for (child <- listFilesSafely(file)) {
          try {
            deleteRecursively(child)
          } catch {
            case ioe: IOException => savedIOException = ioe
          }
        }
        if (savedIOException != null) {
          throw savedIOException
        }
        shutdownDeletePaths.synchronized {
          shutdownDeletePaths.remove(file.getAbsolutePath)
        }
      }
    } finally {
      if (!file.delete()) {
        if (file.exists()) {
          throw new IOException("Failed to delete: " + file.getAbsolutePath)
        }
      }
    }
  }
}
13. getSparkClassLoader
Functionality: Gets the ClassLoader that loaded the current class.
def getSparkClassLoader = getClass.getClassLoader
14. getContextOrSparkClassLoader
Functionality: Gets the thread context ClassLoader; when none is set, falls back to the ClassLoader that loaded Spark.
def getContextOrSparkClassLoader =
Option(Thread.currentThread().getContextClassLoader).getOrElse(getSparkClassLoader)
15. newDaemonCachedThreadPool
Functionality: Creates a cached thread pool via Executors.newCachedThreadPool, whose threads are named with the given prefix.
def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
  val threadFactory = namedThreadFactory(prefix)
  Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
}
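A minimal sketch of using such a pool; the prefix "example" is arbitrary. Because the threads are daemon threads, an idle pool does not keep the JVM alive:
val pool = Utils.newDaemonCachedThreadPool("example")
pool.execute(new Runnable {
  override def run(): Unit = println(s"running on ${Thread.currentThread().getName}")
})
pool.shutdown()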
16. doFetchFile
Functionality: Downloads a file using URLConnection for http, https, and ftp URLs; for file URIs it copies the local file, and for any other scheme it reads the file through a Hadoop FileSystem.
private def doFetchFile(
    url: String,
    targetDir: File,
    filename: String,
    conf: SparkConf,
    securityMgr: SecurityManager,
    hadoopConf: Configuration) {
  val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
  val targetFile = new File(targetDir, filename)
  val uri = new URI(url)
  val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
  Option(uri.getScheme).getOrElse("file") match {
    case "http" | "https" | "ftp" =>
      logInfo("Fetching " + url + " to " + tempFile)
      var uc: URLConnection = null
      if (securityMgr.isAuthenticationEnabled()) {
        logDebug("fetchFile with security enabled")
        val newuri = constructURIForAuthentication(uri, securityMgr)
        uc = newuri.toURL().openConnection()
        uc.setAllowUserInteraction(false)
      } else {
        logDebug("fetchFile not using security")
        uc = new URL(url).openConnection()
      }
      val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000
      uc.setConnectTimeout(timeout)
      uc.setReadTimeout(timeout)
      uc.connect()
      val in = uc.getInputStream()
      downloadFile(url, in, tempFile, targetFile, fileOverwrite)
    case "file" =>
      val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
      copyFile(url, sourceFile, targetFile, fileOverwrite)
    case _ =>
      val fs = getHadoopFileSystem(uri, hadoopConf)
      val in = fs.open(new Path(uri))
      downloadFile(url, in, tempFile, targetFile, fileOverwrite)
  }
}
17. fetchFile
Functionality: If the file is already cached locally, fetches it from the cache; otherwise downloads it remotely (for example over HTTP). Files ending in .tar.gz, .tgz, or .tar are then unpacked, and finally FileUtil.chmod is called to add a+x permission to the target file.
def fetchFile(
    url: String,
    targetDir: File,
    conf: SparkConf,
    securityMgr: SecurityManager,
    hadoopConf: Configuration,
    timestamp: Long,
    useCache: Boolean) {
  val fileName = url.split("/").last
  val targetFile = new File(targetDir, fileName)
  val fetchCacheEnabled = conf.getBoolean("spark.files.useFetchCache", defaultValue = true)
  if (useCache && fetchCacheEnabled) {
    val cachedFileName = s"${url.hashCode}${timestamp}_cache"
    val lockFileName = s"${url.hashCode}${timestamp}_lock"
    val localDir = new File(getLocalDir(conf))
    val lockFile = new File(localDir, lockFileName)
    val raf = new RandomAccessFile(lockFile, "rw")
    val lock = raf.getChannel().lock()
    val cachedFile = new File(localDir, cachedFileName)
    try {
      if (!cachedFile.exists()) {
        doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
      }
    } finally {
      lock.release()
    }
    copyFile(
      url,
      cachedFile,
      targetFile,
      conf.getBoolean("spark.files.overwrite", false)
    )
  } else {
    doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
  }

  if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
    logInfo("Untarring " + fileName)
    Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
  } else if (fileName.endsWith(".tar")) {
    logInfo("Untarring " + fileName)
    Utils.execute(Seq("tar", "-xf", fileName), targetDir)
  }
  FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
}
18. executeAndGetOutput
Functionality: Executes a command and captures its output. It calls join on stdoutThread so that the current thread waits until stdoutThread has finished reading the output.
def executeAndGetOutput(
    command: Seq[String],
    workingDir: File = new File("."),
    extraEnvironment: Map[String, String] = Map.empty): String = {
  val builder = new ProcessBuilder(command: _*)
    .directory(workingDir)
  val environment = builder.environment()
  for ((key, value) <- extraEnvironment) {
    environment.put(key, value)
  }
  val process = builder.start()
  new Thread("read stderr for " + command(0)) {
    override def run() {
      for (line <- Source.fromInputStream(process.getErrorStream).getLines()) {
        System.err.println(line)
      }
    }
  }.start()
  val output = new StringBuffer
  val stdoutThread = new Thread("read stdout for " + command(0)) {
    override def run() {
      for (line <- Source.fromInputStream(process.getInputStream).getLines()) {
        output.append(line)
      }
    }
  }
  stdoutThread.start()
  val exitCode = process.waitFor()
  stdoutThread.join() // Wait for it to finish reading output
  if (exitCode != 0) {
    logError(s"Process $command exited with code $exitCode: $output")
    throw new SparkException(s"Process $command exited with code $exitCode")
  }
  output.toString
}
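A hedged usage sketch, assuming a Unix-like system with echo on the PATH:
val out = Utils.executeAndGetOutput(Seq("echo", "hello"))
// out == "hello"; a non-zero exit code would instead raise a SparkException.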
19. memoryStringToMb
Functionality: Converts a memory size string (such as "512m" or "2g") into an integer number of MB; a few example conversions follow the code below.
def memoryStringToMb(str: String): Int = {
  val lower = str.toLowerCase
  if (lower.endsWith("k")) {
    (lower.substring(0, lower.length - 1).toLong / 1024).toInt
  } else if (lower.endsWith("m")) {
    lower.substring(0, lower.length - 1).toInt
  } else if (lower.endsWith("g")) {
    lower.substring(0, lower.length - 1).toInt * 1024
  } else if (lower.endsWith("t")) {
    lower.substring(0, lower.length - 1).toInt * 1024 * 1024
  } else {
    // no suffix, so it's just a number in bytes
    (lower.toLong / 1024 / 1024).toInt
  }
}
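A few illustrative conversions, following directly from the code above:
memoryStringToMb("512m")      // 512
memoryStringToMb("2g")        // 2048
memoryStringToMb("1t")        // 1048576
memoryStringToMb("1048576k")  // 1024
memoryStringToMb("1048576")   // 1 (a bare number is interpreted as bytes)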