Join (Inner Join)
Join 算法
https://clickhouse.com/docs/en/operations/settings/settings/#settings-join_algorithm
Specifies
JOIN algorithm.
Possible values:
hash
—
Hash join algorithm is used.
partial_merge
—
Sort-merge algorithm is used.
prefer_partial_merge
— ClickHouse always tries to use
merge
join if possible.
auto
— ClickHouse tries to change
hash
join to
merge
join on the fly to avoid out of memory.
Default value:
hash
.
QueryPlan
MacBook.local :) explain select * from t1 join t2 on t1.x = t2.x1 EXPLAIN SELECT * FROM t1 INNER JOIN t2 ON t1.x = t2.x1 Query id: b7068d63-dc61-4389-a12c-ea8016e32bc0 ┌─explain──────────────────────────────────────────────────────────────────────────────────────┐ │ Expression ((Projection + Before ORDER BY)) │ │ Join (JOIN) │ │ Expression (Before JOIN) │ │ SettingQuotaAndLimits (Set limits and quota after reading from storage) │ │ ReadFromMergeTree │ │ Expression ((Joined actions + (Rename joined columns + (Projection + Before ORDER BY)))) │ │ SettingQuotaAndLimits (Set limits and quota after reading from storage) │ │ ReadFromMergeTree │ └──────────────────────────────────────────────────────────────────────────────────────────────┘ 8 rows in set. Elapsed: 0.003 sec.
查看 Pipeline
MacBook.local :) explain pipeline select * from t1 join t2 on t1.x = t2.x1 EXPLAIN PIPELINE SELECT * FROM t1 INNER JOIN t2 ON t1.x = t2.x1 Query id: c054b9e0-d82c-4bc4-a22d-64dd9f556cf0 ┌─explain──────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Join) │ │ JoiningTransform 2 → 1 │ │ FillingRightJoinSide │ │ (Expression) │ │ ExpressionTransform │ │ (SettingQuotaAndLimits) │ │ (ReadFromMergeTree) │ │ MergeTreeInOrder 0 → 1 │ │ (Expression) │ │ ExpressionTransform │ │ (SettingQuotaAndLimits) │ │ (ReadFromMergeTree) │ │ MergeTreeInOrder 0 → 1 │ └──────────────────────────────────┘ 15 rows in set. Elapsed: 0.005 sec. MacBook.local :)
HashJoin 本質
Table1 INNER JOIN Table2
- Table2 被轉換成一個HashTable1.
- Table1 去HashTable1中查找,如果發現key值匹配, 就將相關列的行的值copy到結果級。如果右表有多個行匹配,那么就copy多個對應行的值到對應列。
舉例
Table1 Table2 x y x1 y1 -------- ---------- 1 'a' 1 'c' 2 'b' 1 'd' ------------------- Exeute: select * from Table1 Inner Join Table2 on Table1.x = Table2.x1 Result: x y x1 y1 -------------------- 1 'a' 1 'c' 1 'a' 1 'd' --------------------
HashJoin 的關鍵
遍歷左表key column的同時,遇到多個匹配需要duplicate,或者filter。
ClickHouse 中 HashJoin的實現
│ JoiningTransform 2 → 1 │ │ FillingRightJoinSide
FillingRightJoinSideTransform
本質:使用右表構建HashTable.
HashTable結構示意圖
HashTable 可擴容. [14789223245, [1,3,4,[Column1Ptr, Column2Ptr]] ] --HashCell ... [79137398732, [2,[Column3Ptr]] --HashCell ... [76349271791, [5,[Column5Ptr]] --HashCell ... [hash值, [row_number, [指向相關列的指針]]] //
HashTable 在ClickHouse的實現
HashJoin使用的HashTable與LowCardinality結構中使用的一樣。HashJoin結構體中創建HashTable,HashCell中存儲StringRef與RowsRef,並且保存當前key的hash值。
(細節)Join 構建HashMap debug 一覽.

(細節)構建map_
'b' 是右表第一個元素.

為'b'在hashTable中找位置。 b被放在HashTable中13的位置.

構建一個holder.key.data

構建一個在buf對應的內存位置構建一個HashCell.

更新buf中的value(*this)的saved_hash值.

接着更新 'a'

(細節)RowRef 數據結構

HashTable debug細膩.
通過右表key的hash值,我們可以從RowRef中的Block快速通過Column的隨機訪問快速拿到任意列的值,可以拿到所有列的相關索引的值。

struct RowRef { using SizeT = uint32_t; /// Do not use size_t cause of memory economy const Block * block = nullptr; SizeT row_num = 0; RowRef() {} RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {} };
第二個'a'寫入,會在對應的RowsRef中增加row_number。
if (emplace_result.isInserted()) new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i); else { /// The first element of the list is stored in the value of the hash table, the rest in the pool. emplace_result.getMapped().insert({stored_block, i}, pool); }
可以看到創建HashTable,包含了其他右表的列的信息。也就是說在后續查找某列中i row的值,可以直接隨機查找。

JoiningTransform::work().
此時需要FillingRightJoinSide構建的Join object。
讀取參與Join的左側的Chunk (columns)。
開始HashJoin時,需要使用Join結構中的maps_ ,

Block使用的是左側列組成的Block.

右邊表需要被加到最終結果block中的column。 block_with_columns_to_add 變量

根據block_with_columns_to_add 構建added_columns。

SwitchJoinRightColumns 最重要的函數 filter 與replicate
參與計算這兩個關鍵的要素,只需要參與Join 左表的key column 就可以。其他的filter和replicated都是和左表的key column保持一致。 這里的列都是新建並填充的,這樣保證在匹配/不匹配的情況下填充時的靈活性。
size_t rows = added_columns.rows_to_add; // rows_to_add 是 左表 column的行. 此時右表的可以被map表示. filter = IColumn::Filter(rows, 0);// 以左表行數 設置filter. 比較合理 added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows); // 以左表行數設置 需要復制的行. 合理. 如果是左表[a,a] -> 右表[a] => 那么左表行最終為[a,a]合理。 for (auto &row: rows){ // 第一層的過濾,目前不知道哪個算子提供這個join_mask_columns。測試時為Nullptr,即所有行都不進行過濾。 bool row_acceptable = !added_columns.isRowFiltered(i); // 如果這行row需要被保留,我們需要查看這個left_key_column的第i行在map中的位置.在hashTable中查找。 獲取 類型為String 類型的key // StringRef key(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1); auto find_result = row_acceptable ? key_getter.findKey(map, i, pool) : FindResult(); if (find_result.isFound()){ setUsed<need_filter>(filter, i); 設置相應filter[i]為1. // 將當前所有能和當前左表key join上的row添加到所有左邊列。 // columns[i] 開啟復制模式,因為columns這里一開始都是empty. addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset); // 通過對各個列調用此方法來完成 左表 [a] -> [a,a] 右表,讓 所有的列都完成對應的復制 [a,a]。 //void insertFrom(const IColumn & src, size_t n) override { // data.push_back(assert_cast<const Self &>(src).getData()[n]); // } // } if constexpr need_replicated){ // 當前row i 更新對offsets進行更新.這里主要是為了后面replicate時,可以采用批量的方式. memcpyxxx 一種優化方式. (*added_columns.offsets_to_replicate)[i] = current_offset; } }
執行完Join算法以后。將結果存放到block
for (size_t i = 0; i < added_columns.size(); ++i) block.insert(added_columns.moveColumn(i));
返回前的replicate 操作
- ?最后返回之前需要創建一個新的column 新建 需要replicate的列。目前不知道最后一步實現的replicate的含義是什么。
for (size_t i = 0; i < existing_columns; ++i) block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
ColumnPtr ColumnString::replicate(const Offsets & replicate_offsets) const { size_t col_size = size(); if (col_size != replicate_offsets.size()) throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); auto res = ColumnString::create(); if (0 == col_size) return res; Chars & res_chars = res->chars; Offsets & res_offsets = res->offsets; res_chars.reserve(chars.size() / col_size * replicate_offsets.back()); res_offsets.reserve(replicate_offsets.back()); Offset prev_replicate_offset = 0; Offset prev_string_offset = 0; Offset current_new_offset = 0; for (size_t i = 0; i < col_size; ++i) { size_t size_to_replicate = replicate_offsets[i] - prev_replicate_offset; size_t string_size = offsets[i] - prev_string_offset; for (size_t j = 0; j < size_to_replicate; ++j) { current_new_offset += string_size; res_offsets.push_back(current_new_offset); res_chars.resize(res_chars.size() + string_size); // 這里就是利用了 前面記錄的offset進行memcpy copy的優化. memcpySmallAllowReadWriteOverflow15( &res_chars[res_chars.size() - string_size], &chars[prev_string_offset], string_size); } prev_replicate_offset = replicate_offsets[i]; prev_string_offset = offsets[i]; } return res; }
總結
- 計算右表key的列hashTable,並將相關聯的其他右表列信息也更新在map中。
- 根據map計算左表的最終結果放到block.
- 所有的計算重點都在 SwitchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, null_map, used_flags) 函數中。
- 將右表key的列加入到block
- 返回。
收獲
- RightTable到->Join,然后不同的JoiningTransform可以自由Join,將結果寫入到local變量added_columns,這樣可以使join並行化。
- HashTable 作用起到了關鍵性的作用。
-
問題:
- 在BuildQueryPlan 執行FillingRightJoinSide::transformHeader()和JoiningTransform::transformHeader()的意義是什么?
- 在JoiningTransform:: joinRightColumns()執行結束之后的對 block中已有的column進行replicate的作用是什么?
- Interpreter 執行優化?
- BlockStreaming.
- QueryPipeline = pipeline.
InBlockInputStream
Block RemoteQueryExecutor::read() { if (!sent_query) { sendQuery(); if (context->getSettingsRef().skip_unavailable_shards && (0 == connections->size())) return {}; } while (true) { if (was_cancelled) return Block(); Packet packet = connections->receivePacket(); if (auto block = processPacket(std::move(packet))) return *block; else if (got_duplicated_part_uuids) return std::get<Block>(restartQueryWithoutDuplicatedUUIDs()); } }

IBlockInputStream可以 輸出 Block(data) 給調用者。以RemoteBlockInputStream為例