Spark Parquet file split


Reposted from: https://my.oschina.net/tjt/blog/2250953


 

When using spark + parquet in practice, I ran into two puzzling things:

  1. We have only one parquet file (smaller than the hdfs block size), yet spark generated 4 tasks in one stage to process it.

  2. Of those 4 tasks, only one processed all the data; the others processed nothing.

Both questions come down to how spark splits parquet into partitions, and which part of the data each partition ends up processing.

The conclusion first: in spark, parquet is splitable, see ParquetFileFormat#isSplitable. Does that chop the data to pieces? No, because parquet is split with the parquet row group as the smallest unit. This also means some partitions can end up with no data; in the extreme case of a single row group, no matter how many partitions there are, only one will have data.
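For reference, the method in question is essentially a constant. The sketch below paraphrases ParquetFileFormat#isSplitable from Spark 2.x; check the exact signature in your own Spark version:

// Paraphrased from Spark 2.x ParquetFileFormat: parquet files are always reported as splitable.
override def isSplitable(
    sparkSession: SparkSession,
    options: Map[String, String],
    path: Path): Boolean = {
  true
}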

Now, on to our tour of the source code:

Processing flow

1. Cut the parquet file by size to generate partitions:

In FileSourceScanExec#createNonBucketedReadRDD, if the file is splitable, it is cut into pieces of at most maxSplitBytes; the number of pieces produced is the number of RDD partitions. This explains puzzle 1. The code is below, followed by a small worked example with concrete numbers:

val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
      s"open cost is considered as scanning $openCostInBytes bytes.")
​
val splitFiles = selectedPartitions.flatMap { partition =>
  partition.files.flatMap { file =>
    val blockLocations = getBlockLocations(file)
    if (fsRelation.fileFormat.isSplitable(
        fsRelation.sparkSession, fsRelation.options, file.getPath)) {
      (0L until file.getLen by maxSplitBytes).map { offset =>
        val remaining = file.getLen - offset
        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
        val hosts = getBlockHosts(blockLocations, offset, size)
        PartitionedFile(
          partition.values, file.getPath.toUri.toString, offset, size, hosts)
      }
    } else {
      val hosts = getBlockHosts(blockLocations, 0, file.getLen)
      Seq(PartitionedFile(
        partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
    }
  }
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)

val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
  if (currentFiles.nonEmpty) {
    val newPartition =
      FilePartition(
        partitions.size,
        currentFiles.toArray.toSeq) // Copy to a new Array.
    partitions += newPartition
  }
  currentFiles.clear()
  currentSize = 0
}

// Assign files to partitions using "First Fit Decreasing" (FFD)
splitFiles.foreach { file =>
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
closePartition()

new FileScanRDD(fsRelation.sparkSession, readFile, partitions)
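To see how puzzle 1 falls out of this, here is a minimal standalone sketch (not Spark source, just the same formula) with illustrative numbers: the 128 MB and 4 MB defaults for spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes, one 40 MB parquet file, and an assumed defaultParallelism of 4:

// A minimal sketch that mirrors the sizing formula above with assumed numbers.
object SplitSizingExample extends App {
  val mb = 1024L * 1024
  val defaultMaxSplitBytes = 128 * mb // spark.sql.files.maxPartitionBytes default
  val openCostInBytes      = 4 * mb   // spark.sql.files.openCostInBytes default
  val fileLen              = 40 * mb  // our single parquet file
  val defaultParallelism   = 4        // assumed cluster parallelism

  // bytesPerCore is derived from the total scan size (file size plus open cost).
  val bytesPerCore  = (fileLen + openCostInBytes) / defaultParallelism
  val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))

  // Same loop shape as "0L until file.getLen by maxSplitBytes" above.
  val splits = (0L until fileLen by maxSplitBytes).map { offset =>
    (offset, Math.min(maxSplitBytes, fileLen - offset))
  }

  println(s"maxSplitBytes = ${maxSplitBytes / mb} MB") // 11 MB with these numbers
  splits.foreach { case (offset, size) =>
    println(s"split at offset ${offset / mb} MB, size ${size / mb} MB")
  }
  // Prints 4 splits for the single 40 MB file.
}

With these numbers maxSplitBytes works out to 11 MB, so the single 40 MB file becomes 4 PartitionedFiles and hence 4 tasks, even though it is smaller than an hdfs block.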

 

2. Construct the reader from a ParquetInputSplit:

In the implementation of ParquetFileFormat#buildReaderWithPartitionValues, the split is used to initialize the reader, which, depending on configuration, is either vectorized or not:

  • vectorizedReader.initialize(split, hadoopAttemptContext)

  • reader.initialize(split, hadoopAttemptContext)

Step 2 is covered with more complete code in the aside at the end, but it is not central to the main flow, so we leave it for later.

3. Assign the parquet row groups to the different partitions

Step 1 carved out a number of partitions by file size, but not all of those partitions will end up with data.

Picking up from the init in step 2: in SpecificParquetRecordReaderBase#initialize, a RangeMetadataFilter is passed to readFooter. The filter's range comes from the boundaries of your split, and that range is what finally decides which split each row group belongs to:

public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    throws IOException, InterruptedException {
    ...
    footer = readFooter(configuration, file, range(inputSplit.getStart(), inputSplit.getEnd()));
    ...
}

 

Parquet's ParquetFileReader#readFooter method ends up in ParquetMetadataConverter#readParquetMetadata(f, filter); its handling of a RangeMetadataFilter is:

@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
  return filterFileMetaDataByMidpoint(readFileMetaData(from), filter);
}

 

At last we reach the key part of the splitting. The crucial rule is this: whichever split owns the midpoint of a row group gets to process that row group.

Now suppose we have a 40 MB file containing a single row group, cut every 10 MB: there will be 4 partitions, but only one of them owns the midpoint of that row group, so only that one partition will have data.

long midPoint = startIndex + totalSize / 2;
if (filter.contains(midPoint)) {
  newRowGroups.add(rowGroup);
}

 

The full code is as follows:

static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
  List<RowGroup> rowGroups = metaData.getRow_groups();
  List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
  for (RowGroup rowGroup : rowGroups) {
    long totalSize = 0;
    long startIndex = getOffset(rowGroup.getColumns().get(0));
    for (ColumnChunk col : rowGroup.getColumns()) {
      totalSize += col.getMeta_data().getTotal_compressed_size();
    }
    long midPoint = startIndex + totalSize / 2;
    if (filter.contains(midPoint)) {
      newRowGroups.add(rowGroup);
    }
  }
  metaData.setRow_groups(newRowGroups);
  return metaData;
}
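To replay the 40 MB example against this rule, here is a minimal standalone sketch (plain Scala, not Parquet code) of the midpoint check. The four 10 MB splits mirror the scenario above; the row group's start offset (4 MB) and compressed size (30 MB) are made-up numbers for illustration:

// Minimal sketch of the midpoint rule from filterFileMetaDataByMidpoint.
// Assumptions: a 40 MB file read as four 10 MB splits, containing one row
// group that starts at offset 4 MB with 30 MB of compressed column chunks.
object MidpointExample extends App {
  val mb = 1024L * 1024

  // Same semantics as the range(start, end) filter: the half-open interval [start, end).
  case class SplitRange(start: Long, end: Long) {
    def contains(offset: Long): Boolean = offset >= start && offset < end
  }

  val splits = (0L until 40 * mb by 10 * mb).map(o => SplitRange(o, o + 10 * mb))

  val rowGroupStart = 4 * mb  // assumed offset of the only row group
  val rowGroupSize  = 30 * mb // assumed total compressed size
  val midPoint      = rowGroupStart + rowGroupSize / 2

  splits.foreach { s =>
    println(s"split [${s.start / mb} MB, ${s.end / mb} MB) owns the row group: ${s.contains(midPoint)}")
  }
  // Only the split [10 MB, 20 MB) prints true.
}

Only the split that owns the 19 MB midpoint gets the row group, which is exactly puzzle 2: one task does all the work while the other three sit idle.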

 

Aside:

The code behind step 2 is how spark reads a file for real; it ultimately feeds a FileScanRDD and is worth a look while we are at it. The full code is as follows:

  (file: PartitionedFile) => {
      assert(file.partitionValues.numFields == partitionSchema.size)

      val fileSplit =
        new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)

      val split =
        new org.apache.parquet.hadoop.ParquetInputSplit(
          fileSplit.getPath,
          fileSplit.getStart,
          fileSplit.getStart + fileSplit.getLength,
          fileSplit.getLength,
          fileSplit.getLocations,
          null)

      val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
      val hadoopAttemptContext =
        new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)

      // Try to push down filters when filter push-down is enabled.
      // Notice: This push-down is RowGroups level, not individual records.
      if (pushed.isDefined) {
        ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
      }
      val parquetReader = if (enableVectorizedReader) {
        val vectorizedReader = new VectorizedParquetRecordReader()
        vectorizedReader.initialize(split, hadoopAttemptContext)
        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
        vectorizedReader.initBatch(partitionSchema, file.partitionValues)
        if (returningBatch) {
          vectorizedReader.enableReturningBatches()
        }
        vectorizedReader
      } else {
        logDebug(s"Falling back to parquet-mr")
        // ParquetRecordReader returns UnsafeRow
        val reader = pushed match {
          case Some(filter) =>
            new ParquetRecordReader[UnsafeRow](
              new ParquetReadSupport,
              FilterCompat.get(filter, null))
          case _ =>
            new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
        }
        reader.initialize(split, hadoopAttemptContext)
        reader
      }

      val iter = new RecordReaderIterator(parquetReader)
      Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))

      // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
      if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
          enableVectorizedReader) {
        iter.asInstanceOf[Iterator[InternalRow]]
      } else {
        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
        val joinedRow = new JoinedRow()
        val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)

        // This is a horrible erasure hack...  if we type the iterator above, then it actually check
        // the type in next() and we get a class cast exception.  If we make that function return
        // Object, then we can defer the cast until later!
        if (partitionSchema.length == 0) {
          // There is no partition columns
          iter.asInstanceOf[Iterator[InternalRow]]
        } else {
          iter.asInstanceOf[Iterator[InternalRow]]
            .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
        }
      }
    }

 

The returned (PartitionedFile) => Iterator[InternalRow] is what FileSourceScanExec#inputRDD uses:

private lazy val inputRDD: RDD[InternalRow] = {
  val readFile: (PartitionedFile) => Iterator[InternalRow] =
    relation.fileFormat.buildReaderWithPartitionValues(
      sparkSession = relation.sparkSession,
      dataSchema = relation.dataSchema,
      partitionSchema = relation.partitionSchema,
      requiredSchema = requiredSchema,
      filters = pushedDownFilters,
      options = relation.options,
      hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))

  relation.bucketSpec match {
    case Some(bucketing) if relation.sparkSession.sessionState.conf.bucketingEnabled =>
      createBucketedReadRDD(bucketing, readFile, selectedPartitions, relation)
    case _ =>
      createNonBucketedReadRDD(readFile, selectedPartitions, relation)
  }
}

 

FileScanRDD

class FileScanRDD(
    @transient private val sparkSession: SparkSession,
    readFunction: (PartitionedFile) => Iterator[InternalRow],
    @transient val filePartitions: Seq[FilePartition])
  extends RDD[InternalRow](sparkSession.sparkContext, Nil) {

  override def compute(split: RDDPartition, context: TaskContext): Iterator[InternalRow] = {

    // Abridged: in the real code these are fields of the anonymous Iterator built inside compute.
    private[this] val files = split.asInstanceOf[FilePartition].files.toIterator
    private[this] var currentFile: PartitionedFile = null // set via currentFile = files.next(); I won't paste the full implementation, take a look yourself if interested.
    ...
    readFunction(currentFile)
    ...
  }
}

 

Conclusion

Control the row-count (size) threshold of a parquet row group when writing: smaller row groups mean more row groups per file, so more splits own a row-group midpoint, and spark's usable parallelism goes up. With a single row group, only one task will ever get data no matter how many partitions spark creates.
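As a hedged illustration of that knob (verify against your own Spark/Parquet versions): parquet.block.size is the Parquet writer's target row-group size in bytes, and one way to shrink it is through the Hadoop configuration before writing. The 8 MB value and the output path below are purely illustrative assumptions:

// Hedged sketch: shrink the Parquet row-group size so one file gets several row groups.
import org.apache.spark.sql.SparkSession

object WriteSmallRowGroups extends App {
  val spark = SparkSession.builder().appName("small-row-groups").getOrCreate()

  // parquet.block.size = target row-group size in bytes for the Parquet writer.
  spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 8 * 1024 * 1024)

  val df = spark.range(0L, 10000000L).toDF("id") // hypothetical data
  df.write.mode("overwrite").parquet("/tmp/small_row_groups")

  spark.stop()
}

Before relying on this, check the resulting file's row-group count (for example with parquet-tools meta) to confirm the setting was honoured in your environment.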

