/** Processor is an element (low level building block) of a query execution pipeline.
* It has zero or more input ports and zero or more output ports.
*Processor是查詢處理管道的一個元素。它有零個或多個輸入端口和零個或多個輸出端口。
* Blocks of data are transferred over ports.
* Each port has fixed structure: names and types of columns and values of constants.
*Block通過端口傳輸,每個端口都有固定的結構,列的名稱和類型以及常量值。
* Processors may pull data from input ports, do some processing and push data to output ports.
* Processor may indicate that it requires input data to proceed and indicate that it needs data from some ports.
*Processor從輸入端口提取數據,進行一些處理,並將輸入推送到輸出端口。
Process可能指示它需要輸入數據才能繼續,並指示它需要來自某些端口的數據。
* Synchronous work must only use CPU - don't do any sleep, IO wait, network wait.
*同步工作必須只使用CPU-不做任何Sleep,IO等待和網絡等待
* Processor may want to do work asynchronously (example: fetch data from remote server)
* - in this case it will initiate background job and allow to subscribe to it.
*Processor可能想異步工作(比如:從遠端服務器獲取數據),在這種情況下,它將啟動后台作業並允許訂閱它。
* Processor may throw an exception to indicate some runtime error.
*運行錯誤時,Processor可能會拋出異常。
* Different ports may have different structure. For example, ports may correspond to different resultsets
* or semantically different parts of result.
*不同的端口可能有不同的數據結構。例如,端口可能對應結果的不同結果集或語義上的不同部分。
* Processor may modify its ports (create another processors and connect to them) on the fly.
* Example: first execute the subquery; on basis of subquery result
* determine how to execute the rest of query and build the corresponding pipeline.
*Processor可以修改它的端口(創建另外一個Processor並連接到它)。比如:首先執行子查詢,根據子查詢的結果來決定如何執行剩余的查詢,並構建相應的管道。
* Processor may simply wait for another processor to execute without transferring any data from it.
* For this purpose it should connect its input port to another processor, and indicate need of data.
*Processor可以簡單的等待另外一個Processor執行而不需要從它傳輸任何數據。為此,它應該將其輸入端口連接到另一個處理器,並指示需要的數據。
* Examples:
*例如
* Source. Has no input ports and single output port. Generates data itself and pushes it to its output port.
*來源沒有輸入端口,只要一個輸出接口。自動生成數據並將數據推送到輸出端口。
* Sink. Has single input port and no output ports. Consumes data that was passed to its input port.
*消費者,只有一個輸入端口,沒有輸出端口,消費接收到的數據。
* Empty source. Immediately says that data on its output port is finished.
空來源,表示輸出端口的數據已經完成。
* Null sink. Consumes data and does nothing.
*空消費者,消費數據,並且不做任何處理。
* Simple transformation. Has single input and single output port. Pulls data, transforms it and pushes to output port.
簡單轉換,具有單輸入端口和單輸出端口。拉取數據,轉換並推送到輸出端口。
* Example: expression calculator.比如:表達式計算器
* TODO Better to make each function a separate processor. It's better for pipeline analysis. Also keep in mind 'sleep' and 'rand' functions.
*最好讓每個函數作為一個單獨的Processor,更適用於管道分析,還要記住睡眠和隨機功能。
* Squashing or filtering transformation. Pulls data, possibly accumulates it, and sometimes pushes it to output port.
* Examples: DISTINCT, WHERE, squashing of blocks for INSERT SELECT.
*壓縮或過濾轉換,拉取數據,可能會累積數據,有時會推送到輸出端口。比如:DISTINCT,WHERE,INSERT SELECT的塊積壓
* Accumulating transformation. Pulls and accumulates all data from input until it it exhausted, then pushes data to output port.
* Examples: ORDER BY, GROUP BY.
*累積轉換,從輸入中提取並累積所有數據直到,直到其耗盡,然后將數據推送到輸出端口。比如:ORDER BY,GROUP BY
* Limiting transformation. Pulls data from input and passes to output.
* When there was enough data, says that it doesn't need data on its input and that data on its output port is finished.
*限制轉換。從輸入提取數據並推送到輸出。當有足夠的數據時,輸入不再需要數據,輸出端口上的數據就完成了。
* Resize. Has arbitary number of inputs and arbitary number of outputs.調整大小,具有任意數量的輸入和任意數量的輸出。
* Pulls data from whatever ready input and pushes it to randomly choosed free output.從任何准備好的輸入中提取數據,並推送到任意輸出。
* Examples:比如:
* Union - merge data from number of inputs to one output in arbitary order.Union-以任意順序將多個輸入的數據合並到一個輸出。
* Split - read data from one input and pass it to arbitary output.
*Split-從一個輸入讀取數據並將其傳遞到任意輸出。
* Concat. Has many inputs and only one output. Pulls all data from first input until it is exhausted,
* then all data from second input, etc. and pushes all data to output.
*Concat-有很多輸入只有一個輸出。從第一個輸入提取數據直到它耗盡,然后第二個,依次類推,將所有數據推送到輸出。
* Ordered merge. Has many inputs but only one output. Pulls data from selected input in specific order, merges and pushes it to output.
*有序合並。有很多個輸入但只有一個輸出。按特定的順序從指定的輸入中提取數據,合並並將其推送到輸出。
* Fork. Has one input and many outputs. Pulls data from input and copies it to all outputs.
* Used to process multiple queries with common source of data.
*Fork。有一個輸入和多個輸出。從輸入中提取數據,並將其復制到所有輸出。
* Select. Has one or multiple inputs and one output.Select。有一個或多個輸入,一個輸出。
* Read blocks from inputs and check that blocks on inputs are "parallel": correspond to each other in number of rows.
* Construct a new block by selecting some subset (or all) of columns from inputs.
* Example: collect columns - function arguments before function execution.
*從輸入中讀取數據塊並檢查輸入的數據塊是否並行:在行數上相互對應。通過從輸入中選擇一些列子集或全部來構建新的數據塊。
*比如:在函數執行之前收集列--函數參數。
* TODO Processors may carry algebraic properties about transformations they do.
* For example, that processor doesn't change number of rows; doesn't change order of rows, doesn't change the set of rows, etc.
*Processor可能具有它們所做的變換的代數性質。比如,Processor不會更改行數,行的順序和行的集合等等。
* TODO Ports may carry algebraic properties about streams of data.
* For example, that data comes ordered by specific key; or grouped by specific key; or have unique values of specific key.
* And also simple properties, including lower and upper bound on number of rows.
*端口可以攜帶關於數據流的代數屬性。比如,該數據按特定鍵排序或按特定鍵分組,或具有特定鍵的唯一值。
* TODO Processor should have declarative representation, that is able to be serialized and parsed.
* Example: read_from_merge_tree(database, table, Columns(a, b, c), Piece(0, 10), Parts(Part('name', MarkRanges(MarkRange(0, 100), ...)), ...))
* It's reasonable to have an intermediate language for declaration of pipelines.
*Processor應該具有聲明性表示,它能夠被序列化和解析。
* TODO Processor with all its parameters should represent "pure" function on streams of data from its input ports.
* It's in question, what kind of "pure" function do we mean.
* For example, data streams are considered equal up to order unless ordering properties are stated explicitly.
* Another example: we should support the notion of "arbitary N-th of M substream" of full stream of data.
*Processor及其所有參數都應該代表來自其輸入端口的數據流上的純函數。問題是,什么樣的函數是純函數。
比如,除非顯示地聲明了排序屬性,否則數據流默認按升序排列。
enum class Status
{
/// Processor needs some data at its inputs to proceed.Processor在輸入端需要一些數據才能繼續
/// You need to run another processor to generate required input and then call 'prepare' again.您需要運行另外一個Processor來生成所需的輸入,然后再次調用Prepare。
NeedData,
/// Processor cannot proceed because output port is full or not isNeeded().Processor不能繼續,因為輸出端口已滿或者不再需要。
/// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.您需要將數據從輸出端口傳輸到另一個Processor的輸入端口,然后再次調用Prepare。
PortFull,
/// All work is done (all data is processed or all output are closed), nothing more to do.所有的工作都完成了(所有的數據都被處理了,所有的輸出都以關閉)什么都不做了
Finished,
/// No one needs data on output ports.沒有Processor需要輸出端口上的數據
/// Unneeded,
/// You may call 'work' method and processor will do some work synchronously.您可以調用work方法,Processor將同步執行某些工作。
Ready,
/// You may call 'schedule' method and processor will initiate some background work.您可以調用schedule方法,Processor將啟動一些后台工作。
Async,
/// Processor is doing some work in background.Processor正在后台做一些工作
/// You may wait for next event or do something else and then you should call 'prepare' again.您可以等待下一個活動或做其他的事情,然后您應該再次調用准備。
Wait,
/// Processor wants to add other processors to pipeline.Processor希望將其他Processor添加到管道中
/// New processors must be obtained by expandPipeline() call.必須通過 expandPipeline()調用獲取新的Processor。
ExpandPipeline,
};
/** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
*prepare方法負責所有低成本的計算(比如O(1)數據量,不需要等待的)
* It may access input and output ports,它可以訪問輸入和輸出端口
* indicate the need for work by another processor by returning NeedData or PortFull,通過返回NeedData 或PortFull來指示另一個Processor的需要
* or indicate the absence of work by returning Finished or Unneeded,或者通過返回Finished 或Unneeded來表示沒有任務
* it may pull data from input ports and push data to output ports.它可能從輸入端口提取數據並將數據推送到輸出端口。
*
* The method is not thread-safe and must be called from a single thread in one moment of time,
* even for different connected processors.
*該方法不是線程安全的,單個線程調用必須瞬時返回,即使對於不同連接的Processor也是如此。
* Instead of all long work (CPU calculations or waiting) it should just prepare all required data and return Ready or Async.
*它應該只准備所需要的數據並返回Ready或Async,而不是進行長時間的工作(CPU計算或等待)
* Thread safety and parallel execution:線程安全和並行執行
* - no methods (prepare, work, schedule) of single object can be executed in parallel;單個對象的方法不能並行執行(prepare,work,schedule)
* - method 'work' can be executed in parallel for different objects, even for connected processors;work方法可以針對不同的對象並行執行,即使對於連接的Processor也是如此
* - method 'prepare' cannot be executed in parallel even for different objects,prepare方法不能並行執行,即使對於不同的對象也是如此
* if they are connected (including indirectly) to each other by their ports;如果它們通過端口互相連接
*/
virtual Status prepare()
{
throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
}
Clickhouse中Chunk的概念
/**
* Chunk is a list of columns with the same length.Chunk是具有相同長度的列的集合
* Chunk stores the number of rows in a separate field and supports invariant of equal column length.
*Chunk將行數存儲在一個單獨的字段中,並支持等列長度的不變量
* Chunk has move-only semantic. It's more lightweight than block cause doesn't store names, types and index_by_name.
*Chunk只有移動語義。它比block更加輕量級,因為它不存儲名稱,類型和索引
* Chunk can have empty set of columns but non-zero number of rows. It helps when only the number of rows is needed.
* Chunk can have columns with zero number of rows. It may happen, for example, if all rows were filtered.
* Chunk is empty only if it has zero rows and empty list of columns.
*Chunk可以有空的列集合,但行數不是零。當只需要行數時,它會有所幫助。
Chunk可以有行數為0的列。例如,所有行都被篩選就會發生這種情況
Chunk只有在行數為0,且列為空時,才表示空
* Any ChunkInfo may be attached to chunk.任何ChunkInfo都可以附加到trunk
* It may be useful if additional info per chunk is needed. For example, bucket number for aggregated data.
**/如果需要每個塊的附加信息,它可能會很有用。例如聚合數據的存儲桶編號。