spark sql執行insert overwrite table時,寫到新表或者新分區的文件個數,有可能是200個,也有可能是任意個,為什么會有這種差別?
首先看一下spark sql執行insert overwrite table流程:
- 1 創建臨時目錄,比如2 將數據寫到臨時目錄;
- .hive-staging_hive_2018-06-23_00-39-39_825_3122897139441535352-2312/-ext-10000
- 2 執行loadTable或loadPartition將臨時目錄數據move到正式目錄;
對應的代碼為:
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
case class InsertIntoHiveTable( table: MetastoreRelation, partition: Map[String, Option[String]], child: SparkPlan, overwrite: Boolean, ifNotExists: Boolean) extends UnaryExecNode { ... protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { ... val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) ... @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) ... private def saveAsHiveFile( rdd: RDD[InternalRow], valueClass: Class[_], fileSinkConf: FileSinkDesc, conf: SerializableJobConf, writerContainer: SparkHiveWriterContainer): Unit = { assert(valueClass != null, "Output value class not set") conf.value.setOutputValueClass(valueClass) val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName assert(outputFileFormatClassName != null, "Output format class not set") conf.value.set("mapred.output.format.class", outputFileFormatClassName) FileOutputFormat.setOutputPath( conf.value, SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value)) log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) writerContainer.driverSideSetup() sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) writerContainer.commitJob() }
下面先看第一步創建臨時目錄過程,即getExternalTmpPath
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = { val extURI: URI = path.toUri if (extURI.getScheme == "viewfs") { getExtTmpPathRelTo(path.getParent, hadoopConf) } else { new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000") } } private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = { getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf) } private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = { val inputPathUri: URI = inputPath.toUri val inputPathName: String = inputPathUri.getPath val fs: FileSystem = inputPath.getFileSystem(hadoopConf) val stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) { new Path(inputPathName, stagingDir).toString } else { inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) } val dir: Path = fs.makeQualified( new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) logDebug("Created staging dir = " + dir + " for path = " + inputPath) try { if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") } fs.deleteOnExit(dir) } catch { case e: IOException =>
throw new RuntimeException( "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e) } return dir } private def executionId: String = { val rand: Random = new Random val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US) "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong) }
臨時目錄組成為【.hive-staging(配置hive.exec.stagingdir)】_【hive(硬編碼)】_【2018-06-23_00-39-39_825(時分秒)】_【3122897139441535352(隨機串)】_【2312(taskId)】/-ext-10000(硬編碼)
下面看寫文件過程,即
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
org.apache.spark.SparkContext
/** * Run a job on all partitions in an RDD and return the results in an array. */ def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) }
可見是將rdd逐個分區執行寫入操作,rdd有多少個分區就會寫入多少個文件,rdd是通過child.execute返回的,即SparkPlan.execute,下面看SparkPlan
org.apache.spark.sql.execution.SparkPlan
final def execute(): RDD[InternalRow] = executeQuery { doExecute() } protected def doExecute(): RDD[InternalRow]
doExecute是抽象方法,執行計划中的過程都對應到SparkPlan的子類,比如Project對應ProjectExec,SortMergeJoin對應SortMergeJoinExec;
SparkPlan是由SparkPlanner生成的,下面看SparkPlanner:
org.apache.spark.sql.execution.SparkPlanner
def numPartitions: Int = conf.numShufflePartitions
這里直接取的是SQLConf.numShufflePartitions,下面看SQLConf:
org.apache.spark.sql.internal.SQLConf
val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions") .doc("The default number of partitions to use when shuffling data for joins or aggregations.") .intConf .createWithDefault(200) def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
這里取的是配置spark.sql.shuffle.partitions,默認200;那么分區數量是怎樣用到的?下面看BasicOperators:
org.apache.spark.sql.execution.SparkStrategies.BasicOperators
def numPartitions: Int = self.numPartitions def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { ... case logical.RepartitionByExpression(expressions, child, nPartitions) => exchange.ShuffleExchange(HashPartitioning( expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil
可見shuffle過程會根據numPartitions來創建HashPartitioning,如果sql執行過程需要shuffle(比如有join,group by等操作),那么默認會寫200個文件;如果sql執行過程沒有shuffle,那么會由HiveTableScan和Filter等來決定寫多少個文件;
也可以通過執行計划來看,如果有shuffle過程,執行計划中通常有這么一步:
: +- Exchange(coordinator id: 371605426) hashpartitioning(id#60, 200), coordinator[target post-shuffle partition size: 67108864]
其中hashpartitioning(id#60, 200)中的200就是spark.sql.shuffle.partitions的默認值;
附ShuffleExchange過程:
org.apache.spark.sql.execution.exchange.ShuffleExchange
def apply(newPartitioning: Partitioning, child: SparkPlan): ShuffleExchange = { ShuffleExchange(newPartitioning, child, coordinator = Option.empty[ExchangeCoordinator]) } protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
if (cachedShuffleRDD == null) { cachedShuffleRDD = coordinator match { case Some(exchangeCoordinator) => val shuffleRDD = exchangeCoordinator.postShuffleRDD(this) assert(shuffleRDD.partitions.length == newPartitioning.numPartitions) shuffleRDD case None => val shuffleDependency = prepareShuffleDependency() preparePostShuffleRDD(shuffleDependency) } } cachedShuffleRDD } /** * Returns a [[ShuffleDependency]] that will partition rows of its child based on * the partitioning scheme defined in `newPartitioning`. Those partitions of * the returned ShuffleDependency will be the input of shuffle. */
private[exchange] def prepareShuffleDependency() : ShuffleDependency[Int, InternalRow, InternalRow] = { ShuffleExchange.prepareShuffleDependency( child.execute(), child.output, newPartitioning, serializer) } /** * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset. * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional * partition start indices array. If this optional array is defined, the returned * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array. */
private[exchange] def preparePostShuffleRDD( shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow], specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = { // If an array of partition start indices is provided, we need to use this array // to create the ShuffledRowRDD. Also, we need to update newPartitioning to // update the number of post-shuffle partitions.
specifiedPartitionStartIndices.foreach { indices => assert(newPartitioning.isInstanceOf[HashPartitioning]) newPartitioning = UnknownPartitioning(indices.length) } new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices) } /** * Returns a [[ShuffleDependency]] that will partition rows of its child based on * the partitioning scheme defined in `newPartitioning`. Those partitions of * the returned ShuffleDependency will be the input of shuffle. */ def prepareShuffleDependency( rdd: RDD[InternalRow], outputAttributes: Seq[Attribute], newPartitioning: Partitioning, serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = { val part: Partitioner = newPartitioning match { case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions) case HashPartitioning(_, n) =>
new Partitioner { override def numPartitions: Int = n // For HashPartitioning, the partitioning key is already a valid partition ID, as we use // `HashPartitioning.partitionIdExpression` to produce partitioning key.
override def getPartition(key: Any): Int = key.asInstanceOf[Int] } case RangePartitioning(sortingExpressions, numPartitions) =>
// Internally, RangePartitioner runs a job on the RDD that samples keys to compute // partition bounds. To get accurate samples, we need to copy the mutable keys.
val rddForSampling = rdd.mapPartitionsInternal { iter => val mutablePair = new MutablePair[InternalRow, Null]() iter.map(row => mutablePair.update(row.copy(), null)) } implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes) new RangePartitioner(numPartitions, rddForSampling, ascending = true) case SinglePartition =>
new Partitioner { override def numPartitions: Int = 1
override def getPartition(key: Any): Int = 0 } case _ => sys.error(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning.
} def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match { case RoundRobinPartitioning(numPartitions) =>
// Distributes elements evenly across output partitions, starting from a random partition.
var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) (row: InternalRow) => { // The HashPartitioner will handle the `mod` by the number of partitions
position += 1 position } case h: HashPartitioning => val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) row => projection(row).getInt(0) case RangePartitioning(_, _) | SinglePartition => identity case _ => sys.error(s"Exchange not implemented for $newPartitioning") } val rddWithPartitionIds: RDD[Product2[Int, InternalRow]] = { if (needToCopyObjectsBeforeShuffle(part, serializer)) { rdd.mapPartitionsInternal { iter => val getPartitionKey = getPartitionKeyExtractor() iter.map { row => (part.getPartition(getPartitionKey(row)), row.copy()) } } } else { rdd.mapPartitionsInternal { iter => val getPartitionKey = getPartitionKeyExtractor() val mutablePair = new MutablePair[Int, InternalRow]() iter.map { row => mutablePair.update(part.getPartition(getPartitionKey(row)), row) } } } } // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds // are in the form of (partitionId, row) and every partitionId is in the expected range // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough.
val dependency =
new ShuffleDependency[Int, InternalRow, InternalRow]( rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), serializer) dependency }