ClickHouse源碼筆記6:探究列式存儲系統的排序


分析完成了聚合以及向量化過濾,向量化的函數計算之后。本篇,筆者將分析數據庫的一個重要算子:排序。讓我們從源碼的角度來剖析ClickHouse作為列式存儲系統是如何實現排序的。

本系列文章的源碼分析基於ClickHouse v19.16.2.2的版本。

1.執行計划

老規矩,咱們還是先從一個簡單的查詢出發,通過一步步的通過執行計划按圖索驥ClickHouse的執行邏輯。

select * from test order by k1;

咱們先嘗試打開ClickHouse的Debug日志看一下具體的執行的pipeline
image.png

這里分為了5個流,而咱們所需要關注的流已經呼之欲出了MergeSortingPartialSorting,ClickHouse先從存儲引擎的數據讀取數據,並且執行函數運算,並對數據先進行部分的排序,然后對於已經有序的數據在進行MergeSort,得出最終有序的數據。

2. 實現流程的梳理

那咱們接下來要梳理的代碼也很明確了,就是PartialSortingBlockInputStreamMergingSortedBlockInputStream

  • PartialSortingBlockInputStream的實現
    PartialSortingBlockInputStream的實現很簡單,咱們直接看代碼吧:
Block PartialSortingBlockInputStream::readImpl()
{
    Block res = children.back()->read();
    sortBlock(res, description, limit);
    return res;
}

它從底層的流讀取數據Block,Block可以理解為Doris之中的Batch,相當一批行的數據,然后根據自身的成員變量SortDescription來對單個Block進行排序,並根據limit進行長度截斷。

SortDescription是一個vector,每個成員描述了單個排序列的排序規則。比如
: null值的排序規則,是否進行逆序排序等。

/// Description of the sorting rule for several columns.
using SortDescription = std::vector<SortColumnDescription>;
  • sortBlock的函數實現

接下來,我們來看看sortBlock函數的實現,看看列式的執行系統是如何利用上述信息進行數據排序的。

void sortBlock(Block & block, const SortDescription & description, UInt64 limit)
{
    /// If only one column to sort by
    if (description.size() == 1)
    {
        bool reverse = description[0].direction == -1;

        const IColumn * column = !description[0].column_name.empty()
            ? block.getByName(description[0].column_name).column.get()
            : block.safeGetByPosition(description[0].column_number).column.get();

        IColumn::Permutation perm;
        if (needCollation(column, description[0]))
        {
            const ColumnString & column_string = typeid_cast<const ColumnString &>(*column);
            column_string.getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
        }
        else
            column->getPermutation(reverse, limit, description[0].nulls_direction, perm);

        size_t columns = block.columns();
        for (size_t i = 0; i < columns; ++i)
            block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);
    }

這里需要分為兩種情況討論:1. 單列排序。2.多列排序。多列排序與單列的實現大同小異,所以我們先從單列排序的代碼開始庖丁解牛。它的核心代碼就是下面的這四行:

    column->getPermutation(reverse, limit, description[0].nulls_direction, perm);
    size_t columns = block.columns();
    for (size_t i = 0; i < columns; ++i)
           block.getByPosition(i).column = block.getByPosition(i).column->permute(perm, limit);

先通過單列排序,拿到每一列在排序之后的IColumn::Permutation perm;。然后Block之中的每一列都利用這個perm, 生成一個新的排序列,替換舊的列之后,就完成Block的排序了。
生成Perm

如上圖所示,Permutation是一個長度為limitPodArray, 它標識了根據排序列排序之后的排序位置。后續就按照這個perm規則利用函數permute生成新的列,就是排序已經完成的列了。

ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation & perm, size_t limit) const
{
    typename Self::Container & res_data = res->getData();
    for (size_t i = 0; i < limit; ++i)
        res_data[i] = data[perm[i]];

    return res;
}

這里細心的朋友會發現,String列在sortBlock函數之中做了一些額外的判斷

  if (needCollation(column, description[0])) {
            const ColumnString & column_string = typeid_cast<const ColumnString &>(*column);
            column_string.getPermutationWithCollation(*description[0].collator, reverse, limit, perm);
 }

這部分是一個特殊的字符串生成perm的邏輯,ClickHouse支持用不同的編碼進行字符串列的排序。比如通過GBK編碼進行排序的話,那么中文的排序順序將是基於拼音順序的。

  • getPermutation的實現
    所以,在ClickHouse的排序過程之中。getPermutation是整個排序算子實現的重中之重, 它是Column類的一個虛函數,也就是說每一個不同的數據類型的列都可以實現自己的排序邏輯。我們通過ColumnVector的實現,來管中規豹一把。
template <typename T>
void ColumnVector<T>::getPermutation(bool reverse, size_t limit, int nan_direction_hint, IColumn::Permutation & res) const
{
        if (reverse)
            std::partial_sort(res.begin(), res.begin() + limit, res.end(), greater(*this, nan_direction_hint));
        else
            std::partial_sort(res.begin(), res.begin() + limit, res.end(), less(*this, nan_direction_hint));
    }
    else
    {
        /// A case for radix sort
        if constexpr (std::is_arithmetic_v<T> && !std::is_same_v<T, UInt128>)
        {
                return;
            }
        }

        /// Default sorting algorithm.
        for (size_t i = 0; i < s; ++i)
            res[i] = i;

       pdqsort(res.begin(), res.end(), less(*this, nan_direction_hint));
    }
}

這部分代碼較多,筆者簡化了一下這部分的邏輯。

  • 如果存在limit條件,並且列的長度大於limit,采用std::partial_sort進行perm的排序。
  • 如果為數字類型,並且不為UInt128類型時,則采用Radix Sort計數排序來對perm進行排序。
  • 如不滿足前二者的條件,則使用快速排序作為最終的默認實現。

好的,看到這里。已經完整的梳理了PartialSortingBlockInputStream,得到了每一個輸出的Block已經按照我們的排序規則進行排序了。接下來就要請出MergeSortingBlockInputStream來進行最終的排序工作。

  • MergeSortingBlockInputStream的實現
    從名字上也能看出來,這里需要完成一次歸並排序,來得到最終有序的排序結果。至於排序的對象,自然上面通過PartialSortingBlockInputStream輸出的Block了。

直接定位到readImpl()的實現,ClickHouse這里實現了Spill to disk的外部排序邏輯,這里為了簡化,筆者先暫時拿掉這部分外部排序的邏輯。

Block MergeSortingBlockInputStream::readImpl()
{
    /** Algorithm:
      * - read to memory blocks from source stream;
      */

    /// If has not read source blocks.
    if (!impl)
    {
        while (Block block = children.back()->read())
        {
            blocks.push_back(block);
            sum_rows_in_blocks += block.rows();
            sum_bytes_in_blocks += block.allocatedBytes();

            /** If significant amount of data was accumulated, perform preliminary merging step.
              */
            if (blocks.size() > 1
                && limit
                && limit * 2 < sum_rows_in_blocks   /// 2 is just a guess.
                && remerge_is_useful
                && max_bytes_before_remerge
                && sum_bytes_in_blocks > max_bytes_before_remerge)
            {
                remerge();
            }

        if ((blocks.empty() && temporary_files.empty()) || isCancelledOrThrowIfKilled())
            return Block();

        if (temporary_files.empty())
        {
            impl = std::make_unique<MergeSortingBlocksBlockInputStream>(blocks, description, max_merged_block_size, limit);
        }
       
    Block res = impl->read();
    return res;
}

由上面代碼可以看到,MergeSortingBlockInputStream這部分就是不斷從底層的PartialSortingBlockInputStream讀取出來,並存儲全部存儲下來。最終讀取完成之后,利用MergeSortingBlocksBlockInputStream類,完成所有Blocks的歸並排序工作。而MergeSortingBlocksBlockInputStream類就是簡單完成利用堆進行多路歸並排序的過程代碼,筆者在這里就不再展開了,感興趣的同學可以自行參考MergeSortingBlockInputStream.cpp部分的實現。

3.要點梳理

第二小節梳理完ClickHouse的排序算子的實現流程,這里進行一些簡單的要點小結:

  1. ClickHouse的排序實現需要利用排序列生成對應的perm,最終利用perm完成每一個Block的排序。

  2. 所以每一個不同數據類型的列,都需要實現getPermutationpermute來實現排序。並且可以根據數據類型,選擇不同的排序實現。比如radix sort的時間復雜度為O(n),相對快速排序的時間復雜度就存在了明顯的優勢。

  3. 排序算法存在大量的數據依賴,所以是很難發揮SIMD的優勢的。只有在radix sort下才些微有些部分可以向量化,所以相對於非向量化的實現,不存在太多性能上的優勢。

4. 小結

OK,到此為止,咱們可以從Clickhouse的源碼實現之中梳理完成列式的存儲系統是如何實現排序的。
當然,這部分跳過了一部分重要的實現:Spill to disk。這個是確保在一定的內存限制之下,對海量數據進行排序時,可以利用磁盤來緩存排序的中間結果。這部分的實現也很有意思,感興趣的朋友,可以進一步展開來看這部分的實現。
筆者是一個ClickHouse的初學者,對ClickHouse有興趣的同學,歡迎多多指教,交流。

5. 參考資料

官方文檔
ClickHouse源代碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM