Version: Spark 2.3
Relevant source: org.apache.spark.SparkContext
When creating a Spark job, we often need to ship some dependency files along with it, which is normally done by passing --files /path/to/file to the spark-submit script.
Our product, however, submits Spark jobs through Livy, and Livy is essentially a wrapper around spark-submit, so specifying dependency files still comes down to Spark itself. Since we cannot pass --files on the command line, how do we specify the files programmatically? And how do the tasks running on each node get hold of these files?
From the source code that handles spark-submit argument passing, the value of --files is actually carried by the configuration property "spark.files", so it can be set on SparkConf in code.
For example:
SparkConf conf = new SparkConf();
conf.set("spark.files", "/path/to/file");
// If the file lives on HDFS, use conf.set("spark.files", "hdfs:/path/to/file").
// Note that only the hdfs scheme is needed; no ip:port is required.
The official Spark documentation describes this property as follows:
spark.files Comma-separated list of files to be placed in the working directory of each executor. Globs are allowed.
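Because the value is a comma-separated list and globs are allowed, several dependencies can be registered in one setting. A minimal sketch in the same style as the example above; the paths are purely illustrative:

SparkConf conf = new SparkConf();
// Multiple files are joined with commas; globs such as *.properties are allowed.
// The paths below are illustrative only.
conf.set("spark.files", "hdfs:/config/app.conf,/opt/conf/*.properties");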
How the user-specified files are actually picked up is implemented in SparkContext.scala, shown below (dependency jars specified with --jars are handled the same way):
def jars: Seq[String] = _jars
def files: Seq[String] = _files
...
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
  .toSeq.flatten
...
// Add each JAR given through the constructor
if (jars != null) {
  jars.foreach(addJar)
}
if (files != null) {
  files.foreach(addFile)
}
The implementation of addFile is as follows:
/**
 * Add a file to be downloaded with this Spark job on every node.
 *
 * If a file is added during execution, it will not be available until the next TaskSet starts.
 *
 * @param path can be either a local file, a file in HDFS (or other Hadoop-supported
 * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
 * use `SparkFiles.get(fileName)` to find its download location.
 * @param recursive if true, a directory can be given in `path`. Currently directories are
 * only supported for Hadoop-supported filesystems.
 *
 * 1. The file is downloaded to every node.
 * 2. If a file is added while the job is running, it only takes effect when the next
 *    TaskSet starts.
 * 3. The path can be a local file, a file on HDFS or any other Hadoop-supported filesystem,
 *    or an HTTP, HTTPS or FTP URI. Inside Spark jobs the file can be accessed with
 *    SparkFiles.get(fileName).
 * 4. To add files recursively, a directory can be given, but this only works for
 *    Hadoop-supported filesystems.
 */
def addFile(path: String, recursive: Boolean): Unit = {
  val uri = new Path(path).toUri
  val schemeCorrectedPath = uri.getScheme match {
    // If the path carries no scheme (i.e. the scheme is null) it is treated as a local file.
    // On the command line, --files /home/kong/log4j.properties is equivalent to
    // --files local:/home/kong/log4j.properties
    case null | "local" => new File(path).getCanonicalFile.toURI.toString
    case _ => path
  }

  val hadoopPath = new Path(schemeCorrectedPath)
  val scheme = new URI(schemeCorrectedPath).getScheme
  if (!Array("http", "https", "ftp").contains(scheme)) {
    val fs = hadoopPath.getFileSystem(hadoopConfiguration)
    val isDir = fs.getFileStatus(hadoopPath).isDirectory
    if (!isLocal && scheme == "file" && isDir) {
      throw new SparkException(s"addFile does not support local directories when not running " +
        "local mode.")
    }
    if (!recursive && isDir) {
      throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
        "turned on.")
    }
  } else {
    // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
    Utils.validateURL(uri)
  }

  val key = if (!isLocal && scheme == "file") {
    env.rpcEnv.fileServer.addFile(new File(uri.getPath))
  } else {
    schemeCorrectedPath
  }
  val timestamp = System.currentTimeMillis
  if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
    logInfo(s"Added file $path at $key with timestamp $timestamp")
    // Fetch the file locally so that closures which are run on the driver can still use the
    // SparkFiles API to access files.
    Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), conf,
      env.securityManager, hadoopConfiguration, timestamp, useCache = false)
    postEnvironmentUpdate()
  }
}
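To see both halves in one place, driver-side registration and executor-side access, here is a minimal runnable sketch. The file name lookup.conf and its path are assumptions made for illustration; SparkContext.addFile is the same method shown above, exposed here through JavaSparkContext:

import java.io.File;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaSparkContext;

public class AddFileDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("add-file-demo");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        // Equivalent to setting spark.files: register the file so every node downloads it.
        // The path is illustrative only.
        jsc.addFile("hdfs:/path/to/lookup.conf");

        jsc.parallelize(Arrays.asList(1, 2, 3))
            .map(x -> {
                // On the executor, resolve the local download location by file name.
                String localPath = SparkFiles.get("lookup.conf");
                return localPath + " exists=" + new File(localPath).exists();
            })
            .collect()
            .forEach(System.out::println);

        jsc.stop();
    }
}

As the scaladoc above notes, SparkFiles.get(fileName) returns the local download location of the file on whichever node the task runs.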
Both addJar and addFile call postEnvironmentUpdate at the end, and postEnvironmentUpdate is also called at the end of SparkContext initialization. The code is as follows:
/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
  if (taskScheduler != null) {
    val schedulingMode = getSchedulingMode.toString
    val addedJarPaths = addedJars.keys.toSeq
    val addedFilePaths = addedFiles.keys.toSeq
    // SparkEnv.environmentDetails collects the JVM information, Spark properties,
    // system properties, classpath entries, etc. into the environment details.
    val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
      addedFilePaths)
    // Build a SparkListenerEnvironmentUpdate event and post it to the event bus.
    val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
    listenerBus.post(environmentUpdate)
  }
}
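The posted event can also be observed from user code by registering a SparkListener on the event bus. A rough sketch; the listener class name is made up for illustration:

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;

// Hypothetical listener, only to show where the event arrives.
public class EnvUpdateLogger extends SparkListener {
    @Override
    public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
        // environmentDetails() is the Map[String, Seq[(String, String)]] built by
        // SparkEnv.environmentDetails (see below).
        System.out.println("Environment updated, categories: "
                + environmentUpdate.environmentDetails().keySet());
    }
}

Such a listener can be registered with SparkContext.addSparkListener or through the spark.extraListeners property.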
The environmentDetails method:
/**
 * Return a map representation of jvm information, Spark properties, system properties, and
 * class paths. Map keys define the category, and map values represent the corresponding
 * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate.
 */
private[spark] def environmentDetails(
    conf: SparkConf,
    schedulingMode: String,
    addedJars: Seq[String],
    addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {

  import Properties._
  val jvmInformation = Seq(
    ("Java Version", s"$javaVersion ($javaVendor)"),
    ("Java Home", javaHome),
    ("Scala Version", versionString)
  ).sorted

  // Spark properties
  // This includes the scheduling mode whether or not it is configured (used by SparkUI)
  val schedulerMode =
    if (!conf.contains("spark.scheduler.mode")) {
      Seq(("spark.scheduler.mode", schedulingMode))
    } else {
      Seq.empty[(String, String)]
    }
  val sparkProperties = (conf.getAll ++ schedulerMode).sorted

  // System properties that are not java classpaths
  val systemProperties = Utils.getSystemProperties.toSeq
  val otherProperties = systemProperties.filter { case (k, _) =>
    k != "java.class.path" && !k.startsWith("spark.")
  }.sorted

  // Class paths including all added jars and files
  val classPathEntries = javaClassPath
    .split(File.pathSeparator)
    .filterNot(_.isEmpty)
    .map((_, "System Classpath"))
  val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
  val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted

  Map[String, Seq[(String, String)]](
    "JVM Information" -> jvmInformation,
    "Spark Properties" -> sparkProperties,
    "System Properties" -> otherProperties,
    "Classpath Entries" -> classPaths)
}
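These categories are what ends up on the Environment tab of the Spark UI; note that jars and files added by the user are listed under "Classpath Entries" with the value "Added By User". Building on the hypothetical listener above, a rough sketch of pulling those entries out of the event from Java (the helper class and method names are made up):

import scala.Tuple2;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;

public class EnvDetailsUtil {
    // Print the classpath entries that were contributed via addJar/addFile ("Added By User").
    static void printUserAddedEntries(SparkListenerEnvironmentUpdate event) {
        // environmentDetails() is the Scala Map[String, Seq[(String, String)]] built above.
        scala.collection.Seq<Tuple2<String, String>> classpath =
            event.environmentDetails().apply("Classpath Entries");
        scala.collection.Iterator<Tuple2<String, String>> it = classpath.iterator();
        while (it.hasNext()) {
            Tuple2<String, String> kv = it.next();
            if ("Added By User".equals(kv._2())) {
                System.out.println(kv._1());
            }
        }
    }
}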