How ClickHouse executes a query (including DDL and DML)
Overall process
- Start a thread to handle the incoming TCP connection from the client;
- Receive the request and hand it to executeQueryImpl(), which processes the SQL query string;
- Generate a QueryPipeline instance, which may carry the data itself or only the information about how to read the data;
- Run the QueryPipeline instance through a *PipelineExecutor, e.g. PullingAsyncPipelineExecutor, to obtain the result data.
PullingAsyncPipelineExecutor::pull() -> PipelineExecutor::execute()
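To make the flow concrete, here is a minimal sketch of how a server-side caller could drive the pipeline once executeQuery() has produced a BlockIO. It assumes a prepared ContextMutablePtr named context and simplifies away error handling; it is an illustration, not actual ClickHouse server code.

BlockIO io = executeQuery("SELECT 1", context, /* internal = */ false);

/// Pull-style execution: PullingAsyncPipelineExecutor drives PipelineExecutor with worker threads.
PullingAsyncPipelineExecutor executor(io.pipeline);
Block block;
while (executor.pull(block))    /// blocks until a result block is ready or the pipeline finishes
{
    /// consume `block` here
}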
The executeQueryImpl() flow
executeQueryImpl() is the heart of the whole flow. It performs the following steps:
- Parse the SQL statement and produce the syntax tree (AST);
- Preprocess the AST:
  - replace AST parameters with their actual values
  - substitute WITH clauses
  - run the various visitors
- Normalize the AST;
- Handle INSERT statements both with and without a SELECT clause;
- Obtain the matching interpreter via the factory method (all interpreters are listed in InterpreterFactory.cpp);
- Call the interpreter's execute() method. This method is declared by IInterpreter, the base class of all interpreters, and returns a BlockIO instance whose most important member is the QueryPipeline instance.
BlockIO is an abstraction over IO: it can produce output (SELECT-like queries) or accept input (INSERT-like queries). See the definition of IInterpreter below.
class IInterpreter
{
public:
/** For queries that return a result (SELECT and similar), sets in BlockIO a stream from which you can read this result.
* For queries that receive data (INSERT), sets a thread in BlockIO where you can write data.
* For queries that do not require data and return nothing, BlockIO will be empty.
*/
virtual BlockIO execute() = 0;
......
......
};
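Condensed, the dispatch inside executeQueryImpl() boils down to the factory call below. This is a sketch: ast, context and stage are assumed to be already prepared by the surrounding code.

/// Sketch of the interpreter dispatch in executeQueryImpl().
auto interpreter = InterpreterFactory::get(ast, context, SelectQueryOptions(stage));
BlockIO res = interpreter->execute();   /// res.pipeline is the QueryPipeline to run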
BlockIO contains the query pipeline, the process list entry and callbacks; the query pipeline is the conduit the data flows through.
- Interpreters for SELECT-like queries, such as InterpreterSelectQuery, first build a query plan and then build the query pipeline from it. PullingAsyncPipelineExecutor::pull() or PullingPipelineExecutor then pulls the data out of the QueryPipeline.
Parsing the query statement
The parseQuery() function takes the SQL string and a parser, calls parseQueryAndMovePosition(), which in turn calls tryParseQuery() to complete the parse and return the AST as the result.
The parameter allow_multi_statements controls whether multiple SQL statements are parsed; it matters a great deal for my current task.
ASTPtr parseQueryAndMovePosition(
IParser & parser,
const char * & pos,
const char * end,
const std::string & query_description,
bool allow_multi_statements,
size_t max_query_size,
size_t max_parser_depth)
{
... ...
... ...
}
The process falls roughly into two steps:
- turn the SQL string into a collection of tokens;
- let the parser walk the token collection through a TokenIterator, building up the AST as it goes (see the sketch below).
The resulting AST tree is the outcome of parsing.
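A minimal sketch of driving the parser directly (assumptions: a std::string named query, and max_query_size / max_parser_depth taken from the settings; parseQuery() is the real entry point, the surrounding setup is simplified):

const char * begin = query.data();
const char * end = begin + query.size();

ParserQuery parser(end);    /// top-level parser; it recursively calls the sub-parsers
ASTPtr ast = parseQuery(parser, begin, end, "" /* description */, max_query_size, max_parser_depth);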
Each parser represents one grammar pattern, and a parser may call several other parsers. Below are all the parsers.
^IParser$
└── IParser
└── IParserBase
├── IParserColumnDeclaration
├── IParserNameTypePair
├── ParserAdditiveExpression
├── ParserAlias
├── ParserAlterCommand
├── ParserAlterCommand
├── ParserAlterCommandList
├── ParserAlterQuery
├── ParserAlterQuery
├── ParserAlwaysFalse
├── ParserAlwaysTrue
├── ParserArray
├── ParserArrayElementExpression
├── ParserArrayJoin
├── ParserArrayOfLiterals
├── ParserAssignment
├── ParserAsterisk
├── ParserAttachAccessEntity
├── ParserBackupQuery
├── ParserBetweenExpression
├── ParserBool
...... .......
The AST consists of a set of instances of classes derived from IAST.
^IAST$
└── IAST
├── ASTAlterCommand
├── ASTAlterCommand
├── ASTAlterQuery
├── ASTArrayJoin
├── ASTAssignment
├── ASTAsterisk
├── ASTBackupQuery
├── ASTColumnDeclaration
├── ASTColumns
├── ASTColumnsElement
├── ASTColumnsMatcher
... ...
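For inspecting a tree, IAST can print itself; a small sketch (assuming ast came from parseQuery() above):

WriteBufferFromOwnString buf;
ast->dumpTree(buf);             /// indented dump of the node hierarchy
std::cout << buf.str() << std::endl;

The EXPLAIN AST statement exposes the same information from the client side.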
Building the Query Pipeline
The main component of the BlockIO instance returned by IInterpreter::execute() is the QueryPipeline instance. In that sense the interpreter builds the query pipeline, but every kind of interpreter builds it differently. SELECT-like queries (the most common kind) first generate a query plan, optimize it, and only then generate the final query pipeline.
IInterpreter::execute() is the core of an interpreter; it returns a BlockIO instance covering the three cases described below.
/** Interpreters interface for different queries.
*/
class IInterpreter
{
public:
/** For queries that return a result (SELECT and similar), sets in BlockIO a stream from which you can read this result.
* For queries that receive data (INSERT), sets a thread in BlockIO where you can write data.
* For queries that do not require data and return nothing, BlockIO will be empty.
*/
virtual BlockIO execute() = 0;
};
Building the Query Plan for SELECT-like queries
The execute() method of a SELECT-like interpreter such as InterpreterSelectQuery first generates a QueryPlan instance, and that QueryPlan instance then generates the QueryPipeline instance under the chosen optimization settings. This is also why the explain plan command only works for SELECT-like queries. Note that InterpreterSelectQuery::executeImpl() is not the implementation of InterpreterSelectQuery::execute(); it actually implements InterpreterSelectQuery::buildQueryPlan().
The following comment reflects the main logic of the code:
void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<Pipe> prepared_pipe)
{
/** Streams of data. When the query is executed in parallel, we have several data streams.
* If there is no GROUP BY, then perform all operations before ORDER BY and LIMIT in parallel, then
* if there is an ORDER BY, then glue the streams using ResizeProcessor, and then MergeSorting transforms,
* if not, then glue it using ResizeProcessor,
* then apply LIMIT.
* If there is GROUP BY, then we will perform all operations up to GROUP BY, inclusive, in parallel;
* a parallel GROUP BY will glue streams into one,
* then perform the remaining operations with one resulting stream.
*/
}
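Condensed, the SELECT path looks roughly like this (a sketch of the call sequence; interpreter and context are assumed to be prepared by the caller):

QueryPlan query_plan;
interpreter.buildQueryPlan(query_plan);        /// fills the plan, internally via executeImpl()

/// Optimize the plan, then turn it into a pipeline.
auto builder = query_plan.buildQueryPipeline(
    QueryPlanOptimizationSettings::fromContext(context),
    BuildQueryPipelineSettings::fromContext(context));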
Query Plan Step
A query plan step is a building block of the query plan, represented by the base class IQueryPlanStep and its derived classes.
A QueryPlan instance is essentially a number of IQueryPlanStep implementation instances organized as a tree (not a binary tree). Each step instance produces a group of processors and weaves them into the QueryPipeline; this is done by the updatePipeline() method.
The following comment summarizes the idea.
/// Add processors from current step to QueryPipeline.
/// Calling this method, we assume and don't check that:
/// * pipelines.size() == getInputStreams.size()
/// * header from each pipeline is the same as header from corresponding input_streams
/// Result pipeline must contain any number of streams with compatible output header if hasOutputStream(),
/// or pipeline should be completed otherwise.
virtual QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) = 0;
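Steps are appended to the plan one after another, each consuming the output stream of the plan built so far. A sketch of appending a step, roughly as InterpreterSelectQuery does for each clause (actions_dag is an assumed ActionsDAGPtr for the expression):

auto expression_step = std::make_unique<ExpressionStep>(
    query_plan.getCurrentDataStream(),   /// output stream of the plan built so far
    actions_dag);
expression_step->setStepDescription("Projection");
query_plan.addStep(std::move(expression_step));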
Experiment: pipeline generation for SELECT queries
The experiments use the following table:
┌─statement───────────────────────────┐
│ CREATE TABLE default.cx1
(
`eventId` Int64,
`案例號` String,
`金額` UInt8
)
ENGINE = MergeTree
ORDER BY (`案例號`, eventId)
SETTINGS index_granularity = 8192 │
└────────────────────────────────────┘
The simplest SELECT
explain pipeline select * from cx1
┌─explain───────────────────────┐
│ (Expression)                  │ # name of the query step
│   ExpressionTransform × 4     │ # 4 ExpressionTransform processors
│ (SettingQuotaAndLimits)       │ # name of the query step
│   (ReadFromMergeTree)         │
│   MergeTreeThread × 4 0 → 1   │ # MergeTreeThread has 0 input streams and 1 output stream
└───────────────────────────────┘
SELECT with a filter, GROUP BY and LIMIT
explain pipeline header=1 select `案例號`, eventId from cx1 where eventId % 10 > 3 group by `案例號`, eventId limit 100
┌─explain─────────────────────────────────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ (Limit) │
│ Limit │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ (Aggregating) │
│     Resize 4 → 1                  # 4 input streams merged into 1 output stream │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ AggregatingTransform × 4 │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ StrictResize 4 → 4 │
│ Header × 4 : eventId Int64: eventId Int64 Int64(size = 0) │
│ 案例號 String: 案例號 String String(size = 0) │
│ (Expression) │
│ ExpressionTransform × 4 │
│ Header: eventId Int64: eventId Int64 Int64(size = 0) │
│ 案例號 String: 案例號 String String(size = 0) │
│ (SettingQuotaAndLimits) │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 4 0 → 1 │
│ Header: eventId Int64: eventId Int64 Int64(size = 0) │
│ 案例號 String: 案例號 String String(size = 0) │
└─────────────────────────────────────────────────────────────────────┘
SELECT with a filter, GROUP BY, an aggregate function and LIMIT
explain pipeline header=1 select `案例號`, eventId, avg(`金額`) from cx1 where eventId % 10 > 3 group by `案例號`, eventId limit 100
┌─explain──────────────────────────────────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ avg(金額) Float64: avg(金額) Float64 Float64(size = 0) │
│ (Limit) │
│ Limit │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ avg(金額) Float64: avg(金額) Float64 Float64(size = 0) │
│ (Aggregating) │
│ Resize 4 → 1 │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ avg(金額) Float64: avg(金額) Float64 Float64(size = 0) │
│ AggregatingTransform × 4 │
│ Header: 案例號 String: 案例號 String String(size = 0) │
│ eventId Int64: eventId Int64 Int64(size = 0) │
│ avg(金額) Float64: avg(金額) Float64 Float64(size = 0) │
│ StrictResize 4 → 4 │
│ Header × 4 : eventId Int64: eventId Int64 Int64(size = 0) │
│ 案例號 String: 案例號 String String(size = 0) │
│ 金額 UInt8: 金額 UInt8 UInt8(size = 0) │
│ (Expression) │
│ ExpressionTransform × 4 │
│ Header: eventId Int64: eventId Int64 Int64(size = 0) │
│ 案例號 String: 案例號 String String(size = 0) │
│ 金額 UInt8: 金額 UInt8 UInt8(size = 0) │
│ (SettingQuotaAndLimits) │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 4 0 → 1 │
│ Header: eventId Int64: eventId Int64 Int64(size = 0) │
│ 案例號 String: 案例號 String String(size = 0) │
│ 金額 UInt8: 金額 UInt8 UInt8(size = 0) │
└──────────────────────────────────────────────────────────────────────┘
Dry run during Query Pipeline construction
While the pipeline is being built, the executeActionForHeader() function obtains the header (the structure of the result) without producing any data: it executes the actions in dry-run mode.
Executing the Query Pipeline
The classes that execute a query pipeline are PullingPipelineExecutor, PullingAsyncPipelineExecutor, PushingPipelineExecutor and PushingAsyncPipelineExecutor. The non-Async ones are the single-threaded versions; the Async ones are the multi-threaded, parallel versions. Despite the "Async" in its name, PullingAsyncPipelineExecutor returns only after all worker threads have finished, so it is not asynchronous as I would understand it.
The basic unit of a query pipeline is the processor, and the class that actually executes processors is PipelineExecutor, which all the executors above call into. The QueryPipeline class implements the query pipeline; its members relevant to execution are shown below:
class QueryPipeline
{
...
...
private:
    PipelineResourcesHolder resources;
    Processors processors;                        // all processors to execute
    InputPort * input = nullptr;                  // input port
    OutputPort * output = nullptr;                // output port
    OutputPort * totals = nullptr;
    OutputPort * extremes = nullptr;
    QueryStatus * process_list_element = nullptr; // oddly named; holds the query's runtime status
    IOutputFormat * output_format = nullptr;      // final output
    size_t num_threads = 0;                       // number of threads
};
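For the INSERT direction the pattern is symmetric. A sketch with PushingPipelineExecutor, assuming io came from an INSERT-like interpreter (so the pipeline is in pushing mode) and block holds the data to write:

PushingPipelineExecutor executor(io.pipeline);
executor.start();
executor.push(std::move(block));   /// write one block of data into the pipeline
executor.finish();                 /// flush and finalize the sinks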
The implementation classes of IProcessor are tasks that can be executed directly.
QueryPipeline::complete() sets the final output for a completed pipeline; IOutputFormat is itself a derived class of IProcessor.
void QueryPipeline::complete(std::shared_ptr<IOutputFormat> format)
{
    /// body omitted
}
Chunk
/**
* Chunk is a list of columns with the same length.
* Chunk stores the number of rows in a separate field and supports invariant of equal column length.
*
* Chunk has move-only semantic. It's more lightweight than block cause doesn't store names, types and index_by_name.
*
* Chunk can have empty set of columns but non-zero number of rows. It helps when only the number of rows is needed.
* Chunk can have columns with zero number of rows. It may happen, for example, if all rows were filtered.
* Chunk is empty only if it has zero rows and empty list of columns.
*
* Any ChunkInfo may be attached to chunk.
* It may be useful if additional info per chunk is needed. For example, bucket number for aggregated data.
**/
Block
/** Container for set of columns for bunch of rows in memory.
* This is unit of data processing.
* Also contains metadata - data types of columns and their names
* (either original names from a table, or generated names during temporary calculations).
* Allows to insert, remove columns in arbitrary position, to change order of columns.
*/
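Inside the pipeline, processors pass Chunks around; a Block is reconstructed only where names and types are needed, by combining a Chunk with a header. A sketch of the two conversions (header is assumed to be the Block describing the stream):

/// Block -> Chunk: strip names/types, keep the columns plus the row count.
Chunk chunk(block.getColumns(), block.rows());

/// Chunk -> Block: re-attach the header's names and types to the columns.
Block restored = header.cloneWithColumns(chunk.detachColumns());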
Processors
The components that actually execute the query pipeline are the large and rich family of processors, the basic building blocks of low-level execution; a minimal example follows the class trees below.
^IProcessor$
└── IProcessor
├── AggregatingInOrderTransform
├── AggregatingTransform
├── ConcatProcessor
├── ConvertingAggregatedToChunksTransform
├── CopyTransform
├── CopyingDataToViewsTransform
├── DelayedPortsProcessor
├── DelayedSource
├── FillingRightJoinSideTransform
├── FinalizingViewsTransform
├── ForkProcessor
├── GroupingAggregatedTransform
├── IInflatingTransform
├── IntersectOrExceptTransform
├── JoiningTransform
├── LimitTransform
├── OffsetTransform
├── ResizeProcessor
├── SortingAggregatedTransform
├── StrictResizeProcessor
├── WindowTransform
├── IAccumulatingTransform
│ ├── BufferingToFileTransform
│ ├── CreatingSetsTransform
│ ├── CubeTransform
│ ├── MergingAggregatedTransform
│ ├── QueueBuffer
│ ├── RollupTransform
│ ├── TTLCalcTransform
│ └── TTLTransform
├── ISimpleTransform
│ ├── AddingDefaultsTransform
│ ├── AddingSelectorTransform
│ ├── ArrayJoinTransform
│ ├── CheckSortedTransform
│ ├── DistinctSortedTransform
│ ├── DistinctTransform
│ ├── ExpressionTransform
│ ├── ExtremesTransform
│ ├── FillingTransform
│ ├── FilterTransform
│ ├── FinalizeAggregatedTransform
│ ├── LimitByTransform
│ ├── LimitsCheckingTransform
│ ├── MaterializingTransform
│ ├── MergingAggregatedBucketTransform
│ ├── PartialSortingTransform
│ ├── ReplacingWindowColumnTransform
│ ├── ReverseTransform
│ ├── SendingChunkHeaderTransform
│ ├── TotalsHavingTransform
│ ├── TransformWithAdditionalColumns
│ └── WatermarkTransform
├── ISink
│ ├── EmptySink
│ ├── ExternalTableDataSink
│ ├── NullSink
│ └── ODBCSink
├── SortingTransform
│ ├── FinishSortingTransform
│ └── MergeSortingTransform
├── IMergingTransformBase
│ └── IMergingTransform
│ ├── AggregatingSortedTransform
│ ├── CollapsingSortedTransform
│ ├── ColumnGathererTransform
│ ├── FinishAggregatingInOrderTransform
│ ├── GraphiteRollupSortedTransform
│ ├── MergingSortedTransform
│ ├── ReplacingSortedTransform
│ ├── SummingSortedTransform
│ └── VersionedCollapsingTransform
├── ExceptionKeepingTransform
│ ├── CheckConstraintsTransform
│ ├── ConvertingTransform
│ ├── CountingTransform
│ ├── ExecutingInnerQueryFromViewTransform
│ ├── SquashingChunksTransform
│ └── SinkToStorage
│ ├── BufferSink
│ ├── DistributedSink
│ ├── EmbeddedRocksDBSink
│ ├── HDFSSink
│ ├── KafkaSink
│ ├── LiveViewSink
│ ├── LogSink
│ ├── MemorySink
│ ├── MergeTreeSink
│ ├── NullSinkToStorage
│ ├── PostgreSQLSink
│ ├── PushingToLiveViewSink
│ ├── PushingToWindowViewSink
│ ├── RabbitMQSink
│ ├── RemoteSink
│ ├── ReplicatedMergeTreeSink
│ ├── SQLiteSink
│ ├── SetOrJoinSink
│ ├── StorageFileSink
│ ├── StorageMySQLSink
│ ├── StorageS3Sink
│ ├── StorageURLSink
│ ├── StripeLogSink
│ └── PartitionedSink
│ ├── PartitionedHDFSSink
│ ├── PartitionedStorageFileSink
│ ├── PartitionedStorageS3Sink
│ └── PartitionedStorageURLSink
├── IOutputFormat
│ ├── ArrowBlockOutputFormat
│ ├── LazyOutputFormat
│ ├── MySQLOutputFormat
│ ├── NativeOutputFormat
│ ├── NullOutputFormat
│ ├── ODBCDriver2BlockOutputFormat
│ ├── ORCBlockOutputFormat
│ ├── ParallelFormattingOutputFormat
│ ├── ParquetBlockOutputFormat
│ ├── PostgreSQLOutputFormat
│ ├── PullingOutputFormat
│ ├── TemplateBlockOutputFormat
│ ├── PrettyBlockOutputFormat
│ │ ├── PrettyCompactBlockOutputFormat
│ │ └── PrettySpaceBlockOutputFormat
│ └── IRowOutputFormat
│ ├── AvroRowOutputFormat
│ ├── BinaryRowOutputFormat
│ ├── CSVRowOutputFormat
│ ├── CapnProtoRowOutputFormat
│ ├── CustomSeparatedRowOutputFormat
│ ├── JSONCompactEachRowRowOutputFormat
│ ├── MarkdownRowOutputFormat
│ ├── MsgPackRowOutputFormat
│ ├── ProtobufRowOutputFormat
│ ├── RawBLOBRowOutputFormat
│ ├── ValuesRowOutputFormat
│ ├── VerticalRowOutputFormat
│ ├── XMLRowOutputFormat
│ ├── JSONEachRowRowOutputFormat
│ │ └── JSONEachRowWithProgressRowOutputFormat
│ ├── JSONRowOutputFormat
│ │ └── JSONCompactRowOutputFormat
│ └── TabSeparatedRowOutputFormat
│ └── TSKVRowOutputFormat
└── ISource
├── ConvertingAggregatedToChunksSource
├── MergeSorterSource
├── NullSource
├── ODBCSource
├── PushingAsyncSource
├── PushingSource
├── RemoteExtremesSource
├── RemoteTotalsSource
├── SourceFromNativeStream
├── TemporaryFileLazySource
├── WaitForAsyncInsertSource
├── IInputFormat
│ ├── ArrowBlockInputFormat
│ ├── NativeInputFormat
│ ├── ORCBlockInputFormat
│ ├── ParallelParsingInputFormat
│ ├── ParquetBlockInputFormat
│ ├── ValuesBlockInputFormat
│ └── IRowInputFormat
│ ├── AvroConfluentRowInputFormat
│ ├── AvroRowInputFormat
│ ├── CapnProtoRowInputFormat
│ ├── JSONAsStringRowInputFormat
│ ├── JSONEachRowRowInputFormat
│ ├── LineAsStringRowInputFormat
│ ├── MsgPackRowInputFormat
│ ├── ProtobufRowInputFormat
│ ├── RawBLOBRowInputFormat
│ ├── RegexpRowInputFormat
│ ├── TSKVRowInputFormat
│ └── RowInputFormatWithDiagnosticInfo
│ ├── TemplateRowInputFormat
│ └── RowInputFormatWithNamesAndTypes
│ ├── BinaryRowInputFormat
│ ├── CSVRowInputFormat
│ ├── CustomSeparatedRowInputFormat
│ ├── JSONCompactEachRowRowInputFormat
│ └── TabSeparatedRowInputFormat
└── ISourceWithProgress
└── SourceWithProgress
├── BlocksListSource
├── BlocksSource
├── BufferSource
├── CassandraSource
├── ColumnsSource
├── DDLQueryStatusSource
├── DataSkippingIndicesSource
├── DictionarySource
├── DirectoryMonitorSource
├── EmbeddedRocksDBSource
├── FileLogSource
├── GenerateSource
├── HDFSSource
├── JoinSource
├── KafkaSource
├── LiveViewEventsSource
├── LiveViewSource
├── LogSource
├── MemorySource
├── MergeTreeSequentialSource
├── MongoDBSource
├── NumbersMultiThreadedSource
├── NumbersSource
├── RabbitMQSource
├── RedisSource
├── RemoteSource
├── SQLiteSource
├── ShellCommandSource
├── SourceFromSingleChunk
├── StorageFileSource
├── StorageInputSource
├── StorageS3Source
├── StorageURLSource
├── StripeLogSource
├── SyncKillQuerySource
├── TablesBlockSource
├── WindowViewSource
├── ZerosSource
├── MySQLSource
│ └── MySQLWithFailoverSource
├── PostgreSQLSource
│ └── PostgreSQLTransactionSource
└── MergeTreeBaseSelectProcessor
├── MergeTreeThreadSelectProcessor
└── MergeTreeSelectProcessor
├── MergeTreeInOrderSelectProcessor
└── MergeTreeReverseSelectProcessor
The most important of these classes are:
^IProcessor$
└── IProcessor
├── IAccumulatingTransform
├── IMergingTransformBase
├── IOutputFormat
├── ISimpleTransform
├── ISink
├── ISource
├── JoiningTransform
├── LimitTransform
├── OffsetTransform
├── ResizeProcessor
├── SortingAggregatedTransform
├── SortingTransform
└── WindowTransform
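Of these, ISimpleTransform is the easiest to illustrate: it implements the prepare()/work() state machine of IProcessor for the one-input, one-output case, leaving only transform() to override. A hypothetical pass-through transform as a sketch (not a real ClickHouse class):

class PassThroughTransform : public ISimpleTransform
{
public:
    explicit PassThroughTransform(const Block & header)
        : ISimpleTransform(header, header, /* skip_empty_chunks = */ false)
    {
    }

    String getName() const override { return "PassThroughTransform"; }

protected:
    void transform(Chunk & chunk) override
    {
        /// Inspect or rewrite the chunk's columns here; this version forwards it unchanged.
    }
};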
Invoking SQL queries directly
An interpreter can invoke a SQL query directly, as in this example:
BlockIO InterpreterShowProcesslistQuery::execute()
{
return executeQuery("SELECT * FROM system.processes", getContext(), true);
}