Clickhouse Processor实现机制


/** Processor is an element (low level building block) of a query execution pipeline.
* It has zero or more input ports and zero or more output ports.
*Processor是查询处理管道的一个元素。它有零个或多个输入端口和零个或多个输出端口。
* Blocks of data are transferred over ports.
* Each port has fixed structure: names and types of columns and values of constants.
*Block通过端口传输,每个端口都有固定的结构,列的名称和类型以及常量值。
* Processors may pull data from input ports, do some processing and push data to output ports.
* Processor may indicate that it requires input data to proceed and indicate that it needs data from some ports.
*Processor从输入端口提取数据,进行一些处理,并将输入推送到输出端口。

Process可能指示它需要输入数据才能继续,并指示它需要来自某些端口的数据。
* Synchronous work must only use CPU - don't do any sleep, IO wait, network wait.
*同步工作必须只使用CPU-不做任何Sleep,IO等待和网络等待
* Processor may want to do work asynchronously (example: fetch data from remote server)
* - in this case it will initiate background job and allow to subscribe to it.
*Processor可能想异步工作(比如:从远端服务器获取数据),在这种情况下,它将启动后台作业并允许订阅它。
* Processor may throw an exception to indicate some runtime error.
*运行错误时,Processor可能会抛出异常。
* Different ports may have different structure. For example, ports may correspond to different resultsets
* or semantically different parts of result.
*不同的端口可能有不同的数据结构。例如,端口可能对应结果的不同结果集或语义上的不同部分。
* Processor may modify its ports (create another processors and connect to them) on the fly.
* Example: first execute the subquery; on basis of subquery result
* determine how to execute the rest of query and build the corresponding pipeline.
*Processor可以修改它的端口(创建另外一个Processor并连接到它)。比如:首先执行子查询,根据子查询的结果来决定如何执行剩余的查询,并构建相应的管道。
* Processor may simply wait for another processor to execute without transferring any data from it.
* For this purpose it should connect its input port to another processor, and indicate need of data.
*Processor可以简单的等待另外一个Processor执行而不需要从它传输任何数据。为此,它应该将其输入端口连接到另一个处理器,并指示需要的数据。
* Examples:
*例如
* Source. Has no input ports and single output port. Generates data itself and pushes it to its output port.
*来源没有输入端口,只要一个输出接口。自动生成数据并将数据推送到输出端口。
* Sink. Has single input port and no output ports. Consumes data that was passed to its input port.
*消费者,只有一个输入端口,没有输出端口,消费接收到的数据。
* Empty source. Immediately says that data on its output port is finished.
空来源,表示输出端口的数据已经完成。
* Null sink. Consumes data and does nothing.
*空消费者,消费数据,并且不做任何处理。
* Simple transformation. Has single input and single output port. Pulls data, transforms it and pushes to output port.

简单转换,具有单输入端口和单输出端口。拉取数据,转换并推送到输出端口。
* Example: expression calculator.比如:表达式计算器
* TODO Better to make each function a separate processor. It's better for pipeline analysis. Also keep in mind 'sleep' and 'rand' functions.
*最好让每个函数作为一个单独的Processor,更适用于管道分析,还要记住睡眠和随机功能。
* Squashing or filtering transformation. Pulls data, possibly accumulates it, and sometimes pushes it to output port.
* Examples: DISTINCT, WHERE, squashing of blocks for INSERT SELECT.
*压缩或过滤转换,拉取数据,可能会累积数据,有时会推送到输出端口。比如:DISTINCT,WHERE,INSERT SELECT的块积压
* Accumulating transformation. Pulls and accumulates all data from input until it it exhausted, then pushes data to output port.
* Examples: ORDER BY, GROUP BY.
*累积转换,从输入中提取并累积所有数据直到,直到其耗尽,然后将数据推送到输出端口。比如:ORDER BY,GROUP BY
* Limiting transformation. Pulls data from input and passes to output.
* When there was enough data, says that it doesn't need data on its input and that data on its output port is finished.
*限制转换。从输入提取数据并推送到输出。当有足够的数据时,输入不再需要数据,输出端口上的数据就完成了。
* Resize. Has arbitary number of inputs and arbitary number of outputs.调整大小,具有任意数量的输入和任意数量的输出。
* Pulls data from whatever ready input and pushes it to randomly choosed free output.从任何准备好的输入中提取数据,并推送到任意输出。
* Examples:比如:
* Union - merge data from number of inputs to one output in arbitary order.Union-以任意顺序将多个输入的数据合并到一个输出。
* Split - read data from one input and pass it to arbitary output.
*Split-从一个输入读取数据并将其传递到任意输出。
* Concat. Has many inputs and only one output. Pulls all data from first input until it is exhausted,
* then all data from second input, etc. and pushes all data to output.
*Concat-有很多输入只有一个输出。从第一个输入提取数据直到它耗尽,然后第二个,依次类推,将所有数据推送到输出。
* Ordered merge. Has many inputs but only one output. Pulls data from selected input in specific order, merges and pushes it to output.
*有序合并。有很多个输入但只有一个输出。按特定的顺序从指定的输入中提取数据,合并并将其推送到输出。
* Fork. Has one input and many outputs. Pulls data from input and copies it to all outputs.
* Used to process multiple queries with common source of data.
*Fork。有一个输入和多个输出。从输入中提取数据,并将其复制到所有输出。
* Select. Has one or multiple inputs and one output.Select。有一个或多个输入,一个输出。
* Read blocks from inputs and check that blocks on inputs are "parallel": correspond to each other in number of rows.
* Construct a new block by selecting some subset (or all) of columns from inputs.
* Example: collect columns - function arguments before function execution.
*从输入中读取数据块并检查输入的数据块是否并行:在行数上相互对应。通过从输入中选择一些列子集或全部来构建新的数据块。
*比如:在函数执行之前收集列--函数参数。
* TODO Processors may carry algebraic properties about transformations they do.
* For example, that processor doesn't change number of rows; doesn't change order of rows, doesn't change the set of rows, etc.
*Processor可能具有它们所做的变换的代数性质。比如,Processor不会更改行数,行的顺序和行的集合等等。
* TODO Ports may carry algebraic properties about streams of data.
* For example, that data comes ordered by specific key; or grouped by specific key; or have unique values of specific key.
* And also simple properties, including lower and upper bound on number of rows.
*端口可以携带关于数据流的代数属性。比如,该数据按特定键排序或按特定键分组,或具有特定键的唯一值。
* TODO Processor should have declarative representation, that is able to be serialized and parsed.
* Example: read_from_merge_tree(database, table, Columns(a, b, c), Piece(0, 10), Parts(Part('name', MarkRanges(MarkRange(0, 100), ...)), ...))
* It's reasonable to have an intermediate language for declaration of pipelines.
*Processor应该具有声明性表示,它能够被序列化和解析。
* TODO Processor with all its parameters should represent "pure" function on streams of data from its input ports.
* It's in question, what kind of "pure" function do we mean.
* For example, data streams are considered equal up to order unless ordering properties are stated explicitly.
* Another example: we should support the notion of "arbitary N-th of M substream" of full stream of data.
*Processor及其所有参数都应该代表来自其输入端口的数据流上的纯函数。问题是,什么样的函数是纯函数。

比如,除非显示地声明了排序属性,否则数据流默认按升序排列。

 

enum class Status
{
/// Processor needs some data at its inputs to proceed.Processor在输入端需要一些数据才能继续
/// You need to run another processor to generate required input and then call 'prepare' again.您需要运行另外一个Processor来生成所需的输入,然后再次调用Prepare。
NeedData,

/// Processor cannot proceed because output port is full or not isNeeded().Processor不能继续,因为输出端口已满或者不再需要。
/// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.您需要将数据从输出端口传输到另一个Processor的输入端口,然后再次调用Prepare。
PortFull,

/// All work is done (all data is processed or all output are closed), nothing more to do.所有的工作都完成了(所有的数据都被处理了,所有的输出都以关闭)什么都不做了
Finished,

/// No one needs data on output ports.没有Processor需要输出端口上的数据
/// Unneeded,

/// You may call 'work' method and processor will do some work synchronously.您可以调用work方法,Processor将同步执行某些工作。
Ready,

/// You may call 'schedule' method and processor will initiate some background work.您可以调用schedule方法,Processor将启动一些后台工作。
Async,

/// Processor is doing some work in background.Processor正在后台做一些工作
/// You may wait for next event or do something else and then you should call 'prepare' again.您可以等待下一个活动或做其他的事情,然后您应该再次调用准备。
Wait,

/// Processor wants to add other processors to pipeline.Processor希望将其他Processor添加到管道中
/// New processors must be obtained by expandPipeline() call.必须通过 expandPipeline()调用获取新的Processor。
ExpandPipeline,
};

 

/** Method 'prepare' is responsible for all cheap ("instantaneous": O(1) of data volume, no wait) calculations.
*prepare方法负责所有低成本的计算(比如O(1)数据量,不需要等待的)
* It may access input and output ports,它可以访问输入和输出端口
* indicate the need for work by another processor by returning NeedData or PortFull,通过返回NeedData 或PortFull来指示另一个Processor的需要
* or indicate the absence of work by returning Finished or Unneeded,或者通过返回Finished 或Unneeded来表示没有任务
* it may pull data from input ports and push data to output ports.它可能从输入端口提取数据并将数据推送到输出端口。
*
* The method is not thread-safe and must be called from a single thread in one moment of time,
* even for different connected processors.
*该方法不是线程安全的,单个线程调用必须瞬时返回,即使对于不同连接的Processor也是如此。
* Instead of all long work (CPU calculations or waiting) it should just prepare all required data and return Ready or Async.
*它应该只准备所需要的数据并返回Ready或Async,而不是进行长时间的工作(CPU计算或等待)
* Thread safety and parallel execution:线程安全和并行执行
* - no methods (prepare, work, schedule) of single object can be executed in parallel;单个对象的方法不能并行执行(prepare,work,schedule)
* - method 'work' can be executed in parallel for different objects, even for connected processors;work方法可以针对不同的对象并行执行,即使对于连接的Processor也是如此
* - method 'prepare' cannot be executed in parallel even for different objects,prepare方法不能并行执行,即使对于不同的对象也是如此
* if they are connected (including indirectly) to each other by their ports;如果它们通过端口互相连接
*/
virtual Status prepare()
{
throw Exception("Method 'prepare' is not implemented for " + getName() + " processor", ErrorCodes::NOT_IMPLEMENTED);
}

 

Clickhouse中Chunk的概念

/**
* Chunk is a list of columns with the same length.Chunk是具有相同长度的列的集合
* Chunk stores the number of rows in a separate field and supports invariant of equal column length.
*Chunk将行数存储在一个单独的字段中,并支持等列长度的不变量
* Chunk has move-only semantic. It's more lightweight than block cause doesn't store names, types and index_by_name.
*Chunk只有移动语义。它比block更加轻量级,因为它不存储名称,类型和索引
* Chunk can have empty set of columns but non-zero number of rows. It helps when only the number of rows is needed.
* Chunk can have columns with zero number of rows. It may happen, for example, if all rows were filtered.
* Chunk is empty only if it has zero rows and empty list of columns.
*Chunk可以有空的列集合,但行数不是零。当只需要行数时,它会有所帮助。

Chunk可以有行数为0的列。例如,所有行都被筛选就会发生这种情况

Chunk只有在行数为0,且列为空时,才表示空
* Any ChunkInfo may be attached to chunk.任何ChunkInfo都可以附加到trunk
* It may be useful if additional info per chunk is needed. For example, bucket number for aggregated data.
**/如果需要每个块的附加信息,它可能会很有用。例如聚合数据的存储桶编号。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM