本文基於 TiDB release-5.1進行分析,需要用到 Go 1.16以后的版本
所謂 Hash Join 就是在 join 的時候選擇一張表作為 buildSide 表來構造哈希表,另外一張表作為 probeSide 表;然后對 probeSide 表的每一行數據都去這個哈希表中查找是否有匹配的數據。
根據上面的定義,看起來 Hash Join 貌似很好做,只需要弄一個大 map 然后遍歷 probeSide 表的數據進行匹配就好了。但是作為一個高效的數據庫, TiDB 會在這個過程做什么優化呢?
所以在閱讀文章前先帶着這幾個疑問:
- 哪張表會成為 buildSide 表或 probeSide 表?
- buildSide 表來構造的哈希表是包含了 buildSide 表的所有數據嗎?數據量太大會不會有問題?
- probeSide 表匹配 buildSide 表的時候是單線程匹配還是多線程匹配?如果是多線程匹配,那么如何分配匹配的數據呢?
下面我用這個例子來進行講解:
CREATE TABLE test1 (a int , b int, c int, d int);
CREATE TABLE test2 (a int , b int, c int, d int);
然后查詢執行計划:
explain select * from test1 t1 join test1 t2 on t1.a= t2.a ;
+-----------------------+--------+---------+-------------+--------------------------------------------------+
|id |estRows |task |access object|operator info |
+-----------------------+--------+---------+-------------+--------------------------------------------------+
|HashJoin_8 |12487.50|root | |inner join, equal:[eq(test.test1.a, test.test1.a)]|
|├─TableReader_15(Build)|9990.00 |root | |data:Selection_14 |
|│ └─Selection_14 |9990.00 |cop[tikv]| |not(isnull(test.test1.a)) |
|│ └─TableFullScan_13 |10000.00|cop[tikv]|table:t2 |keep order:false, stats:pseudo |
|└─TableReader_12(Probe)|9990.00 |root | |data:Selection_11 |
| └─Selection_11 |9990.00 |cop[tikv]| |not(isnull(test.test1.a)) |
| └─TableFullScan_10 |10000.00|cop[tikv]|table:t1 |keep order:false, stats:pseudo |
+-----------------------+--------+---------+-------------+--------------------------------------------------+
構建 Hash Join 執行器
-
TiDB 首先會根據 SQL 來構建相應的 Logic Plan;
-
然后將 Logic Plan 轉成 Physical Plan,這里是轉成 PhysicalHashJoin 作為 Physical Plan;
-
通過比較 Physical Plan 的代價,最后選擇一個代價最小的 Physical Plan 構建執行器 executor;
之所以要講一下這里是因為通過 Physical Plan 構建執行器的時候會判斷是哪張表來做 buildSide 表 或 probeSide 表;
構建 Physical Plan
構建 Physical Plan 在exhaust_physical_plans.go文件的 getHashJoins 方法中:
func (p *LogicalJoin) getHashJoins(prop *property.PhysicalProperty) []PhysicalPlan {
...
joins := make([]PhysicalPlan, 0, 2)
switch p.JoinType {
case SemiJoin, AntiSemiJoin, LeftOuterSemiJoin, AntiLeftOuterSemiJoin:
joins = append(joins, p.getHashJoin(prop, 1, false))
case LeftOuterJoin:
joins = append(joins, p.getHashJoin(prop, 1, false))
joins = append(joins, p.getHashJoin(prop, 1, true))
case RightOuterJoin:
joins = append(joins, p.getHashJoin(prop, 0, false))
joins = append(joins, p.getHashJoin(prop, 0, true))
case InnerJoin:
joins = append(joins, p.getHashJoin(prop, 1, false))
joins = append(joins, p.getHashJoin(prop, 0, false))
}
return joins
}
這個方法會根據 Join 的類型分別調用 getHashJoin 方法創建 Physical Plan。 這里會創建多個 PhysicalHashJoin ,后面會選擇一個代價最小的 Physical Plan 構建執行器。
需要注意的是 getHashJoin 后面兩個參數:
func (p *LogicalJoin) getHashJoin(prop *property.PhysicalProperty, innerIdx int, useOuterToBuild bool) *PhysicalHashJoin
后面會根據 innerIdx 和 useOuterToBuild 決定哪張會成為 buildSide 表 或 probeSide 表;
選擇效率最高的執行計划
構建好 Physical Plan 之后會遍歷創建的 Plan 獲取它的代價:
func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPlan, prop *property.PhysicalProperty, addEnforcer bool, planCounter *PlanCounterTp) (task, int64, error) {
var bestTask task = invalidTask
childTasks := make([]task, 0, len(p.children))
for _, pp := range physicalPlans {
childTasks = childTasks[:0]
for j, child := range p.children {
childTask, cnt, err := child.findBestTask(pp.GetChildReqProps(j), &PlanCounterDisabled)
...
childTasks = append(childTasks, childTask)
}
// Combine best child tasks with parent physical plan.
curTask := pp.attach2Task(childTasks...)
...
// Get the most efficient one.
if curTask.cost() < bestTask.cost() || (bestTask.invalid() && !curTask.invalid()) {
bestTask = curTask
}
}
return bestTask, ...
}
從這些 Plan 里面挑選出代價最小的返回。
通過執行計划構建執行器
獲取到執行計划之后,會通過一系列的調用到 buildHashJoin 構建 HashJoinExec 作為 hash join 執行器:
我們來看一下 buildHashJoin:
func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor {
// 構建左表 executor
leftExec := b.build(v.Children()[0])
if b.err != nil {
return nil
}
// 構建右表 executor
rightExec := b.build(v.Children()[1])
if b.err != nil {
return nil
}
// 構建
e := &HashJoinExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID(), leftExec, rightExec),
concurrency: v.Concurrency,
// join 類型
joinType: v.JoinType,
isOuterJoin: v.JoinType.IsOuterJoin(),
useOuterToBuild: v.UseOuterToBuild,
}
...
//選擇 buildSideExec 和 probeSideExec
if v.UseOuterToBuild {
if v.InnerChildIdx == 1 { // left join InnerChildIdx =1
e.buildSideExec, e.buildKeys = leftExec, v.LeftJoinKeys
e.probeSideExec, e.probeKeys = rightExec, v.RightJoinKeys
e.outerFilter = v.LeftConditions
} else {
e.buildSideExec, e.buildKeys = rightExec, v.RightJoinKeys
e.probeSideExec, e.probeKeys = leftExec, v.LeftJoinKeys
e.outerFilter = v.RightConditions
}
} else {
if v.InnerChildIdx == 0 {
e.buildSideExec, e.buildKeys = leftExec, v.LeftJoinKeys
e.probeSideExec, e.probeKeys = rightExec, v.RightJoinKeys
e.outerFilter = v.RightConditions
} else {
e.buildSideExec, e.buildKeys = rightExec, v.RightJoinKeys
e.probeSideExec, e.probeKeys = leftExec, v.LeftJoinKeys
e.outerFilter = v.LeftConditions
}
}
childrenUsedSchema := markChildrenUsedCols(v.Schema(), v.Children()[0].Schema(), v.Children()[1].Schema())
e.joiners = make([]joiner, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
// 創建 joiner 用於 Join 匹配
e.joiners[i] = newJoiner(b.ctx, v.JoinType, v.InnerChildIdx == 0, defaultValues,
v.OtherConditions, lhsTypes, rhsTypes, childrenUsedSchema)
}
...
return e
}
這段主要的邏輯就是根據最優的 Physical Plan 來構建 HashJoinExec。
其中需要主要的是,這里會根據 UseOuterToBuild 和 InnerChildIdx 來決定 buildSide 表和 probeSide 表。
比如在構建 left join 的 Physical Plan 的時候:
func (p *LogicalJoin) getHashJoins(prop *property.PhysicalProperty) []PhysicalPlan {
...
joins := make([]PhysicalPlan, 0, 2)
switch p.JoinType {
case LeftOuterJoin:
joins = append(joins, p.getHashJoin(prop, 1, false))
joins = append(joins, p.getHashJoin(prop, 1, true))
...
}
return joins
}
傳入的 getHashJoin 方法中第一個參數代表 InnerChildIdx,第二個參數代表 UseOuterToBuild。這里會生成兩個 Physical Plan ,然后會根據代價計算出最優的那個;
進入到 buildHashJoin 方法的時候,可以發現 buildSide 表和 probeSide 表是最后和 Physical Plan 有關:
func (b *executorBuilder) buildHashJoin(v *plannercore.PhysicalHashJoin) Executor {
...
//選擇 buildSideExec 和 probeSideExec
if v.UseOuterToBuild {
if v.InnerChildIdx == 1 { // left join InnerChildIdx =1
e.buildSideExec, e.buildKeys = leftExec, v.LeftJoinKeys
e.probeSideExec, e.probeKeys = rightExec, v.RightJoinKeys
e.outerFilter = v.LeftConditions
} else {
...
}
} else {
if v.InnerChildIdx == 0 {
...
} else {
e.buildSideExec, e.buildKeys = rightExec, v.RightJoinKeys
e.probeSideExec, e.probeKeys = leftExec, v.LeftJoinKeys
e.outerFilter = v.LeftConditions
}
}
...
return e
}
運行Hash Join 執行器
在構建完 HashJoinExec 之后就到了獲取數據的環節,TiDB 會通過 Next 方法一次性從執行器里面獲取一批數據,具體獲取數據的方法在 HashJoinExec 的 Next 里面。
func (e *HashJoinExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
if !e.prepared {
e.buildFinished = make(chan error, 1)
// 異步根據buildSide表中數據, 構建 hashtable
go util.WithRecovery(func() {
defer trace.StartRegion(ctx, "HashJoinHashTableBuilder").End()
e.fetchAndBuildHashTable(ctx)
}, e.handleFetchAndBuildHashTablePanic)
// 讀取probeSide表和構建的hashtable做匹配,獲取數據放入joinResultCh
e.fetchAndProbeHashTable(ctx)
e.prepared = true
}
if e.isOuterJoin {
atomic.StoreInt64(&e.requiredRows, int64(req.RequiredRows()))
}
req.Reset()
// 獲取結果數據
result, ok := <-e.joinResultCh
if !ok {
return nil
}
if result.err != nil {
e.finished.Store(true)
return result.err
}
// 將數據返回放入到 req Chunk 中
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
}
Next 方法獲取數據分為三步:
- 調用 fetchAndBuildHashTable 方法異步根據buildSide表中數據, 構建 hashtable;
- 調用 fetchAndProbeHashTable 方法讀取probeSide表和構建的hashtable做匹配,獲取數據放入joinResultCh;
- 從 joinResultCh 中獲取數據;
fetchAndBuildHashTable 構建 hash table
func (e *HashJoinExec) fetchAndBuildHashTable(ctx context.Context) {
...
buildSideResultCh := make(chan *chunk.Chunk, 1)
doneCh := make(chan struct{})
go util.WithRecovery(
func() {
defer trace.StartRegion(ctx, "HashJoinBuildSideFetcher").End()
// 獲取 buildSide 表中的數據,將數據放入到 buildSideResultCh 中
e.fetchBuildSideRows(ctx, buildSideResultCh, doneCh)
}, ...,
)
// 從 buildSideResultCh 中讀取數據構建 rowContainer
err := e.buildHashTableForList(buildSideResultCh)
if err != nil {
e.buildFinished <- errors.Trace(err)
close(doneCh)
}
...
}
這里構建 hash map 的過程分為兩部分:
- 異步調用 fetchBuildSideRows 循環獲取buildSide表中數據,放入到 buildSideResultCh 中;
- 從 buildSideResultCh 中讀取數據構建 rowContainer,rowContainer 相當於 hash map 存放數據的地方。
我們下面來看一下 buildHashTableForList:
func (e *HashJoinExec) buildHashTableForList(buildSideResultCh <-chan *chunk.Chunk) error {
e.rowContainer = newHashRowContainer(e.ctx, int(e.buildSideEstCount), hCtx)
...
// 讀取 channel 數據
for chk := range buildSideResultCh {
if e.finished.Load().(bool) {
return nil
}
if !e.useOuterToBuild {
// 將數據存入到 rowContainer 中
err = e.rowContainer.PutChunk(chk, e.isNullEQ)
} else {
...
}
if err != nil {
return err
}
}
return nil
}
這里會將 chunk 的數據通過 PutChunk 存入到 rowContainer 中。
func (c *hashRowContainer) PutChunk(chk *chunk.Chunk, ignoreNulls []bool) error {
return c.PutChunkSelected(chk, nil, ignoreNulls)
}
func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected, ignoreNulls []bool) error {
start := time.Now()
defer func() { c.stat.buildTableElapse += time.Since(start) }()
chkIdx := uint32(c.rowContainer.NumChunks())
// 將數據存放到 RowContainer 中,內存中放不下會存放到磁盤中
err := c.rowContainer.Add(chk)
if err != nil {
return err
}
numRows := chk.NumRows()
c.hCtx.initHash(numRows)
hCtx := c.hCtx
// 根據chunk中的column值構建hash值
for keyIdx, colIdx := range c.hCtx.keyColIdx {
ignoreNull := len(ignoreNulls) > keyIdx && ignoreNulls[keyIdx]
err := codec.HashChunkSelected(c.sc, hCtx.hashVals, chk, hCtx.allTypes[colIdx], colIdx, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
if err != nil {
return errors.Trace(err)
}
}
// 根據hash值構建hash table
for i := 0; i < numRows; i++ {
if (selected != nil && !selected[i]) || c.hCtx.hasNull[i] {
continue
}
key := c.hCtx.hashVals[i].Sum64()
rowPtr := chunk.RowPtr{ChkIdx: chkIdx, RowIdx: uint32(i)}
c.hashTable.Put(key, rowPtr)
}
return nil
}
對於 rowContainer 來說,數據存放分為兩部分:一部分是存放 chunk 數據到 rowContainer 的 records 或 recordsInDisk 里面;另一部分是構建 hash table 存放 key 值以及將數據的索引作為 value。
func (c *RowContainer) Add(chk *Chunk) (err error) {
...
// 如果內存已經滿了,那么會寫入到磁盤中
if c.alreadySpilled() {
if c.m.spillError != nil {
return c.m.spillError
}
err = c.m.recordsInDisk.Add(chk)
} else {
// 否則寫入內存
c.m.records.Add(chk)
}
return
}
RowContainer 會根據內存使用量來判斷是否要存磁盤還是存內存。
多線程執行 hash Join
hash Join 的過程是通過 fetchAndProbeHashTable 方法來執行的,這個方法比較有意思,向我們展示了如何在多線程中使用 chanel 進行數據傳遞。
func (e *HashJoinExec) fetchAndProbeHashTable(ctx context.Context) {
// 初始化數據傳遞的 channel
e.initializeForProbe()
e.joinWorkerWaitGroup.Add(1)
// 循環獲取 ProbeSide 表中的數據,將數據存放到 probeSideResult channel中
go util.WithRecovery(func() {
defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
e.fetchProbeSideChunks(ctx)
}, e.handleProbeSideFetcherPanic)
probeKeyColIdx := make([]int, len(e.probeKeys))
for i := range e.probeKeys {
probeKeyColIdx[i] = e.probeKeys[i].Index
}
// 啟動多個 join workers 去buildSide表和ProbeSide 表匹配數據
for i := uint(0); i < e.concurrency; i++ {
e.joinWorkerWaitGroup.Add(1)
workID := i
go util.WithRecovery(func() {
defer trace.StartRegion(ctx, "HashJoinWorker").End()
e.runJoinWorker(workID, probeKeyColIdx)
}, e.handleJoinWorkerPanic)
}
go util.WithRecovery(e.waitJoinWorkersAndCloseResultChan, nil)
}
整個 hash Join 的執行分為三個部分:
- 由於在 hash Join 過程中是通過多線程處理的,所以會用到 channel 進行數據傳遞,所以第一步是調用 initializeForProbe 初始化數據傳遞的 channel;
- 然后會異步的調用 fetchProbeSideChunks 從 ProbeSide 表獲取數據;
- 接下來會啟動多個線程調用 runJoinWorker 方法啟動多個 Join Worker 來進行 hash Join ;
需要注意的是,這里我們將查詢probeSide表數據的線程稱作 probeSideExec worker;將執行 join 匹配的線程稱作 join worker,它的數量由 concurrency 決定,默認是5個。
initializeForProbe
我們先來看看 initializeForProbe:
func (e *HashJoinExec) initializeForProbe() {
// 用於probeSideExec worker保存probeSide表數據,用來給join worker做關聯使用
e.probeResultChs = make([]chan *chunk.Chunk, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.probeResultChs[i] = make(chan *chunk.Chunk, 1)
}
// 用於將已被join workers使用過的chunks給probeSideExec worker復用
e.probeChkResourceCh = make(chan *probeChkResource, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.probeChkResourceCh <- &probeChkResource{
chk: newFirstChunk(e.probeSideExec),
dest: e.probeResultChs[i],
}
}
// 用於將可以重復使用的join result chunks從main thread傳遞到join worker
e.joinChkResourceCh = make([]chan *chunk.Chunk, e.concurrency)
for i := uint(0); i < e.concurrency; i++ {
e.joinChkResourceCh[i] = make(chan *chunk.Chunk, 1)
e.joinChkResourceCh[i] <- newFirstChunk(e)
}
// 用於將join結果chunks從 join worker傳遞到 main thread
e.joinResultCh = make(chan *hashjoinWorkerResult, e.concurrency+1)
}
這個方法主要就是初始化4個 channel 對象。
probeResultChs:用於保存probeSide表查出來的數據;
probeChkResourceCh:用於將已被join workers使用過的chunks給probeSideExec worker復用;
joinChkResourceCh:也是用於傳遞 chunks,主要是給 join worker 復用;
joinResultCh:用於傳遞 join worker 匹配的結果給 main thread;
fetchProbeSideChunks
下面我們再來看看異步 fetchProbeSideChunks的過程:
func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) {
for {
...
var probeSideResource *probeChkResource
select {
case <-e.closeCh:
return
case probeSideResource, ok = <-e.probeChkResourceCh:
}
// 獲取可用的 chunk
probeSideResult := probeSideResource.chk
if e.isOuterJoin {
required := int(atomic.LoadInt64(&e.requiredRows))
probeSideResult.SetRequiredRows(required, e.maxChunkSize)
}
// 獲取數據存入到 probeSideResult
err := Next(ctx, e.probeSideExec, probeSideResult)
...
//將有數據的chunk.Chunk放入到dest channel中
probeSideResource.dest <- probeSideResult
}
}
在理清楚各個 channel 的作用之后就可以很容易的理解,這里主要就是獲取可用的 chunk,然后調用 Next 將數據放入到 chunk 中,最后將 chunk 放入到dest channel中。
runJoinWorker
最后我們來看看 Join Worker 的實現:
func (e *HashJoinExec) runJoinWorker(workerID uint, probeKeyColIdx []int) {
...
var (
probeSideResult *chunk.Chunk
selected = make([]bool, 0, chunk.InitialCapacity)
)
// 獲取 hashjoinWorkerResult
ok, joinResult := e.getNewJoinResult(workerID)
if !ok {
return
}
emptyProbeSideResult := &probeChkResource{
dest: e.probeResultChs[workerID],
}
hCtx := &hashContext{
allTypes: e.probeTypes,
keyColIdx: probeKeyColIdx,
}
// 循環獲取 probeSideResult
for ok := true; ok; {
if e.finished.Load().(bool) {
break
}
select {
case <-e.closeCh:
return
// probeResultChs 里存放的是probeSideExec worker查詢出來的數據
case probeSideResult, ok = <-e.probeResultChs[workerID]:
}
if !ok {
break
}
// 將join匹配的數據放入到joinResult的chunk里面
ok, joinResult = e.join2Chunk(workerID, probeSideResult, hCtx, joinResult, selected)
if !ok {
break
}
// 使用完之后,將chunk重置,重新放回 probeChkResourceCh 給probeSideExec worker使用
probeSideResult.Reset()
emptyProbeSideResult.chk = probeSideResult
e.probeChkResourceCh <- emptyProbeSideResult
}
...
}
由於 probeSideExec worker 會將數據放入到 probeResultChs 中,所以這里會循環獲取它里面的數據,然后調用 join2Chunk 進行數據匹配。
func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx *hashContext, joinResult *hashjoinWorkerResult,
selected []bool) (ok bool, _ *hashjoinWorkerResult) {
var err error
// 校驗probeSide chunk查詢到的數據是否可用來匹配
selected, err = expression.VectorizedFilter(e.ctx, e.outerFilter, chunk.NewIterator4Chunk(probeSideChk), selected)
if err != nil {
joinResult.err = err
return false, joinResult
}
//probeSide表的hash,用於匹配
hCtx.initHash(probeSideChk.NumRows())
for keyIdx, i := range hCtx.keyColIdx {
ignoreNull := len(e.isNullEQ) > keyIdx && e.isNullEQ[keyIdx]
err = codec.HashChunkSelected(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
if err != nil {
joinResult.err = err
return false, joinResult
}
}
//遍歷probeSide表查詢到的行記錄
for i := range selected {
...
if !selected[i] || hCtx.hasNull[i] { // process unmatched probe side rows
e.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk)
} else { // process matched probe side rows
// 獲取行記錄的 probeKey 和 probeRow
probeKey, probeRow := hCtx.hashVals[i].Sum64(), probeSideChk.GetRow(i)
ok, joinResult = e.joinMatchedProbeSideRow2Chunk(workerID, probeKey, probeRow, hCtx, joinResult)
if !ok {
return false, joinResult
}
}
// 如果joinResult的chunk已經滿了,那么將數據放入到 joinResultCh,再重新獲取 joinResult
if joinResult.chk.IsFull() {
e.joinResultCh <- joinResult
ok, joinResult = e.getNewJoinResult(workerID)
if !ok {
return false, joinResult
}
}
}
return true, joinResult
}
數據匹配這里也大致分為以下幾個步驟:
- 校驗probeSide chunk查詢到的數據是否可用來匹配;
- 獲取到 probeSide chunk 的數據行進行hash,用於匹配;
- 遍歷probeSide chunk表可用於匹配的數據,並調用 joinMatchedProbeSideRow2Chunk 獲取匹配成功數據填入到 joinResult 中;
func (e *HashJoinExec) join2Chunk(workerID uint, probeSideChk *chunk.Chunk, hCtx *hashContext, joinResult *hashjoinWorkerResult,
selected []bool) (ok bool, _ *hashjoinWorkerResult) {
var err error
// 校驗probeSide chunk查詢到的數據是否可用來匹配
selected, err = expression.VectorizedFilter(e.ctx, e.outerFilter, chunk.NewIterator4Chunk(probeSideChk), selected)
if err != nil {
joinResult.err = err
return false, joinResult
}
//probeSide表的hash,用於匹配
hCtx.initHash(probeSideChk.NumRows())
for keyIdx, i := range hCtx.keyColIdx {
ignoreNull := len(e.isNullEQ) > keyIdx && e.isNullEQ[keyIdx]
err = codec.HashChunkSelected(e.rowContainer.sc, hCtx.hashVals, probeSideChk, hCtx.allTypes[i], i, hCtx.buf, hCtx.hasNull, selected, ignoreNull)
if err != nil {
joinResult.err = err
return false, joinResult
}
}
//遍歷probeSide表查詢到的行記錄
for i := range selected {
...
if !selected[i] || hCtx.hasNull[i] { // process unmatched probe side rows
e.joiners[workerID].onMissMatch(false, probeSideChk.GetRow(i), joinResult.chk)
} else { // process matched probe side rows
// 獲取行記錄的 probeKey 和 probeRow
probeKey, probeRow := hCtx.hashVals[i].Sum64(), probeSideChk.GetRow(i)
// 進行數據匹配
ok, joinResult = e.joinMatchedProbeSideRow2Chunk(workerID, probeKey, probeRow, hCtx, joinResult)
if !ok {
return false, joinResult
}
}
// 如果joinResult的chunk已經滿了,那么將數據放入到 joinResultCh,再重新獲取 joinResult
if joinResult.chk.IsFull() {
e.joinResultCh <- joinResult
ok, joinResult = e.getNewJoinResult(workerID)
if !ok {
return false, joinResult
}
}
}
return true, joinResult
}
join2Chunk 會根據過濾條件判斷 probeSide chunk 返回的數據是不是都能進行匹配,減少數據的匹配量;
如果可以匹配,那么會將 probeSide chunk 記錄行的probeKey與probeRow傳入到 joinMatchedProbeSideRow2Chunk 進行數據匹配。
func (e *HashJoinExec) joinMatchedProbeSideRow2Chunk(workerID uint, probeKey uint64, probeSideRow chunk.Row, hCtx *hashContext,
joinResult *hashjoinWorkerResult) (bool, *hashjoinWorkerResult) {
// 從buildSide表中匹配數據
buildSideRows, _, err := e.rowContainer.GetMatchedRowsAndPtrs(probeKey, probeSideRow, hCtx)
if err != nil {
joinResult.err = err
return false, joinResult
}
//表示沒有匹配到數據,直接返回
if len(buildSideRows) == 0 {
e.joiners[workerID].onMissMatch(false, probeSideRow, joinResult.chk)
return true, joinResult
}
iter := chunk.NewIterator4Slice(buildSideRows)
hasMatch, hasNull, ok := false, false, false
// 將匹配上的數據add到 joinResult chunk 中
for iter.Begin(); iter.Current() != iter.End(); {
matched, isNull, err := e.joiners[workerID].tryToMatchInners(probeSideRow, iter, joinResult.chk)
if err != nil {
joinResult.err = err
return false, joinResult
}
if joinResult.chk.IsFull() {
e.joinResultCh <- joinResult
ok, joinResult = e.getNewJoinResult(workerID)
if !ok {
return false, joinResult
}
}
}
...
return true, joinResult
}
joinMatchedProbeSideRow2Chunk 會從 rowContainer 去獲取數據,獲取不到數據直接返回,獲取到數據會將數據存放到 joinResult chunk 中。
下面用一個流程圖來解釋一下整個hash匹配過程:
整體上Join Worker匹配邏輯是:
- 從 probeSide 表獲取數據到 probeSideResource;
- 根據 probeSideResource 的數據查哈希表,將 probeSide 表和buildSide表進行匹配;
- 將匹配上的數據寫入到joinResult chunk 中;
- 最后將joinResult的數據刷入到 joinResultCh 發送給 Main Thread;
總結
這篇文章基本上從構建hash join執行器開始到運行 HashJoinExec 執行器進行了一個全面的解析。
回到開頭提出的問題:
-
哪張表會成為 buildSide 表或 probeSide 表?
這個是由優化器決定的,創建 Physical Plan 的時候會創建多個,然后會遍歷創建的 Plan 獲取它的代價最小的那個。
-
buildSide 表來構造的哈希表是包含了 buildSide 表的所有數據嗎?數據量太大會不會有問題?
buildSide 表構造的 hash 表包含了所有的數據,但是TiDB這里 hash表和數據項是分離的;數據是存放到 rowContainer 的 records ,數據量太大會通過 recordsInDisk 落盤;hash表是存放到 rowContainer的hashTable中;
-
probeSide 表匹配 buildSide 表的時候是單線程匹配還是多線程匹配?如果是多線程匹配,那么如何分配匹配的數據呢?
匹配是多線程匹配的,默認concurrency是5;它們之間傳遞數據是通過 channel 來傳遞數據,各自在獲取數據的時候會根據自己的線程id從 probeResultChs 數組中獲取 channel 並訂閱其中的數據;
Reference
https://pingcap.com/zh/blog/tidb-source-code-reading-9
https://github.com/xieyu/blog/blob/master/src/tidb/hash-join.md