分享一下筆者研讀ClickHouse源碼時分析函數調用的實現,重點在於分析Clickhouse查詢層實現的接口,以及Clickhouse是如何利用這些接口更好的實現向量化的。本文的源碼分析基於ClickHouse v19.16.2.2的版本。
1.舉個栗子
下面是一個簡單的SQL語句
SELECT a, abs(b) FROM test
這里調用一個abs的函數,我們先打開ClickHouse的Debug日志看一下執行計划。(當前ClickHouse不支持使用Explain語句來查看執行計划,這個確實是很蛋疼的~~)
這里分為了3個流
- ExpressionBlockInputStream: 最頂層的Expression,實現了Projection,這個和我們今天主題無關,本質上就是實現一個簡單列的改名操作。比如
select a as aaa from test
這里將列名從a
改為aaa
. - ExpressionBlockInputStream: 第二個ExpressionBlockInputStream就是我們關注的重點的,后面的章節會詳細的剖析它。它主要完成了下面兩件事情
-
- 對
b
列執行函數abs
,生成新的一列數據abs(b)
- 對
-
remove column b
, 將b
列刪除。新的Block為a, abs(b)
-
- TinyLogBlockInputStream: 存儲引擎的讀取流,這里標識了底層表的存儲引擎為
append only
的TinyLog
。
從上面的執行計划可以看出,Clickhouse的表達式計算是由ExpressionBlockInputStream來完成的,而這個類是一個很強大的類,可以實現:Projection, Join, Apply_Function, Add Column, Remove Column
等。
2. 實現流程的梳理
- ExpressionBlockInputSteam readImpl()的實現
直接上代碼,看一下ExpressionBlockInputStream的讀取方法的實現
Block ExpressionBlockInputStream::readImpl()
{
Block res = children.back()->read();
if (res)
expression->execute(res);
return res;
}
這里的實現很簡單,就是不停從底層的流讀取數據Block,Block可以理解為Doris之中的Batch,相當一組數據,然后在Block之上執行表達式計算,之后返回給上節點。所以這里的重點就在於表達式計算的實現類ExpressionActions
的指針expression
,它封裝了一組表達式的Action
,在Block上依次執行這些Action
。
- Action excute的實現
Action支持多種操作,包含了:
enum Type {
ADD_COLUMN,
REMOVE_COLUMN,
COPY_COLUMN,
APPLY_FUNCTION,
ARRAY_JOIN,
JOIN,
PROJECT,
ADD_ALIASES,
};
這里我們重點關注的是函數執行的實現,可以直接定位到APPLY_FUNCTION
的代碼:
case APPLY_FUNCTION:
{
1. 從Block之中篩選出對應的參數數組
ColumnNumbers arguments(argument_names.size());
for (size_t i = 0; i < argument_names.size(); ++i)
{
arguments[i] = block.getPositionByName(argument_names[i]);
}
2.新建一個結果的列,對應函數的結果會寫入結果列,把結果列寫入的Block之中
size_t num_columns_without_result = block.columns();
block.insert({ nullptr, result_type, result_name});
3.調用對應的函數指針,執行函數調用
function->execute(block, arguments, num_columns_without_result, input_rows_count, dry_run);
這里我保留一部分關鍵的執行路徑代碼,並添加了對應的中文注釋。
選出了函數執行的參數,並添加了新的一個空列用於存儲函數abs(b)
的最終結果,新的列的偏移量就是num_columns_without_result
指定的。
接下來這里我們這里重點關注Function的execute接口的參數就可以了:
- block:實際存儲的數據
- arguments:列的參數偏移量
- num_columns_without_result:函數計算結果的寫入列
- input_rows_count: block之中的數據行數
這里本質上是調用了接口IFunction的接口,它的子類需要實現對應的excuteImpl
的方法:
class IFunction : public std::enable_shared_from_this<IFunction>,
public FunctionBuilderImpl, public IFunctionBase, public PreparedFunctionImpl
{
public:
/// TODO: make const
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override = 0;
而最終的實現是IFunction的子類:FunctionUnaryArithmetic實現了該方法,該方法的核心代碼如下:
if (auto col = checkAndGetColumn<ColumnVector<T0>>(block.getByPosition(arguments[0]).column.get()))
{
auto col_res = ColumnVector<typename Op<T0>::ResultType>::create();
auto & vec_res = col_res->getData();
vec_res.resize(col->getData().size());
UnaryOperationImpl<T0, Op<T0>>::vector(col->getData(), vec_res);
block.getByPosition(result).column = std::move(col_res);
return true;
}
這里最為核心的是,將arguments
的列作為參數列取出為變量col
, 而col_res
創建了個新的列,存放result的結果。這里最重要的方法就是
UnaryOperationImpl<T0, Op<T0>>::vector
,從名字上也能看出,它實現了函數的向量化計算,我們繼續看這部分代碼:
static void NO_INLINE vector(const ArrayA & a, ArrayC & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::apply(a[i]);
}
顯然,這就是一個完美的向量化優化代碼,沒有任何if, switch, break
的分支跳轉語句,for循環的長度也是已知的。這里的Op::apply就是咱們調用的AbsImpl::apply
函數的實現:
template <typename A>
struct AbsImpl
{
static inline NO_SANITIZE_UNDEFINED ResultType apply(A a)
{
if constexpr (IsDecimalNumber<A>)
return a < 0 ? A(-a) : a;
else if constexpr (std::is_integral_v<A> && std::is_signed_v<A>)
return a < 0 ? static_cast<ResultType>(~a) + 1 : a;
else if constexpr (std::is_integral_v<A> && std::is_unsigned_v<A>)
return static_cast<ResultType>(a);
else if constexpr (std::is_floating_point_v<A>)
return static_cast<ResultType>(std::abs(a));
}
走的這里,相當於走完了整個函數調用的流程。而其他多參數的函數的實現也是大同小異,如:
struct BinaryOperationImplBase
{
using ResultType = ResultType_;
static void NO_INLINE vector_vector(const PaddedPODArray<A> & a, const PaddedPODArray<B> & b, PaddedPODArray<ResultType> & c)
{
size_t size = a.size();
for (size_t i = 0; i < size; ++i)
c[i] = Op::template apply<ResultType>(a[i], b[i]);
}
而執行完成abs(b)
函數之后,b
列就沒有用處了,Clickhouse會調用另一個Action:REMOVE_COLUM
在Block之中刪除b
列,這樣就得到了我們所需要的兩個列a, abs(b)
組成的新的Block。
3.要點梳理
第二小節梳理完成了一整個函數調用的流程,這里重點梳理一下實現向量化函數調要點:
- ClickHouse的計算是純粹函數式編程式的計算,不會改變原先的列狀態,而是產生一組新的列。
- 各個函數的實現需要繼承IFunction的接口,實現
execute
的方法,該方法基於Block進行執行。 - 最終繼承IFunction接口的實現類都需要override的
execute
方法,並真正實現對應的函數vectoer
的調用,這里Clickhouse確保了For循環的長度是已知的,同時沒有對應跳轉語句,確保了編譯器進行向量化優化時有足夠的親和度。(這里可以打開gcc的編譯flag:-fopt-info-vec
或者clang的編譯選項:-Rpass=loop-vectorize
來查看實際源代碼的向量化情況)
4. 小結
好了,到這里也就把ClickHouse函數調用的代碼梳理完了。
除了abs函數外,其他的函數的執行也是同樣通過類似的方式依次來實現和處理的,源碼閱讀的步驟也可以參照筆者的分析流程來參考。
筆者是一個ClickHouse的初學者,對ClickHouse有興趣的同學,歡迎多多指教,交流。