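Spark SQL (9): A Summary of the Spark SQL JOIN Source Code
This post summarizes how Spark SQL implements the join operation, following the Spark SQL source code. The flow is the usual one: from the SQL statement to the logical operator tree, then analyzed -> optimized -> physical plan, with a brief look at the processing logic of each stage.
Join logical operator tree
Start with a SQL statement: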
SELECT NAME FROM NAME LEFT JOIN NAME2 ON NAME = NAME JOIN NAME3 ON NAME = NAME
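The logical operator tree generated from this SQL is:
To see how the tree structure shown above is produced, it is enough to focus on the join part. The source is in AstBuilder: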
override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
  val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
    val right = plan(relation.relationPrimary)
    val join = right.optionalMap(left)(Join(_, _, Inner, None))
    withJoinRelations(join, relation)
  }
  ctx.lateralView.asScala.foldLeft(from)(withGenerate)
}

private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
  val pp = ctx.joinRelation
  pp.asScala.foldLeft(base) { (left, join) =>
    withOrigin(join) {
      val baseJoinType = join.joinType match {
        case null => Inner
        case jt if jt.CROSS != null => Cross
        case jt if jt.FULL != null => FullOuter
        case jt if jt.SEMI != null => LeftSemi
        case jt if jt.ANTI != null => LeftAnti
        case jt if jt.LEFT != null => LeftOuter
        case jt if jt.RIGHT != null => RightOuter
        case _ => Inner
      }

      // Resolve the join type and join condition
      val (joinType, condition) = Option(join.joinCriteria) match {
        case Some(c) if c.USING != null =>
          (UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), None)
        case Some(c) if c.booleanExpression != null =>
          (baseJoinType, Option(expression(c.booleanExpression)))
        case None if join.NATURAL != null =>
          if (baseJoinType == Cross) {
            throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
          }
          (NaturalJoin(baseJoinType), None)
        case None =>
          (baseJoinType, None)
      }
      Join(left, plan(join.right), joinType, condition)
    }
  }
}
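As the figure above shows, in the tree built for a join the join relations are stored as a list of joinRelation nodes. Each joinRelation contains a JoinType, a relationPrimary and a joinCriteria, where the joinCriteria is essentially a booleanExpression.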
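The way withJoinRelations folds the joinRelation list into a left-deep tree can be illustrated with a minimal sketch. The types Plan, Table, JoinNode and JoinRelation below are hypothetical stand-ins invented for this illustration, not Spark classes; join types and conditions are plain strings.

```scala
object JoinTreeSketch {
  // Hypothetical stand-ins for Spark's LogicalPlan, relation and Join nodes.
  sealed trait Plan
  final case class Table(name: String) extends Plan
  final case class JoinNode(
      left: Plan,
      right: Plan,
      joinType: String,
      condition: Option[String]) extends Plan

  // One parsed joinRelation: join type, right-hand relation, optional ON condition.
  final case class JoinRelation(joinType: String, right: String, condition: Option[String])

  // Fold the join relations onto the base relation: at every step the tree
  // built so far becomes the left child of a new join node.
  def build(base: String, joins: Seq[JoinRelation]): Plan =
    joins.foldLeft(Table(base): Plan) { (left, j) =>
      JoinNode(left, Table(j.right), j.joinType, j.condition)
    }
}
```

For the sample SQL, folding a LeftOuter join on NAME2 and then an Inner join on NAME3 yields a tree whose root is the Inner join and whose left child is the LeftOuter join.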
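Next come the analysis and optimization of the Join. The analysis step mainly adds things such as subquery aliases; the optimization step then applies optimizations such as operator pushdown and elimination of subquery aliases. Quite a few rules are involved here; interested readers can dig into the source.
Physical plan phase
This step mainly involves the strategies configured in SparkPlanner. Among them it is enough to focus on JoinSelection, whose apply method is as follows: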
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

  // --- BroadcastHashJoin --------------------------------------------------------------------

  // broadcast hints were specified
  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastByHints(joinType, left, right) =>
    val buildSide = broadcastSideByHints(joinType, left, right)
    Seq(joins.BroadcastHashJoinExec(
      leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

  // broadcast hints were not specified, so need to infer it from size and configuration.
  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastBySizes(joinType, left, right) =>
    val buildSide = broadcastSideBySizes(joinType, left, right)
    Seq(joins.BroadcastHashJoinExec(
      leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

  // --- ShuffledHashJoin ---------------------------------------------------------------------

  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
      && muchSmaller(right, left) ||
      !RowOrdering.isOrderable(leftKeys) =>
    Seq(joins.ShuffledHashJoinExec(
      leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
      && muchSmaller(left, right) ||
      !RowOrdering.isOrderable(leftKeys) =>
    Seq(joins.ShuffledHashJoinExec(
      leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

  // --- SortMergeJoin ------------------------------------------------------------

  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if RowOrdering.isOrderable(leftKeys) =>
    joins.SortMergeJoinExec(
      leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

  // --- Without joining keys ------------------------------------------------------------

  // Pick BroadcastNestedLoopJoin if one side could be broadcast
  case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastByHints(joinType, left, right) =>
    val buildSide = broadcastSideByHints(joinType, left, right)
    joins.BroadcastNestedLoopJoinExec(
      planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

  case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastBySizes(joinType, left, right) =>
    val buildSide = broadcastSideBySizes(joinType, left, right)
    joins.BroadcastNestedLoopJoinExec(
      planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

  // Pick CartesianProduct for InnerJoin
  case logical.Join(left, right, _: InnerLike, condition) =>
    joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

  case logical.Join(left, right, joinType, condition) =>
    val buildSide = broadcastSide(
      left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
    // This join could be very slow or OOM
    joins.BroadcastNestedLoopJoinExec(
      planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

  // --- Cases where this strategy does not apply ---------------------------------------------

  case _ => Nil
}
}
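The code above shows that different conditions produce different join operators: BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec and BroadcastNestedLoopJoinExec (plus CartesianProductExec for inner-like joins without join keys).
Before going through the first four operators, here is the general idea behind the join implementation:
Suppose two tables are being joined in Spark.
One table acts as the streamed table and the other as the build table; by default the large table is the streamed table and the small table is the build table. The join iterates over the streamed table and matches each row against the build table to produce the joined rows. Consider an extreme case: the large table has several million rows and the small table has only 10. It is then enough to iterate over the streamed table and look up each row in the small (build) table, emitting a joined row for every match.
So in Spark a join is implemented in terms of these two roles, the streamed table and the build table. The join operators above are briefly described below:
1. BroadcastHashJoinExec implements the join by broadcasting. It is chosen in two cases: either a broadcast hint is present and the right or left side can serve as the build side, or the small table is smaller than the configured spark.sql.autoBroadcastJoinThreshold. Spark first pulls the build table's data to the driver and then distributes it to the worker nodes, so a relatively large build table can put pressure on the driver.
2. ShuffledHashJoinExec shuffles the data and then keeps the build table in memory to perform the join. It is chosen when the left or right side can be the build side, the table is smaller than the broadcast threshold multiplied by the number of shuffle partitions (which ensures it can be loaded into local memory), hash join is preferred (conf.preferSortMergeJoin is false), and the build table is small enough (build table size * 3 is no larger than the streamed table). As the guard in the code above shows, it is also chosen when the join keys are not orderable. The main idea is to iterate over the streamed table and match each row against the in-memory build table to produce the joined rows.
3. SortMergeJoinExec shuffles the data, sorts it, and then performs a sort-based join. It is used when neither of the two operators above applies, provided the keys are orderable. The data is first shuffled so that equal keys land in the same partition, then sorted, and finally merge-joined by reading the streamed table and the build table at the same time. Because both sides are sorted, a single sequential pass over each is enough to match rows and produce the joined output.
4. BroadcastNestedLoopJoinExec mainly targets joins without equi-join keys; it is not covered here.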
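The selection order described in the list above can be condensed into a small decision function. This is only a sketch under stated assumptions: Side, Conf and select are hypothetical names, the join is assumed to be an inner join (so either side may be the build side), and plan statistics are reduced to a single size in bytes. The size checks follow the conditions named in the text (broadcast threshold, threshold times shuffle partitions, and the factor of 3).

```scala
object JoinSelectionSketch {
  // Hypothetical, simplified inputs; Spark derives these from plan statistics and SQLConf.
  final case class Side(sizeInBytes: Long, broadcastHint: Boolean = false)
  final case class Conf(
      autoBroadcastJoinThreshold: Long,
      numShufflePartitions: Int,
      preferSortMergeJoin: Boolean)

  // Small enough to be broadcast to every executor.
  def canBroadcast(s: Side, conf: Conf): Boolean =
    s.sizeInBytes >= 0 && s.sizeInBytes <= conf.autoBroadcastJoinThreshold

  // Small enough that a single partition should fit in local memory.
  def canBuildLocalHashMap(s: Side, conf: Conf): Boolean =
    s.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions

  // The build side must be at least three times smaller than the streamed side.
  def muchSmaller(a: Side, b: Side): Boolean = a.sizeInBytes * 3 <= b.sizeInBytes

  // Returns the name of the operator that would be picked (inner join assumed).
  def select(
      left: Side,
      right: Side,
      hasEquiKeys: Boolean,
      keysOrderable: Boolean,
      conf: Conf): String = {
    val byHint = left.broadcastHint || right.broadcastHint
    val bySize = canBroadcast(left, conf) || canBroadcast(right, conf)
    def shuffledHash(build: Side, stream: Side): Boolean =
      (!conf.preferSortMergeJoin && canBuildLocalHashMap(build, conf) &&
        muchSmaller(build, stream)) || !keysOrderable

    if (hasEquiKeys) {
      if (byHint || bySize) "BroadcastHashJoin"
      else if (shuffledHash(right, left) || shuffledHash(left, right)) "ShuffledHashJoin"
      else "SortMergeJoin"
    } else {
      if (byHint || bySize) "BroadcastNestedLoopJoin"
      else "CartesianProduct"
    }
  }
}
```

The sketch deliberately leaves out join-type restrictions on the build side (canBuildLeft and canBuildRight), which the real strategy also checks.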
The following sections summarize the implementation logic of the hash join and of SortMergeJoinExec.
ShuffledHashJoinExec
private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
  val buildDataSize = longMetric("buildDataSize")
  val buildTime = longMetric("buildTime")
  val start = System.nanoTime()
  val context = TaskContext.get()
  val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
  buildTime += (System.nanoTime() - start) / 1000000
  buildDataSize += relation.estimatedSize
  // This relation is usually used until the end of task.
  context.addTaskCompletionListener(_ => relation.close())
  relation
}

protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  val avgHashProbe = longMetric("avgHashProbe")
  streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
    val hashed = buildHashedRelation(buildIter)
    join(streamIter, hashed, numOutputRows, avgHashProbe)
  }
}
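Start with the doExecute method above; physical plans are generally executed by triggering this method. The main logic here is the call to buildHashedRelation, and inside that method HashedRelation is the part to focus on: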
private[execution] object HashedRelation {

  /**
   * Create a HashedRelation from an Iterator of InternalRow.
   */
  def apply(
      input: Iterator[InternalRow],
      key: Seq[Expression],
      sizeEstimate: Int = 64,
      taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
    val mm = Option(taskMemoryManager).getOrElse {
      new TaskMemoryManager(
        new StaticMemoryManager(
          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
          Long.MaxValue,
          Long.MaxValue,
          1),
        0)
    }

    if (key.length == 1 && key.head.dataType == LongType) {
      LongHashedRelation(input, key, sizeEstimate, mm)
    } else {
      UnsafeHashedRelation(input, key, sizeEstimate, mm)
    }
  }
}
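The choice here depends on the key's dataType: for a single key of LongType a LongHashedRelation is created (backed by LongToUnsafeRowMap); otherwise an UnsafeHashedRelation is created (backed by BytesToBytesMap). It is enough to focus on UnsafeHashedRelation: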
private[joins] object UnsafeHashedRelation {

  def apply(
      input: Iterator[InternalRow],
      key: Seq[Expression],
      sizeEstimate: Int,
      taskMemoryManager: TaskMemoryManager): HashedRelation = {

    val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
      .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))

    val binaryMap = new BytesToBytesMap(
      taskMemoryManager,
      // Only 70% of the slots can be used before growing, more capacity help to reduce collision
      (sizeEstimate * 1.5 + 1).toInt,
      pageSizeBytes,
      true)

    // Create a mapping of buildKeys -> rows
    val keyGenerator = UnsafeProjection.create(key)
    var numFields = 0
    while (input.hasNext) {
      val row = input.next().asInstanceOf[UnsafeRow]
      numFields = row.numFields()
      val key = keyGenerator(row)
      if (!key.anyNull) {
        val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
        val success = loc.append(
          key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
          row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
        if (!success) {
          binaryMap.free()
          throw new SparkException("There is no enough memory to build hash map")
        }
      }
    }

    new UnsafeHashedRelation(numFields, binaryMap)
  }
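The build loop in UnsafeHashedRelation.apply can be pictured with ordinary Scala collections. The sketch below is a conceptual illustration only: Row and buildHashedRelation are hypothetical names, an Option stands in for a possibly-null key, and a plain Map replaces BytesToBytesMap, so memory management and the out-of-memory failure path are not modeled.

```scala
object BuildSideSketch {
  // Hypothetical row type: a join key that may be null (None) plus a payload.
  final case class Row(key: Option[Long], value: String)

  // Group build-side rows by join key. Rows whose key is null are skipped,
  // mirroring the `if (!key.anyNull)` guard, because they can never match
  // in an equi-join.
  def buildHashedRelation(rows: Iterator[Row]): Map[Long, Vector[Row]] = {
    val builder = scala.collection.mutable.LinkedHashMap.empty[Long, Vector[Row]]
    rows.foreach { row =>
      row.key.foreach { k =>
        // Append to the rows already stored under this key.
        builder.update(k, builder.getOrElse(k, Vector.empty[Row]) :+ row)
      }
    }
    builder.toMap
  }
}
```

Several rows may share one key, which is why each key maps to a collection of rows rather than to a single row.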
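As the UnsafeHashedRelation code shows, it takes the buildKeys passed in from ShuffledHashJoinExec and builds a mapping from buildKeys to rows, which is exactly the build table mentioned earlier. With the build table ready, go back to ShuffledHashJoinExec.doExecute from above: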
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  val avgHashProbe = longMetric("avgHashProbe")
  streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
    val hashed = buildHashedRelation(buildIter)
    join(streamIter, hashed, numOutputRows, avgHashProbe)
  }
}
Here streamIter (the streamed table) and hashed (the build table) are combined into a join operation:
protected def join(
    streamedIter: Iterator[InternalRow],
    hashed: HashedRelation,
    numOutputRows: SQLMetric,
    avgHashProbe: SQLMetric): Iterator[InternalRow] = {

  val joinedIter = joinType match {
    case _: InnerLike =>
      innerJoin(streamedIter, hashed)
    case LeftOuter | RightOuter =>
      outerJoin(streamedIter, hashed)
    case LeftSemi =>
      semiJoin(streamedIter, hashed)
    case LeftAnti =>
      antiJoin(streamedIter, hashed)
    case j: ExistenceJoin =>
      existenceJoin(streamedIter, hashed)
    case x =>
      throw new IllegalArgumentException(
        s"BroadcastHashJoin should not take $x as the JoinType")
  }

  // At the end of the task, we update the avg hash probe.
  TaskContext.get().addTaskCompletionListener(_ =>
    avgHashProbe.set(hashed.getAverageProbesPerLookup))

  val resultProj = createResultProjection
  joinedIter.map { r =>
    numOutputRows += 1
    resultProj(r)
  }
}
Take a look at innerJoin:
private def innerJoin(
    streamIter: Iterator[InternalRow],
    hashedRelation: HashedRelation): Iterator[InternalRow] = {
  val joinRow = new JoinedRow
  val joinKeys = streamSideKeyGenerator()
  streamIter.flatMap { srow =>
    joinRow.withLeft(srow)
    val matches = hashedRelation.get(joinKeys(srow))
    if (matches != null) {
      matches.map(joinRow.withRight(_)).filter(boundCondition)
    } else {
      Seq.empty
    }
  }
}
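As shown, it iterates over the streamed table, looks up the rows with the same key in the build table, and if the result is not null builds a joinRow and filters it with the join condition. That completes the hash join implementation. The other join types can be followed in the code in the same way.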
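The probe phase can be sketched in the same spirit. In this illustration Row is a hypothetical (joinKey, payload) pair, the build table is an ordinary Map from key to rows, and boundCondition is a plain function over the streamed and build rows; none of these are Spark types.

```scala
object HashProbeSketch {
  // Hypothetical row: (joinKey, payload).
  type Row = (Long, String)

  // For each streamed row, look up the build rows with the same key and keep
  // the pairs that also satisfy the remaining (non-equi) join condition.
  def innerJoin(
      streamed: Iterator[Row],
      hashed: Map[Long, Seq[Row]],
      boundCondition: (Row, Row) => Boolean): Iterator[(Row, Row)] =
    streamed.flatMap { srow =>
      hashed.get(srow._1) match {
        case Some(matches) =>
          matches.iterator.map(b => (srow, b)).filter { case (s, b) => boundCondition(s, b) }
        case None =>
          Iterator.empty // no build row with this key, so the streamed row is dropped
      }
    }
}
```

The result is lazy, like the Spark version: streamed rows are consumed only as the returned iterator is drained.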
SortMergeJoinExec
The doExecute method is as follows:
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  val spillThreshold = getSpillThreshold
  val inMemoryThreshold = getInMemoryThreshold
  left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
    val boundCondition: (InternalRow) => Boolean = {
      condition.map { cond =>
        newPredicate(cond, left.output ++ right.output).eval _
      }.getOrElse {
        (r: InternalRow) => true
      }
    }

    // An ordering that can be used to compare keys from both sides.
    val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
    val resultProj: InternalRow => InternalRow = UnsafeProjection.create(output, output)

    joinType match {
      case _: InnerLike =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] var currentRightMatches: ExternalAppendOnlyUnsafeRowArray = _
          private[this] var rightMatchesIterator: Iterator[UnsafeRow] = null
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          if (smjScanner.findNextInnerJoinRows()) {
            currentRightMatches = smjScanner.getBufferedMatches
            currentLeftRow = smjScanner.getStreamedRow
            rightMatchesIterator = currentRightMatches.generateIterator()
          }

          override def advanceNext(): Boolean = {
            while (rightMatchesIterator != null) {
              if (!rightMatchesIterator.hasNext) {
                if (smjScanner.findNextInnerJoinRows()) {
                  currentRightMatches = smjScanner.getBufferedMatches
                  currentLeftRow = smjScanner.getStreamedRow
                  rightMatchesIterator = currentRightMatches.generateIterator()
                } else {
                  currentRightMatches = null
                  currentLeftRow = null
                  rightMatchesIterator = null
                  return false
                }
              }
              joinRow(currentLeftRow, rightMatchesIterator.next())
              if (boundCondition(joinRow)) {
                numOutputRows += 1
                return true
              }
            }
            false
          }

          override def getRow: InternalRow = resultProj(joinRow)
        }.toScala

      case LeftOuter =>
        val smjScanner = new SortMergeJoinScanner(
          streamedKeyGenerator = createLeftKeyGenerator(),
          bufferedKeyGenerator = createRightKeyGenerator(),
          keyOrdering,
          streamedIter = RowIterator.fromScala(leftIter),
          bufferedIter = RowIterator.fromScala(rightIter),
          inMemoryThreshold,
          spillThreshold
        )
        val rightNullRow = new GenericInternalRow(right.output.length)
        new LeftOuterIterator(
          smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows).toScala

      case RightOuter =>
        val smjScanner = new SortMergeJoinScanner(
          streamedKeyGenerator = createRightKeyGenerator(),
          bufferedKeyGenerator = createLeftKeyGenerator(),
          keyOrdering,
          streamedIter = RowIterator.fromScala(rightIter),
          bufferedIter = RowIterator.fromScala(leftIter),
          inMemoryThreshold,
          spillThreshold
        )
        val leftNullRow = new GenericInternalRow(left.output.length)
        new RightOuterIterator(
          smjScanner, leftNullRow, boundCondition, resultProj, numOutputRows).toScala

      case FullOuter =>
        val leftNullRow = new GenericInternalRow(left.output.length)
        val rightNullRow = new GenericInternalRow(right.output.length)
        val smjScanner = new SortMergeFullOuterJoinScanner(
          leftKeyGenerator = createLeftKeyGenerator(),
          rightKeyGenerator = createRightKeyGenerator(),
          keyOrdering,
          leftIter = RowIterator.fromScala(leftIter),
          rightIter = RowIterator.fromScala(rightIter),
          boundCondition,
          leftNullRow,
          rightNullRow)

        new FullOuterIterator(
          smjScanner,
          resultProj,
          numOutputRows).toScala

      case LeftSemi =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          override def advanceNext(): Boolean = {
            while (smjScanner.findNextInnerJoinRows()) {
              val currentRightMatches = smjScanner.getBufferedMatches
              currentLeftRow = smjScanner.getStreamedRow
              if (currentRightMatches != null && currentRightMatches.length > 0) {
                val rightMatchesIterator = currentRightMatches.generateIterator()
                while (rightMatchesIterator.hasNext) {
                  joinRow(currentLeftRow, rightMatchesIterator.next())
                  if (boundCondition(joinRow)) {
                    numOutputRows += 1
                    return true
                  }
                }
              }
            }
            false
          }

          override def getRow: InternalRow = currentLeftRow
        }.toScala

      case LeftAnti =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          override def advanceNext(): Boolean = {
            while (smjScanner.findNextOuterJoinRows()) {
              currentLeftRow = smjScanner.getStreamedRow
              val currentRightMatches = smjScanner.getBufferedMatches
              if (currentRightMatches == null || currentRightMatches.length == 0) {
                numOutputRows += 1
                return true
              }
              var found = false
              val rightMatchesIterator = currentRightMatches.generateIterator()
              while (!found && rightMatchesIterator.hasNext) {
                joinRow(currentLeftRow, rightMatchesIterator.next())
                if (boundCondition(joinRow)) {
                  found = true
                }
              }
              if (!found) {
                numOutputRows += 1
                return true
              }
            }
            false
          }

          override def getRow: InternalRow = currentLeftRow
        }.toScala

      case j: ExistenceJoin =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] val result: InternalRow = new GenericInternalRow(Array[Any](null))
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          override def advanceNext(): Boolean = {
            while (smjScanner.findNextOuterJoinRows()) {
              currentLeftRow = smjScanner.getStreamedRow
              val currentRightMatches = smjScanner.getBufferedMatches
              var found = false
              if (currentRightMatches != null && currentRightMatches.length > 0) {
                val rightMatchesIterator = currentRightMatches.generateIterator()
                while (!found && rightMatchesIterator.hasNext) {
                  joinRow(currentLeftRow, rightMatchesIterator.next())
                  if (boundCondition(joinRow)) {
                    found = true
                  }
                }
              }
              result.setBoolean(0, found)
              numOutputRows += 1
              return true
            }
            false
          }

          override def getRow: InternalRow = resultProj(joinRow(currentLeftRow, result))
        }.toScala

      case x =>
        throw new IllegalArgumentException(
          s"SortMergeJoin should not take $x as the JoinType")
    }
  }
}
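Look first at the implementation of the InnerLike branch:
The logic is simple:
It instantiates a SortMergeJoinScanner. The details are in the advanceNext method, which calls findNextInnerJoinRows to find the next rows that can be joined. Here:
1. currentLeftRow is the streamed-table row, obtained via smjScanner.getStreamedRow
2. currentRightMatches is the build-table data, obtained via smjScanner.getBufferedMatches
3. advanceNext mainly relies on findNextInnerJoinRows: if it returns true there are new rows, so the values in 1 and 2 are reset, a joinRow is built, and the filter condition is applied
4. findNextInnerJoinRows: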
final def findNextInnerJoinRows(): Boolean = {
  while (advancedStreamed() && streamedRowKey.anyNull) {
    // Advance the streamed side of the join until we find the next row whose join key contains
    // no nulls or we hit the end of the streamed iterator.
  }
  if (streamedRow == null) {
    // We have consumed the entire streamed iterator, so there can be no more matches.
    matchJoinKey = null
    bufferedMatches.clear()
    false
  } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
    // The new streamed row has the same join key as the previous row, so return the same matches.
    true
  } else if (bufferedRow == null) {
    // The streamed row's join key does not match the current batch of buffered rows and there are
    // no more rows to read from the buffered iterator, so there can be no more matches.
    matchJoinKey = null
    bufferedMatches.clear()
    false
  } else {
    // Advance both the streamed and buffered iterators to find the next pair of matching rows.
    var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    do {
      if (streamedRowKey.anyNull) {
        advancedStreamed()
      } else {
        assert(!bufferedRowKey.anyNull)
        comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
        if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
        else if (comp < 0) advancedStreamed()
      }
    } while (streamedRow != null && bufferedRow != null && comp != 0)
    if (streamedRow == null || bufferedRow == null) {
      // We have either hit the end of one of the iterators, so there can be no more matches.
      matchJoinKey = null
      bufferedMatches.clear()
      false
    } else {
      // The streamed row's join key matches the current buffered row's join, so walk through the
      // buffered iterator to buffer the rest of the matching rows.
      assert(comp == 0)
      bufferMatchingRows()
      true
    }
  }
}
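The main logic is as follows:
If the streamed table is exhausted, return false immediately;
If the streamed row's key matches the currently cached matchJoinKey, return true;
If the build table is exhausted, return false immediately;
Otherwise the real work happens in the do-while loop. It first checks for null keys, then compares the keys of the streamed row and the build row: if the result is greater than 0 it advances the build table, if it is less than 0 it advances the streamed table, and it keeps looping until the two keys are equal or one of the tables is exhausted. On a match, bufferMatchingRows fills bufferedMatches (that is, every build-table row sharing that key is appended to bufferedMatches);
In addition, bufferMatchingRows records matchJoinKey, so on the next call to findNextInnerJoinRows, if the new streamed key equals matchJoinKey, it returns true directly and the join proceeds.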
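The scanning logic above can be condensed into a small inner merge join over two inputs that are already sorted by key. This is a sketch under simplifying assumptions rather than the scanner itself: Row is a hypothetical (joinKey, payload) pair, keys are never null, both sides are fully materialized vectors, and no extra join condition is applied. The variables matchKey and matches play the roles of matchJoinKey and bufferedMatches.

```scala
object SortMergeSketch {
  // Hypothetical row: (joinKey, payload). Both inputs must be sorted by key.
  type Row = (Int, String)

  def innerJoin(streamed: Vector[Row], buffered: Vector[Row]): List[(Row, Row)] = {
    val out = scala.collection.mutable.ListBuffer.empty[(Row, Row)]
    var matchKey: Option[Int] = None        // key of the currently buffered matches
    var matches: Vector[Row] = Vector.empty // buffered rows that share matchKey
    var b = 0                               // cursor into the buffered side

    for (s <- streamed) {
      // Same key as the previous streamed row: reuse the buffered matches.
      if (!matchKey.contains(s._1)) {
        // Skip buffered rows whose key is smaller than the streamed key.
        while (b < buffered.length && buffered(b)._1 < s._1) b += 1
        if (b < buffered.length && buffered(b)._1 == s._1) {
          // Keys are equal: buffer the whole run of rows with this key.
          val start = b
          while (b < buffered.length && buffered(b)._1 == s._1) b += 1
          matches = buffered.slice(start, b)
          matchKey = Some(s._1)
        } else {
          // The buffered side has no row with this key.
          matches = Vector.empty
          matchKey = None
        }
      }
      matches.foreach(m => out += ((s, m)))
    }
    out.toList
  }
}
```

Each side is traversed once from front to back, which is what the sort order buys; the buffered run is needed because several streamed rows can share one key.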
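LeftOuter and RightOuter are implemented by LeftOuterIterator and RightOuterIterator, both concrete implementations of OneSideOuterIterator. They rely on SortMergeJoinScanner.findNextOuterJoinRows to compare the keys of the streamed table and the build table and handle each case accordingly. The two classes mainly implement the setBufferedSideOutput and setStreamSideOutput methods; the rest of the logic is in advanceStream.
FullOuter is implemented mainly by FullOuterIterator: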
private class FullOuterIterator(
    smjScanner: SortMergeFullOuterJoinScanner,
    resultProj: InternalRow => InternalRow,
    numRows: SQLMetric) extends RowIterator {
  private[this] val joinedRow: JoinedRow = smjScanner.getJoinedRow()

  override def advanceNext(): Boolean = {
    val r = smjScanner.advanceNext()
    if (r) numRows += 1
    r
  }

  override def getRow: InternalRow = resultProj(joinedRow)
}
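Seen this way, the FullOuter iterator looks like the simplest one, since the actual work is delegated to SortMergeFullOuterJoinScanner.advanceNext.
Because what is returned is an iterator, the thing to focus on when reading the source is the implementation of advanceNext; from there the whole join process can be traced.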
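The advanceNext/getRow contract, and the way such an iterator is turned into a regular Scala Iterator via toScala, can be mimicked in a few lines. SimpleRowIterator and evens below are hypothetical names written for this illustration; this is a sketch of the pattern, not Spark's RowIterator class.

```scala
object RowIteratorSketch {
  // Hypothetical miniature of the advanceNext/getRow contract:
  // advanceNext() moves to the next row and reports whether one exists,
  // getRow returns the row that advanceNext() just positioned on.
  abstract class SimpleRowIterator[T] {
    def advanceNext(): Boolean
    def getRow: T

    // Adapt to a standard Scala Iterator, calling advanceNext() lazily
    // and at most once per produced element.
    def toScala: Iterator[T] = new Iterator[T] {
      private var fetched = false  // a row is ready but not yet returned
      private var finished = false // advanceNext() has returned false

      override def hasNext: Boolean = {
        if (!fetched && !finished) {
          if (advanceNext()) fetched = true else finished = true
        }
        fetched
      }

      override def next(): T = {
        if (!hasNext) throw new NoSuchElementException("end of iterator")
        fetched = false
        getRow
      }
    }
  }

  // Example: even numbers from 0 up to and including `limit`.
  def evens(limit: Int): Iterator[Int] = new SimpleRowIterator[Int] {
    private var current = -2
    override def advanceNext(): Boolean = { current += 2; current <= limit }
    override def getRow: Int = current
  }.toScala
}
```

All the state lives in advanceNext, which is why tracing that one method is usually enough to follow a join iterator.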
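To summarize: this post briefly covered the ideas behind Spark's join implementation. The details still require digging into the code. For example, what is spilling in SortMergeJoinExec based on? It is handled by the ExternalAppendOnlyUnsafeRowArray used in SortMergeJoinScanner, which relies on UnsafeExternalSorter to perform the spill.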