Spark SQL (9): A Source-Code Summary of Spark SQL JOIN
This post summarizes how Spark SQL implements join operations, working from the Spark SQL source code. The overall flow follows the usual pipeline: from the SQL statement to the logical operator tree, through the analyzed and optimized plans, and finally to the physical plan and its execution logic.
The Join Logical Operator Tree
Let's start with a SQL statement:
SELECT NAME FROM NAME LEFT JOIN NAME2 ON NAME = NAME JOIN NAME3 ON NAME = NAME
The logical operator tree formed for this statement is:

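The original figure is omitted here, but the shape of the tree can be reconstructed from the parsing code below: the foldLeft builds a left-deep tree of Join nodes. Roughly (a sketch, not actual plan output):

Join Inner, (NAME = NAME)
:- Join LeftOuter, (NAME = NAME)
:  :- UnresolvedRelation NAME
:  +- UnresolvedRelation NAME2
+- UnresolvedRelation NAME3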
As for how this tree is generated, we only need to focus on the join-related part; the source lives in AstBuilder:
override def visitFromClause(ctx: FromClauseContext): LogicalPlan = withOrigin(ctx) {
  val from = ctx.relation.asScala.foldLeft(null: LogicalPlan) { (left, relation) =>
    val right = plan(relation.relationPrimary)
    val join = right.optionalMap(left)(Join(_, _, Inner, None))
    withJoinRelations(join, relation)
  }
  ctx.lateralView.asScala.foldLeft(from)(withGenerate)
}
private def withJoinRelations(base: LogicalPlan, ctx: RelationContext): LogicalPlan = {
  val pp = ctx.joinRelation
  pp.asScala.foldLeft(base) { (left, join) =>
    withOrigin(join) {
      val baseJoinType = join.joinType match {
        case null => Inner
        case jt if jt.CROSS != null => Cross
        case jt if jt.FULL != null => FullOuter
        case jt if jt.SEMI != null => LeftSemi
        case jt if jt.ANTI != null => LeftAnti
        case jt if jt.LEFT != null => LeftOuter
        case jt if jt.RIGHT != null => RightOuter
        case _ => Inner
      }
      // Resolve the join type and join condition
      val (joinType, condition) = Option(join.joinCriteria) match {
        case Some(c) if c.USING != null =>
          (UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), None)
        case Some(c) if c.booleanExpression != null =>
          (baseJoinType, Option(expression(c.booleanExpression)))
        case None if join.NATURAL != null =>
          if (baseJoinType == Cross) {
            throw new ParseException("NATURAL CROSS JOIN is not supported", ctx)
          }
          (NaturalJoin(baseJoinType), None)
        case None =>
          (baseJoinType, None)
      }
      Join(left, plan(join.right), joinType, condition)
    }
  }
}
As the tree above shows, the join relationships are stored as a list of JoinRelation nodes; each JoinRelation contains a JoinType, a relationPrimary, and a joinCriteria, where the joinCriteria is essentially a booleanExpression.
Next come the analysis and optimization phases for the Join. Analysis mainly adds subquery aliases and resolves references; the optimizer then applies rules such as predicate pushdown and subquery-alias elimination. Quite a few rules are involved here; interested readers can dig into the source for details.
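To watch these phases on a concrete query, the queryExecution handle on a DataFrame exposes each plan. A quick sketch, assuming an existing SparkSession named spark with tables t1 and t2 already registered:

// Inspect each planning phase of a join query; spark, t1 and t2 are assumed to exist.
val df = spark.sql("SELECT a.id FROM t1 a JOIN t2 b ON a.id = b.id")
println(df.queryExecution.logical)       // parsed (unresolved) logical plan
println(df.queryExecution.analyzed)      // after the Analyzer: aliases and references resolved
println(df.queryExecution.optimizedPlan) // after the Optimizer: pushdown, alias elimination, ...
println(df.queryExecution.sparkPlan)     // physical plan: shows which join exec was selected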
The Physical Planning Phase
This phase is driven by the strategies configured in SparkPlanner; among them, JoinSelection is the one to focus on. Its apply method is as follows:
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {

  // --- BroadcastHashJoin --------------------------------------------------------------------

  // broadcast hints were specified
  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastByHints(joinType, left, right) =>
    val buildSide = broadcastSideByHints(joinType, left, right)
    Seq(joins.BroadcastHashJoinExec(
      leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

  // broadcast hints were not specified, so need to infer it from size and configuration.
  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if canBroadcastBySizes(joinType, left, right) =>
    val buildSide = broadcastSideBySizes(joinType, left, right)
    Seq(joins.BroadcastHashJoinExec(
      leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

  // --- ShuffledHashJoin ---------------------------------------------------------------------

  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
      && muchSmaller(right, left) ||
      !RowOrdering.isOrderable(leftKeys) =>
    Seq(joins.ShuffledHashJoinExec(
      leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))

  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
      && muchSmaller(left, right) ||
      !RowOrdering.isOrderable(leftKeys) =>
    Seq(joins.ShuffledHashJoinExec(
      leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))

  // --- SortMergeJoin ------------------------------------------------------------

  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
    if RowOrdering.isOrderable(leftKeys) =>
    joins.SortMergeJoinExec(
      leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

  // --- Without joining keys ------------------------------------------------------------

  // Pick BroadcastNestedLoopJoin if one side could be broadcast
  case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastByHints(joinType, left, right) =>
    val buildSide = broadcastSideByHints(joinType, left, right)
    joins.BroadcastNestedLoopJoinExec(
      planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

  case j @ logical.Join(left, right, joinType, condition)
    if canBroadcastBySizes(joinType, left, right) =>
    val buildSide = broadcastSideBySizes(joinType, left, right)
    joins.BroadcastNestedLoopJoinExec(
      planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

  // Pick CartesianProduct for InnerJoin
  case logical.Join(left, right, _: InnerLike, condition) =>
    joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil

  case logical.Join(left, right, joinType, condition) =>
    val buildSide = broadcastSide(
      left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
    // This join could be very slow or OOM
    joins.BroadcastNestedLoopJoinExec(
      planLater(left), planLater(right), buildSide, joinType, condition) :: Nil

  // --- Cases where this strategy does not apply ---------------------------------------------

  case _ => Nil
}
As the code above shows, different conditions yield different join operators: BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec, and BroadcastNestedLoopJoinExec.
Before looking at these four operators, let's sketch the general idea behind the join implementation.
Suppose Spark is joining two tables: one acts as the streamed table and the other as the build table. By default the large table is streamed and the small table is built. The join iterates over the streamed table and matches each row against the build table to produce joined rows. Consider the extreme case: the big table has millions of rows while the small table has only 10. Then we simply iterate the streamed table, probe the small (build) table for matches, and emit a joined row for each hit.
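Here is a toy sketch of this streamed/build pattern in plain Scala (nothing Spark-specific; Row and the data are made up):

// Build a key -> rows map from the small side once, then stream the big side
// and probe the map row by row; each hit produces a joined row.
case class Row(key: Int, value: String)

val streamed: Seq[Row] = (1 to 1000000).map(i => Row(i % 10, s"s$i")) // the "big" table
val build: Seq[Row]    = (0 until 10).map(i => Row(i, s"b$i"))        // the "small" table

val hashedBuild: Map[Int, Seq[Row]] = build.groupBy(_.key)            // the build table

val joined: Iterator[(Row, Row)] = streamed.iterator.flatMap { s =>   // stream + probe
  hashedBuild.getOrElse(s.key, Seq.empty).map(b => (s, b))
}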
So at a high level, Spark implements joins around these two roles, streamed and build. With that in mind, here is a brief look at the four operators:
1. BroadcastHashJoinExec performs the join by broadcasting. It is chosen either when a broadcast hint was specified and the corresponding side can be built (canBuildRight/canBuildLeft), or when the small table is below the configured spark.sql.autoBroadcastJoinThreshold. Spark first pulls the build table's data to the driver and then distributes it to the worker nodes, so if the build table is large this step can put real pressure on the driver.
2. ShuffledHashJoinExec keeps the build table in memory after a shuffle. It is chosen when one side can be built, that side is smaller than the broadcast threshold multiplied by the number of shuffle partitions (so each partition's share fits in local memory), the preference for sort-merge join is switched off (spark.sql.join.preferSortMergeJoin = false), and the build side is sufficiently small (build side * 3 <= streamed side); it is also chosen when the join keys are not orderable, since sort-merge is then impossible (see the predicate sketch after this list). The core idea is the same: iterate the streamed table and match against the in-memory build table to produce joined rows.
3. SortMergeJoinExec shuffles and sorts both sides, then performs a sort-based merge join. When neither of the above applies (and the keys are orderable), Spark falls back to this: the data is shuffled so that equal keys land in the same partition, each partition is sorted, and a merge join then reads the streamed and buffered sides together. Because both sides are ordered, a single sequential pass over each suffices to match rows and emit joined output.
4. BroadcastNestedLoopJoinExec mainly targets joins without join keys; we won't study it here.
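The size-based predicates referenced above are small helpers inside JoinSelection; paraphrased from the Spark 2.x source (check your version for the exact form), they look roughly like this:

// Broadcastable: plan statistics within spark.sql.autoBroadcastJoinThreshold.
private def canBroadcast(plan: LogicalPlan): Boolean =
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold

// Small enough for a per-partition hash map: below threshold * shuffle partitions.
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean =
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions

// "Much smaller": at most one third of the other side.
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean =
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes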
Next, let's walk through the implementation logic of the hash join and SortMergeJoinExec.
ShuffledHashJoinExec
private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
  val buildDataSize = longMetric("buildDataSize")
  val buildTime = longMetric("buildTime")
  val start = System.nanoTime()
  val context = TaskContext.get()
  val relation = HashedRelation(iter, buildKeys, taskMemoryManager = context.taskMemoryManager())
  buildTime += (System.nanoTime() - start) / 1000000
  buildDataSize += relation.estimatedSize
  // This relation is usually used until the end of task.
  context.addTaskCompletionListener(_ => relation.close())
  relation
}
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  val avgHashProbe = longMetric("avgHashProbe")
  streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
    val hashed = buildHashedRelation(buildIter)
    join(streamIter, hashed, numOutputRows, avgHashProbe)
  }
}
Start with the doExecute method above; a physical plan is generally executed by triggering this method. The main logic here is the call to buildHashedRelation, and within that method HashedRelation is the part to focus on:
private[execution] object HashedRelation {

  /**
   * Create a HashedRelation from an Iterator of InternalRow.
   */
  def apply(
      input: Iterator[InternalRow],
      key: Seq[Expression],
      sizeEstimate: Int = 64,
      taskMemoryManager: TaskMemoryManager = null): HashedRelation = {
    val mm = Option(taskMemoryManager).getOrElse {
      new TaskMemoryManager(
        new StaticMemoryManager(
          new SparkConf().set(MEMORY_OFFHEAP_ENABLED.key, "false"),
          Long.MaxValue,
          Long.MaxValue,
          1),
        0)
    }
    if (key.length == 1 && key.head.dataType == LongType) {
      LongHashedRelation(input, key, sizeEstimate, mm)
    } else {
      UnsafeHashedRelation(input, key, sizeEstimate, mm)
    }
  }
}
If the join key is a single column of LongType, a LongHashedRelation is created (backed by LongToUnsafeRowMap); otherwise an UnsafeHashedRelation (backed by BytesToBytesMap). UnsafeHashedRelation is the one to focus on:
private[joins] object UnsafeHashedRelation {

  def apply(
      input: Iterator[InternalRow],
      key: Seq[Expression],
      sizeEstimate: Int,
      taskMemoryManager: TaskMemoryManager): HashedRelation = {
    val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
      .getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
    val binaryMap = new BytesToBytesMap(
      taskMemoryManager,
      // Only 70% of the slots can be used before growing, more capacity help to reduce collision
      (sizeEstimate * 1.5 + 1).toInt,
      pageSizeBytes,
      true)

    // Create a mapping of buildKeys -> rows
    val keyGenerator = UnsafeProjection.create(key)
    var numFields = 0
    while (input.hasNext) {
      val row = input.next().asInstanceOf[UnsafeRow]
      numFields = row.numFields()
      val key = keyGenerator(row)
      if (!key.anyNull) {
        val loc = binaryMap.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
        val success = loc.append(
          key.getBaseObject, key.getBaseOffset, key.getSizeInBytes,
          row.getBaseObject, row.getBaseOffset, row.getSizeInBytes)
        if (!success) {
          binaryMap.free()
          throw new SparkException("There is no enough memory to build hash map")
        }
      }
    }
    new UnsafeHashedRelation(numFields, binaryMap)
  }
}
As the code shows, this builds a map from the buildKeys passed in by ShuffledHashJoinExec to the matching rows; this is exactly the build table mentioned earlier. With the build table ready, back in ShuffledHashJoinExec.doExecute we can see:
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  val avgHashProbe = longMetric("avgHashProbe")
  streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
    val hashed = buildHashedRelation(buildIter)
    join(streamIter, hashed, numOutputRows, avgHashProbe)
  }
}
The join operation is assembled from streamIter (the streamed table) and hashed (the build table):
protected def join(
    streamedIter: Iterator[InternalRow],
    hashed: HashedRelation,
    numOutputRows: SQLMetric,
    avgHashProbe: SQLMetric): Iterator[InternalRow] = {

  val joinedIter = joinType match {
    case _: InnerLike =>
      innerJoin(streamedIter, hashed)
    case LeftOuter | RightOuter =>
      outerJoin(streamedIter, hashed)
    case LeftSemi =>
      semiJoin(streamedIter, hashed)
    case LeftAnti =>
      antiJoin(streamedIter, hashed)
    case j: ExistenceJoin =>
      existenceJoin(streamedIter, hashed)
    case x =>
      throw new IllegalArgumentException(
        s"BroadcastHashJoin should not take $x as the JoinType")
  }

  // At the end of the task, we update the avg hash probe.
  TaskContext.get().addTaskCompletionListener(_ =>
    avgHashProbe.set(hashed.getAverageProbesPerLookup))

  val resultProj = createResultProjection
  joinedIter.map { r =>
    numOutputRows += 1
    resultProj(r)
  }
}
Let's look at the innerJoin case:
private def innerJoin(
    streamIter: Iterator[InternalRow],
    hashedRelation: HashedRelation): Iterator[InternalRow] = {
  val joinRow = new JoinedRow
  val joinKeys = streamSideKeyGenerator()
  streamIter.flatMap { srow =>
    joinRow.withLeft(srow)
    val matches = hashedRelation.get(joinKeys(srow))
    if (matches != null) {
      matches.map(joinRow.withRight(_)).filter(boundCondition)
    } else {
      Seq.empty
    }
  }
}
As you can see, it iterates the streamed table, fetches the rows with the same key from the build table, and if there are matches builds a JoinedRow and applies the join condition as a filter. At this point the hash join is essentially complete; the other join types can be traced through the code in the same way.
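Note the filter(boundCondition) at the end: the equi-join keys only produce candidate matches, and any residual (non-equi) predicate is applied afterwards as a filter. In the terms of the earlier toy sketch (the predicate here is hypothetical):

// A residual condition on top of the key match, mirroring filter(boundCondition):
// keep only pairs whose values also satisfy a non-equi predicate.
val residual: ((Row, Row)) => Boolean = { case (s, b) => s.value != b.value }
val innerJoined = joined.filter(residual)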
SortMergeJoinExec
Its doExecute method is as follows:
protected override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  val spillThreshold = getSpillThreshold
  val inMemoryThreshold = getInMemoryThreshold
  left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
    val boundCondition: (InternalRow) => Boolean = {
      condition.map { cond =>
        newPredicate(cond, left.output ++ right.output).eval _
      }.getOrElse {
        (r: InternalRow) => true
      }
    }

    // An ordering that can be used to compare keys from both sides.
    val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
    val resultProj: InternalRow => InternalRow = UnsafeProjection.create(output, output)

    joinType match {
      case _: InnerLike =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] var currentRightMatches: ExternalAppendOnlyUnsafeRowArray = _
          private[this] var rightMatchesIterator: Iterator[UnsafeRow] = null
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          if (smjScanner.findNextInnerJoinRows()) {
            currentRightMatches = smjScanner.getBufferedMatches
            currentLeftRow = smjScanner.getStreamedRow
            rightMatchesIterator = currentRightMatches.generateIterator()
          }

          override def advanceNext(): Boolean = {
            while (rightMatchesIterator != null) {
              if (!rightMatchesIterator.hasNext) {
                if (smjScanner.findNextInnerJoinRows()) {
                  currentRightMatches = smjScanner.getBufferedMatches
                  currentLeftRow = smjScanner.getStreamedRow
                  rightMatchesIterator = currentRightMatches.generateIterator()
                } else {
                  currentRightMatches = null
                  currentLeftRow = null
                  rightMatchesIterator = null
                  return false
                }
              }
              joinRow(currentLeftRow, rightMatchesIterator.next())
              if (boundCondition(joinRow)) {
                numOutputRows += 1
                return true
              }
            }
            false
          }

          override def getRow: InternalRow = resultProj(joinRow)
        }.toScala

      case LeftOuter =>
        val smjScanner = new SortMergeJoinScanner(
          streamedKeyGenerator = createLeftKeyGenerator(),
          bufferedKeyGenerator = createRightKeyGenerator(),
          keyOrdering,
          streamedIter = RowIterator.fromScala(leftIter),
          bufferedIter = RowIterator.fromScala(rightIter),
          inMemoryThreshold,
          spillThreshold
        )
        val rightNullRow = new GenericInternalRow(right.output.length)
        new LeftOuterIterator(
          smjScanner, rightNullRow, boundCondition, resultProj, numOutputRows).toScala

      case RightOuter =>
        val smjScanner = new SortMergeJoinScanner(
          streamedKeyGenerator = createRightKeyGenerator(),
          bufferedKeyGenerator = createLeftKeyGenerator(),
          keyOrdering,
          streamedIter = RowIterator.fromScala(rightIter),
          bufferedIter = RowIterator.fromScala(leftIter),
          inMemoryThreshold,
          spillThreshold
        )
        val leftNullRow = new GenericInternalRow(left.output.length)
        new RightOuterIterator(
          smjScanner, leftNullRow, boundCondition, resultProj, numOutputRows).toScala

      case FullOuter =>
        val leftNullRow = new GenericInternalRow(left.output.length)
        val rightNullRow = new GenericInternalRow(right.output.length)
        val smjScanner = new SortMergeFullOuterJoinScanner(
          leftKeyGenerator = createLeftKeyGenerator(),
          rightKeyGenerator = createRightKeyGenerator(),
          keyOrdering,
          leftIter = RowIterator.fromScala(leftIter),
          rightIter = RowIterator.fromScala(rightIter),
          boundCondition,
          leftNullRow,
          rightNullRow)
        new FullOuterIterator(
          smjScanner,
          resultProj,
          numOutputRows).toScala

      case LeftSemi =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          override def advanceNext(): Boolean = {
            while (smjScanner.findNextInnerJoinRows()) {
              val currentRightMatches = smjScanner.getBufferedMatches
              currentLeftRow = smjScanner.getStreamedRow
              if (currentRightMatches != null && currentRightMatches.length > 0) {
                val rightMatchesIterator = currentRightMatches.generateIterator()
                while (rightMatchesIterator.hasNext) {
                  joinRow(currentLeftRow, rightMatchesIterator.next())
                  if (boundCondition(joinRow)) {
                    numOutputRows += 1
                    return true
                  }
                }
              }
            }
            false
          }

          override def getRow: InternalRow = currentLeftRow
        }.toScala

      case LeftAnti =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          override def advanceNext(): Boolean = {
            while (smjScanner.findNextOuterJoinRows()) {
              currentLeftRow = smjScanner.getStreamedRow
              val currentRightMatches = smjScanner.getBufferedMatches
              if (currentRightMatches == null || currentRightMatches.length == 0) {
                numOutputRows += 1
                return true
              }
              var found = false
              val rightMatchesIterator = currentRightMatches.generateIterator()
              while (!found && rightMatchesIterator.hasNext) {
                joinRow(currentLeftRow, rightMatchesIterator.next())
                if (boundCondition(joinRow)) {
                  found = true
                }
              }
              if (!found) {
                numOutputRows += 1
                return true
              }
            }
            false
          }

          override def getRow: InternalRow = currentLeftRow
        }.toScala

      case j: ExistenceJoin =>
        new RowIterator {
          private[this] var currentLeftRow: InternalRow = _
          private[this] val result: InternalRow = new GenericInternalRow(Array[Any](null))
          private[this] val smjScanner = new SortMergeJoinScanner(
            createLeftKeyGenerator(),
            createRightKeyGenerator(),
            keyOrdering,
            RowIterator.fromScala(leftIter),
            RowIterator.fromScala(rightIter),
            inMemoryThreshold,
            spillThreshold
          )
          private[this] val joinRow = new JoinedRow

          override def advanceNext(): Boolean = {
            while (smjScanner.findNextOuterJoinRows()) {
              currentLeftRow = smjScanner.getStreamedRow
              val currentRightMatches = smjScanner.getBufferedMatches
              var found = false
              if (currentRightMatches != null && currentRightMatches.length > 0) {
                val rightMatchesIterator = currentRightMatches.generateIterator()
                while (!found && rightMatchesIterator.hasNext) {
                  joinRow(currentLeftRow, rightMatchesIterator.next())
                  if (boundCondition(joinRow)) {
                    found = true
                  }
                }
              }
              result.setBoolean(0, found)
              numOutputRows += 1
              return true
            }
            false
          }

          override def getRow: InternalRow = resultProj(joinRow(currentLeftRow, result))
        }.toScala

      case x =>
        throw new IllegalArgumentException(
          s"SortMergeJoin should not take $x as the JoinType")
    }
  }
}
Let's first look at the implementation under the InnerLike branch.
The logic is straightforward:
A SortMergeJoinScanner is instantiated; the interesting part is the advanceNext implementation, which calls findNextInnerJoinRows to find the next batch of joinable rows. Specifically:
1. currentLeftRow is the streamed-side row, fetched via smjScanner.getStreamedRow
2. currentRightMatches holds the buffered (build-side) matches, fetched via smjScanner.getBufferedMatches
3. advanceNext mainly drives findNextInnerJoinRows; when it returns true there is a new row, so the two values above are refreshed, a joinRow is built, and the filter condition is applied
4. findNextInnerJoinRows:
final def findNextInnerJoinRows(): Boolean = {
  while (advancedStreamed() && streamedRowKey.anyNull) {
    // Advance the streamed side of the join until we find the next row whose join key contains
    // no nulls or we hit the end of the streamed iterator.
  }
  if (streamedRow == null) {
    // We have consumed the entire streamed iterator, so there can be no more matches.
    matchJoinKey = null
    bufferedMatches.clear()
    false
  } else if (matchJoinKey != null && keyOrdering.compare(streamedRowKey, matchJoinKey) == 0) {
    // The new streamed row has the same join key as the previous row, so return the same matches.
    true
  } else if (bufferedRow == null) {
    // The streamed row's join key does not match the current batch of buffered rows and there are
    // no more rows to read from the buffered iterator, so there can be no more matches.
    matchJoinKey = null
    bufferedMatches.clear()
    false
  } else {
    // Advance both the streamed and buffered iterators to find the next pair of matching rows.
    var comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
    do {
      if (streamedRowKey.anyNull) {
        advancedStreamed()
      } else {
        assert(!bufferedRowKey.anyNull)
        comp = keyOrdering.compare(streamedRowKey, bufferedRowKey)
        if (comp > 0) advancedBufferedToRowWithNullFreeJoinKey()
        else if (comp < 0) advancedStreamed()
      }
    } while (streamedRow != null && bufferedRow != null && comp != 0)
    if (streamedRow == null || bufferedRow == null) {
      // We have either hit the end of one of the iterators, so there can be no more matches.
      matchJoinKey = null
      bufferedMatches.clear()
      false
    } else {
      // The streamed row's join key matches the current buffered row's join, so walk through the
      // buffered iterator to buffer the rest of the matching rows.
      assert(comp == 0)
      bufferMatchingRows()
      true
    }
  }
}
The main logic:
If the streamed side is exhausted, return false.
If the current streamed row's key equals the cached matchJoinKey, return true immediately and reuse the same buffered matches.
If the buffered side is exhausted, return false.
Otherwise the do-while loop takes over. After validating the keys, it compares the streamed key against the buffered key: if the comparison is greater than 0 it advances the buffered side, if less than 0 it advances the streamed side, looping until the two keys are equal or one side runs out. On a match, bufferMatchingRows appends every buffered row sharing that key into bufferedMatches.
bufferMatchingRows also records matchJoinKey, so on the next call to findNextInnerJoinRows a streamed key equal to matchJoinKey returns true straight away and the join proceeds with the already-buffered matches.
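A toy version of this merge step over two already-sorted arrays (plain Scala, made-up data) shows the pointer movement:

// Toy merge join over two sorted arrays: advance the side with the smaller key;
// on a key match, collect the full run of equal keys from the buffered side.
def mergeJoin(streamed: Array[(Int, String)], buffered: Array[(Int, String)]): Seq[(String, String)] = {
  val out = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
  var i = 0
  var j = 0
  while (i < streamed.length && j < buffered.length) {
    val c = streamed(i)._1.compareTo(buffered(j)._1)
    if (c < 0) i += 1       // streamed key smaller: advance the streamed side
    else if (c > 0) j += 1  // buffered key smaller: advance the buffered side
    else {
      // Keys equal: pair the streamed row with the whole run of matching buffered rows.
      var k = j
      while (k < buffered.length && buffered(k)._1 == streamed(i)._1) {
        out += ((streamed(i)._2, buffered(k)._2))
        k += 1
      }
      i += 1                // the next streamed row may match the same buffered run
    }
  }
  out
}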
LeftOuter and RightOuter are implemented by LeftOuterIterator and RightOuterIterator, both concrete subclasses of OneSideOuterIterator. They rely on SortMergeJoinScanner.findNextOuterJoinRows to line up the streamed and buffered keys, implement the setBufferedSideOutput and setStreamSideOutput methods, and keep the rest of their logic in advanceStream.
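The essential difference from the inner case is null padding when the buffered side has no match; the same idea applies whether the join is hash-based or merge-based. In toy form (reusing the made-up Row from earlier, with a Map standing in for the matched buffered rows):

// Toy one-side outer join: every streamed row is emitted; a missing match is
// padded with None, playing the role of the GenericInternalRow of nulls above.
def leftOuter(streamed: Seq[Row], build: Map[Int, Seq[Row]]): Seq[(Row, Option[Row])] =
  streamed.flatMap { s =>
    build.get(s.key) match {
      case Some(ms) => ms.map(m => (s, Some(m)))
      case None     => Seq((s, None))
    }
  }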
For FullOuter, the main implementation is FullOuterIterator:
private class FullOuterIterator(
    smjScanner: SortMergeFullOuterJoinScanner,
    resultProj: InternalRow => InternalRow,
    numRows: SQLMetric) extends RowIterator {

  private[this] val joinedRow: JoinedRow = smjScanner.getJoinedRow()

  override def advanceNext(): Boolean = {
    val r = smjScanner.advanceNext()
    if (r) numRows += 1
    r
  }

  override def getRow: InternalRow = resultProj(joinedRow)
}
Seen this way, the FullOuter iterator itself is the simplest of the lot; the actual pairing work happens inside SortMergeFullOuterJoinScanner.
Since each branch returns an iterator, the key to reading this source is the advanceNext implementation; from it you can trace the entire join process.
To summarize: this post sketched the ideas behind Spark's join implementations. The finer details still require reading the code. For example, what is the spilling in SortMergeJoinExec based on? It is the ExternalAppendOnlyUnsafeRowArray used by SortMergeJoinScanner, which relies on UnsafeExternalSorter to implement the spill-to-disk behaviour (in Spark 2.x this is tunable via the spark.sql.sortMergeJoinExec.buffer.* thresholds).
