Join (Inner Join)
Join 算法
https://clickhouse.com/docs/en/operations/settings/settings/#settings-join_algorithm
Specifies
JOIN algorithm.
Possible values:
hash —
Hash join algorithm is used.
partial_merge —
Sort-merge algorithm is used.
prefer_partial_merge — ClickHouse always tries to use
merge join if possible.
auto — ClickHouse tries to change
hash join to
merge join on the fly to avoid out of memory.
Default value:
hash.
QueryPlan
MacBook.local :) explain select * from t1 join t2 on t1.x = t2.x1 EXPLAIN SELECT * FROM t1 INNER JOIN t2 ON t1.x = t2.x1 Query id: b7068d63-dc61-4389-a12c-ea8016e32bc0 ┌─explain──────────────────────────────────────────────────────────────────────────────────────┐ │ Expression ((Projection + Before ORDER BY)) │ │ Join (JOIN) │ │ Expression (Before JOIN) │ │ SettingQuotaAndLimits (Set limits and quota after reading from storage) │ │ ReadFromMergeTree │ │ Expression ((Joined actions + (Rename joined columns + (Projection + Before ORDER BY)))) │ │ SettingQuotaAndLimits (Set limits and quota after reading from storage) │ │ ReadFromMergeTree │ └──────────────────────────────────────────────────────────────────────────────────────────────┘ 8 rows in set. Elapsed: 0.003 sec.
查看 Pipeline
MacBook.local :) explain pipeline select * from t1 join t2 on t1.x = t2.x1 EXPLAIN PIPELINE SELECT * FROM t1 INNER JOIN t2 ON t1.x = t2.x1 Query id: c054b9e0-d82c-4bc4-a22d-64dd9f556cf0 ┌─explain──────────────────────────┐ │ (Expression) │ │ ExpressionTransform │ │ (Join) │ │ JoiningTransform 2 → 1 │ │ FillingRightJoinSide │ │ (Expression) │ │ ExpressionTransform │ │ (SettingQuotaAndLimits) │ │ (ReadFromMergeTree) │ │ MergeTreeInOrder 0 → 1 │ │ (Expression) │ │ ExpressionTransform │ │ (SettingQuotaAndLimits) │ │ (ReadFromMergeTree) │ │ MergeTreeInOrder 0 → 1 │ └──────────────────────────────────┘ 15 rows in set. Elapsed: 0.005 sec. MacBook.local :)
HashJoin 本質
Table1 INNER JOIN Table2
- Table2 被轉換成一個HashTable1.
- Table1 去HashTable1中查找,如果發現key值匹配, 就將相關列的行的值copy到結果級。如果右表有多個行匹配,那么就copy多個對應行的值到對應列。
舉例
Table1 Table2
x y x1 y1
-------- ----------
1 'a' 1 'c'
2 'b' 1 'd'
-------------------
Exeute:
select * from Table1 Inner Join Table2 on Table1.x = Table2.x1
Result:
x y x1 y1
--------------------
1 'a' 1 'c'
1 'a' 1 'd'
--------------------
HashJoin 的關鍵
遍歷左表key column的同時,遇到多個匹配需要duplicate,或者filter。
ClickHouse 中 HashJoin的實現
│ JoiningTransform 2 → 1 │ │ FillingRightJoinSide
FillingRightJoinSideTransform
本質:使用右表構建HashTable.
HashTable結構示意圖
HashTable 可擴容.
[14789223245, [1,3,4,[Column1Ptr, Column2Ptr]] ] --HashCell
...
[79137398732, [2,[Column3Ptr]] --HashCell
...
[76349271791, [5,[Column5Ptr]] --HashCell
...
[hash值, [row_number, [指向相關列的指針]]] //
HashTable 在ClickHouse的實現
HashJoin使用的HashTable與LowCardinality結構中使用的一樣。HashJoin結構體中創建HashTable,HashCell中存儲StringRef與RowsRef,並且保存當前key的hash值。
(細節)Join 構建HashMap debug 一覽.
(細節)構建map_
'b' 是右表第一個元素.
為'b'在hashTable中找位置。 b被放在HashTable中13的位置.
構建一個holder.key.data
構建一個在buf對應的內存位置構建一個HashCell.
更新buf中的value(*this)的saved_hash值.
接着更新 'a'
(細節)RowRef 數據結構
HashTable debug細膩.
通過右表key的hash值,我們可以從RowRef中的Block快速通過Column的隨機訪問快速拿到任意列的值,可以拿到所有列的相關索引的值。
struct RowRef
{
using SizeT = uint32_t; /// Do not use size_t cause of memory economy
const Block * block = nullptr;
SizeT row_num = 0;
RowRef() {}
RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};
第二個'a'寫入,會在對應的RowsRef中增加row_number。
if (emplace_result.isInserted())
new (&emplace_result.getMapped()) typename Map::mapped_type(stored_block, i);
else
{
/// The first element of the list is stored in the value of the hash table, the rest in the pool.
emplace_result.getMapped().insert({stored_block, i}, pool);
}
可以看到創建HashTable,包含了其他右表的列的信息。也就是說在后續查找某列中i row的值,可以直接隨機查找。
JoiningTransform::work().
此時需要FillingRightJoinSide構建的Join object。
讀取參與Join的左側的Chunk (columns)。
開始HashJoin時,需要使用Join結構中的maps_ ,
Block使用的是左側列組成的Block.
右邊表需要被加到最終結果block中的column。 block_with_columns_to_add 變量
根據block_with_columns_to_add 構建added_columns。
SwitchJoinRightColumns 最重要的函數 filter 與replicate
參與計算這兩個關鍵的要素,只需要參與Join 左表的key column 就可以。其他的filter和replicated都是和左表的key column保持一致。 這里的列都是新建並填充的,這樣保證在匹配/不匹配的情況下填充時的靈活性。
size_t rows = added_columns.rows_to_add; // rows_to_add 是 左表 column的行. 此時右表的可以被map表示.
filter = IColumn::Filter(rows, 0);// 以左表行數 設置filter. 比較合理
added_columns.offsets_to_replicate = std::make_unique<IColumn::Offsets>(rows); // 以左表行數設置 需要復制的行. 合理. 如果是左表[a,a] -> 右表[a] => 那么左表行最終為[a,a]合理。
for (auto &row: rows){
// 第一層的過濾,目前不知道哪個算子提供這個join_mask_columns。測試時為Nullptr,即所有行都不進行過濾。
bool row_acceptable = !added_columns.isRowFiltered(i);
// 如果這行row需要被保留,我們需要查看這個left_key_column的第i行在map中的位置.在hashTable中查找。 獲取 類型為String 類型的key
// StringRef key(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1);
auto find_result = row_acceptable ? key_getter.findKey(map, i, pool) : FindResult();
if (find_result.isFound()){
setUsed<need_filter>(filter, i); 設置相應filter[i]為1.
// 將當前所有能和當前左表key join上的row添加到所有左邊列。
// columns[i] 開啟復制模式,因為columns這里一開始都是empty.
addFoundRowAll<Map, add_missing>(mapped, added_columns, current_offset); // 通過對各個列調用此方法來完成 左表 [a] -> [a,a] 右表,讓 所有的列都完成對應的復制 [a,a]。
//void insertFrom(const IColumn & src, size_t n) override {
// data.push_back(assert_cast<const Self &>(src).getData()[n]);
// }
//
}
if constexpr need_replicated){
// 當前row i 更新對offsets進行更新.這里主要是為了后面replicate時,可以采用批量的方式. memcpyxxx 一種優化方式.
(*added_columns.offsets_to_replicate)[i] = current_offset;
}
}
執行完Join算法以后。將結果存放到block
for (size_t i = 0; i < added_columns.size(); ++i)
block.insert(added_columns.moveColumn(i));
返回前的replicate 操作
- ?最后返回之前需要創建一個新的column 新建 需要replicate的列。目前不知道最后一步實現的replicate的含義是什么。
for (size_t i = 0; i < existing_columns; ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->replicate(*offsets_to_replicate);
ColumnPtr ColumnString::replicate(const Offsets & replicate_offsets) const
{
size_t col_size = size();
if (col_size != replicate_offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
auto res = ColumnString::create();
if (0 == col_size)
return res;
Chars & res_chars = res->chars;
Offsets & res_offsets = res->offsets;
res_chars.reserve(chars.size() / col_size * replicate_offsets.back());
res_offsets.reserve(replicate_offsets.back());
Offset prev_replicate_offset = 0;
Offset prev_string_offset = 0;
Offset current_new_offset = 0;
for (size_t i = 0; i < col_size; ++i)
{
size_t size_to_replicate = replicate_offsets[i] - prev_replicate_offset;
size_t string_size = offsets[i] - prev_string_offset;
for (size_t j = 0; j < size_to_replicate; ++j)
{
current_new_offset += string_size;
res_offsets.push_back(current_new_offset);
res_chars.resize(res_chars.size() + string_size);
// 這里就是利用了 前面記錄的offset進行memcpy copy的優化.
memcpySmallAllowReadWriteOverflow15(
&res_chars[res_chars.size() - string_size], &chars[prev_string_offset], string_size);
}
prev_replicate_offset = replicate_offsets[i];
prev_string_offset = offsets[i];
}
return res;
}
總結
- 計算右表key的列hashTable,並將相關聯的其他右表列信息也更新在map中。
- 根據map計算左表的最終結果放到block.
- 所有的計算重點都在 SwitchJoinRightColumns<KIND, STRICTNESS>(maps_, added_columns, data->type, null_map, used_flags) 函數中。
- 將右表key的列加入到block
- 返回。
收獲
- RightTable到->Join,然后不同的JoiningTransform可以自由Join,將結果寫入到local變量added_columns,這樣可以使join並行化。
- HashTable 作用起到了關鍵性的作用。
-
問題:
- 在BuildQueryPlan 執行FillingRightJoinSide::transformHeader()和JoiningTransform::transformHeader()的意義是什么?
- 在JoiningTransform:: joinRightColumns()執行結束之后的對 block中已有的column進行replicate的作用是什么?
- Interpreter 執行優化?
- BlockStreaming.
- QueryPipeline = pipeline.
InBlockInputStream
Block RemoteQueryExecutor::read()
{
if (!sent_query)
{
sendQuery();
if (context->getSettingsRef().skip_unavailable_shards && (0 == connections->size()))
return {};
}
while (true)
{
if (was_cancelled)
return Block();
Packet packet = connections->receivePacket();
if (auto block = processPacket(std::move(packet)))
return *block;
else if (got_duplicated_part_uuids)
return std::get<Block>(restartQueryWithoutDuplicatedUUIDs());
}
}
IBlockInputStream可以 輸出 Block(data) 給調用者。以RemoteBlockInputStream為例
