Processor 簡述
Processor是處理SQL中各步驟數據的基本單元。數據,從Processor流入,經過Processor處理,處理后從Processor流出。
從功能上主要分為三類,
1.輸入數據,抽象ISource
2.過程處理,Transform
3.結果輸出(一般是常見是寫磁盤),ISink
以select * from table1 語句為例,數據首先從磁盤讀入ISource類型Processor0,流入Transform類型的Processor1,結果輸出給Client,SQL執行結束。
Clickhouse 中的 Processor關系圖
Processor的結構 (靜態展示)

單個Processor的功能模塊
2.數據流出端口 Outport
3.中間處理,Processor中的transform方法從Inport中讀取數據,然后寫入Outport。
例子 ConcatProcessor
這個Processor可多個Inport讀取數據,然后輸出結果到一個Outport
類比現實中水管聯通基本單元 (圖片源自網絡)
鏈接水管設備圖
引入Pipeline
Pipeline 本質
將Processor(小段水管pipe)串聯構成管道。
類比現實管道 (圖片源自網絡)
管道
SQL中的pipeline
通過 SQL語法展示 某個執行SQL的pipeline。
例子,SELECT 查詢 : SELECT id from table_map LIMIT 10, 2。
EXPLAIN PIPELINE
SELECT id
FROM table_map
LIMIT 10, 2
Query id: 078bc729-38f4-4788-8db7-875d00e74487
┌─explain────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (SettingQuotaAndLimits) │
│ (Limit) │
│ Limit │
│ (ReadFromMergeTree) │
│ Concat 2 → 1 │
│ MergeTreeInOrder × 2 0 → 1 │
└────────────────────────────────────┘
↑ Progress: 8.00 rows, 230.00 B (1.43 thousand rows/s., 41.21 KB/
8 rows in set. Elapsed: 0.006 sec.
Pipeline 輸出解讀
- 整體語義
- Select 語句 的 pipeline,以遞進關系表示依賴,上層依賴下層的輸入。
- 其中pipeline ()里面的內容就是QueryPlan的節點名。 請見explain pipeline 中藍色括號的內容與 下面query plan的內容。
- 數字語義
- 每個pipeline單元 后面都會標識並行度(默認是1沒有標識), 'x' 后面的數據代表並行度(Processor的數量), n1 -> n2, '->',代表數據流向, n1代表Inport數量, n2代表OutPort數量。MergeTreeInOrder × 2 0 → 1,代表有兩個並行度,0個Inport, 1個Outport。
查看SQL的執行計划 QueryPlan,我們發現表達的語義是一致的。
queryPlan.
MacBook.local :) explain select id from table_map limit 200 offset 0
Query id: 7666532a-6a3e-43c5-8160-a4539edc984d
┌─explain───────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│ Limit (preliminary LIMIT (without OFFSET)) │
│ ReadFromMergeTree │
└───────────────────────────────────────────────────────────────────────────┘
↙ Progress: 4.00 rows, 221.00 B (2.73 thousand rows/s., 150.65 KB
4 rows in set. Elapsed: 0.002 sec.
Pipeline 三部曲
- Pipeline的構建
- Pipeline的執行准備
- Pipeline的執行
pipeline的構建 [開始]
局部構建Pipeline
上述SQL的例子: 以MergeTreeInOrder到Concat構建為例。
│ Concat 2 → 1 │
│ MergeTreeInOrder × 2 0 → 1 │
局部構建的Pipeline圖片
圖ReadFromMergeTree& ConcatProcessor 組裝后的pipeline.
構建完整的Pipeline 圖
組裝端口相關代碼(ReadFromMergeTree 部分的Pipeline):
- DFS 根據QueryPlan 構建Pipeline (從沒有children 的node開始構建第一個pipeline)。然后不斷構建直到Pipeline都構建完畢。
QueryPipelinePtr QueryPlan::buildQueryPipeline(..){
QueryPipelinePtr last_pipeline;
std::stack<Frame> stack;
stack.push(Frame{.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
if (last_pipeline)
{
frame.pipelines.emplace_back(std::move(last_pipeline));
last_pipeline = nullptr; //-V1048
}
size_t next_child = frame.pipelines.size();
// 當一個Node中的children都組裝成了Pipeline時,那么就可以當前Node與children組裝好的
// pipeline 構造新的Pipeline.
if (next_child == frame.node->children.size())
{
bool limit_max_threads = frame.pipelines.empty();
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings);
if (limit_max_threads && max_threads)
last_pipeline->limitMaxThreads(max_threads);
stack.pop();
}
else
stack.push(Frame{.node = frame.node->children[next_child]});
}
打通processor到對應Step.(暫時不明目的)
即設置
IProcessor 中的query_plan_step為當前的Step. (可能多個Processor對應一個Step).
detachProcessors() 方法:
for (auto & processor : processors)
processor->setQueryPlanStep(step, group);
即設置
IProcessor 中的query_plan_step為當前的Step. (可能多個Processor對應一個Step).
Pipeline構建 [結束]
所有的processors都存放在了last_pipeline,返回給調用者。
Pipeline重要數據結構分析
Pipeline中繼承IProcessor的類的端口使用IProcessor中的成員變量。
ISource的 output端口中的Input_port就是 IProcessor的outputs[0]的input_port.
Pipeline中 Inport與OutPort端口間公用State。
即上游Processor在Outport流出數據(寫入某個地方),就能被下游Processor 從InPort讀取。
端口間數據
驗證 端口間公用State.
查看State的地址.
MergeTreeOrderSelectProcessor 的 OutPort 的State
ConcatProcessor 的InPort 的State
賦值給更高級的抽象 BlockIO
Pipeline的執行准備 [開始] 下一個Stage
構建Executing Graph
Executing Graph Node 和 edge
ExecutingGraph::ExecutingGraph(const Processors & processors)
{
uint64_t num_processors = processors.size();
nodes.reserve(num_processors);
/// Create nodes.
for (uint64_t node = 0; node < num_processors; ++node)
{
IProcessor * proc = processors[node].get();
processors_map[proc] = node;
nodes.emplace_back(std::make_unique<Node>(proc, node));
}
/// Create edges.
for (uint64_t node = 0; node < num_processors; ++node)
addEdges(node);
}
Excuting Graph edge 更新相鄰Port狀態
當前Processor的Port處理數據完畢后,通過edge到對下一個Processor的對應Port進行處理。
if (from_output < outputs.size())
{
was_edge_added = true;
for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output)
{
const IProcessor * to = &it->getInputPort().getProcessor();
auto input_port_number = to->getInputPortNumber(&it->getInputPort());
Edge edge(0, false, input_port_number, from_output, &nodes[node]->post_updated_output_ports);
// 將edge添加到當前node的direct_edges中. 使用move語義.
auto & added_edge = addEdge(nodes[node]->direct_edges, std::move(edge), from, to);
// Port中的update_info 和 edge的update_info 指向同一個地方.
it->setUpdateInfo(&added_edge.update_info);
}
}
構建Executing Graph 結束
更新Execute Graph Node狀態
創建另外線程執行
創建線程池,可並行執行Execute Graph.
Pull 模式 (控制方向類比圖)
Node中的edge 方向(back vs direct)
- Node A 的direct_edge
// direct_edges 是指的 [ ] 節點A output 到child節點B
// |
// \|/ (direct_edge) 數據流指出去的edge.
// [ ] 節點B
- NodeA 的back_edges
// back_edges 是指 [ ] 節點 A 從節點B 流入數據
// /|\
// | (back_edge) 數據流指向自己的edge
[ ] 節點B
例子: ConcatProcessor 相應的Node
2個back_edges: 有兩個Inport
1個direct_edge: 一個outport
Pull模式初始化
挑選所有沒有OutPort的Node. (當前SQL例子為 lazyOutputFormat)
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack)
{
UInt64 num_processors = processors.size();
for (UInt64 proc = 0; proc < num_processors; ++proc)
{
// 從root指向child方向的邊, 在Select查詢語句中,lazyOutputFormat是沒有output到其他節點的Node.
if (graph->nodes[proc]->direct_edges.empty())
{
stack.push(proc);
/// do not lock mutex, as this function is executed in single thread
graph->nodes[proc]->status = ExecutingGraph::ExecStatus::Preparing;
}
}
}
初始化所有Node的狀態. 參考 控制方向類比圖
遞歸調用 prepareProcessor(),根據Graph Node中的edge更新所有Node狀態。
如圖所示,如果是backward為true,那么當前端口就是InputPort, 下一個節點Port是OutputPort。共同組成edge (InputPort <---- OutpurtPort).
Private question.
代碼分析
// tryAddProcessorToStackIfUpdated.
// 通過edge.to得到關聯節點,並做初始化
auto & node = *graph->nodes[edge.to];
std::unique_lock lock(node.status_mutex);
ExecutingGraph::ExecStatus status = node.status;
if (status == ExecutingGraph::ExecStatus::Finished)
return true;
// 如果當前邊的方向是backward,那么將edge中相應的port number添加到updated_output_ports中
if (edge.backward)
node.updated_output_ports.push_back(edge.output_port_number);
else
node.updated_input_ports.push_back(edge.input_port_number);
if (status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
// 這里調用prepareProcessor是PipelineExecutor.
return prepareProcessor(edge.to, thread_number, queue, async_queue, std::move(lock));
}
如何從Node中得到 下一個需要處理的edge.
// 當執行完 當前node.processor的prepare方法以后,我們就得到了下一個edge. node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
Port.h中
// InPort.setNeeded(). 如字面意思,開始設置Node的Inport端口信息.而Port和Edge關心很緊密,
在Port中的UpdateInfo中存儲的id就是與當前Port關聯的edge的地址.
見下圖
void ALWAYS_INLINE setNeeded()
{
assumeConnected();
if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0)
updateVersion();
}
void inline ALWAYS_INLINE updateVersion()
{
if (likely(update_info))
update_info->update();
}
};
void inline ALWAYS_INLINE update()
{
if (version == prev_version && update_list)
update_list->push_back(id);
++version;
}
update_info 中的 Id 是 edge的地址.
對應Graph Node中的back_edge中的update_info
id 指向擁有這個update_info的edge.
ExecutingGraph::Edge & ExecutingGraph::addEdge(Edges & edges, Edge edge, const IProcessor * from, const IProcessor * to)
{
auto it = processors_map.find(to);
if (it == processors_map.end())
{
String msg = "Processor " + to->getName() + " was found as " + (edge.backward ? "input" : "output")
+ " for processor " + from->getName() + ", but not found in list of processors.";
throw Exception(msg, ErrorCodes::LOGICAL_ERROR);
}
edge.to = it->second;
auto & added_edge = edges.emplace_back(std::move(edge));
added_edge.update_info.id = &added_edge;
return added_edge;
}
Pipeline::prepareProcessor()作用
-
初始化所有相關的Processors/Nodes的狀態
- 獲取當前 node_x 的Inport, 初始化,通過當前 node_x InPort中的update_info結構中的id 得到edge_e 地址.
- 根據edge_e中的to字段(下一個Node的index),在graph中獲得下一個node_y。
- 根據edge_e中的output_number信息對node_y進行初始化。
- 執行node_y中的processor.prepare()方法
- 如果能執行到 input.setNeeded(),那么就得到了下一批edges. 回到a.
- 如果執行MergeTreeSelector時,沒有Input.setNeeded(). 此時,設置node.status為Ready,將此Node加入到Queue,並將此Node狀態置為Executing,表明此Node已經可以被執行。
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
並且這個Node,沒有需要更新的edge,所以就可以返回。
初始化Node結束,MergeTreeSelect對應的Node作為准備好的Node放到queue中,這些Node作為SQL執行開始的Task發送給task_queue給threads執行。
Processor的狀態(會決定Node的狀態)
enum class Status
{
/// Processor needs some data at its inputs to proceed.
/// You need to run another processor to generate required input and then call 'prepare' again.
NeedData,
/// Processor cannot proceed because output port is full or not isNeeded().
/// You need to transfer data from output port to the input port of another processor and then call 'prepare' again.
PortFull,
/// All work is done (all data is processed or all output are closed), nothing more to do.
Finished,
/// No one needs data on output ports.
/// Unneeded,
/// You may call 'work' method and processor will do some work synchronously.
Ready,
/// You may call 'schedule' method and processor will return descriptor.
/// You need to poll this descriptor and call work() afterwards.
Async,
/// Processor wants to add other processors to pipeline.
/// New processors must be obtained by expandPipeline() call.
ExpandPipeline,
};
參與執行的所有Node是不是都會進入Executing狀態?
不是,只有正常執行,且實現了work()方法的Node,會從Preparing狀態->Executing狀態,然后交給 queue,再去執行。
Excute Graph Node更新結束
狀態圖 :
MergeTreeSelector [status: Executing, last_processor_status: Ready]
OtherSelectors [status: Idle, last_processor_status: NeedData]
總結
為什么需要構建ExecuteGraph?
個人理解就是為了完成一個class單獨完成自己的事情。
Execute Graph 本質就是包含了Processor,有狀態,可以被調度執行的Graph。
Pipeline是解決數據通過端口流通的結構,本身並不關心如何被調度執行。
准備Pipeline執行時,有一個Node失敗,會很快返回。
1. MergeTreeSelector的狀態 從Executing -> PortFull.
2.根據寫的DFS規則,更新並添加相關節點. MergeTreeSelector只有direct_edge.
MergeTreeSelector [status: Executing, last_processor_status: Ready]
OtherSelectors [status: Idle, last_processor_status: NeedData]
queue中保留狀態為Executing狀態Node.
// 將Executing Node 放入到 task_queue中相應的thread需要處理的task queue中.
// task_queue: std::vector[[std::queue[Task*]],[std::queue[Task*]]], Task 模板類型,可以是Node.
task_queue.push(queue.front(), next_thread);
2. initializeExecution結束,
executeStepImpl()開始執行
//直到finished或者 yield.
while(!finished && !yield){
/// 准備
/// First, find any processor to execute.
/// Just traverse graph and prepare any processor.
while (!finished && node != nullptr){
...
// 從task_queue某個thread的Task隊列中移出Task(node)
node = task_queue.pop(thread_num);
// 如果是1個並行度,那么就在當前線程中執行.
// 下面代碼是指有多個Node並行執行是,會喚醒threads_queue中的其他線程一起執行。
if (node)
{
if (!task_queue.empty() && !threads_queue.empty()){
auto thread_to_wake = task_queue.getAnyThreadWithTasks()
...
}
}
}
執行
while (node && !yield){
// 當前線程執行Node任務
addJob(node);
...
// 最終調用Processor中的 work()方法處理.
node->job();
/// Try to execute neighbour processor.
{
/// Prepare processor after execution.
/// 找下個可以執行的Processors.
/// 如果有合適狀態的Node(Executing狀態),添加到queue隊列,如果沒有,結束任務.
{
auto lock = std::unique_lock<std::mutex>(node->status_mutex);
if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock)))
finish();
}
/// 執行下一批可以執行的Processors.
/// 將除queue中的第一個task之外的所有的任務搬運到task_queue,多線程執行這些Task.
/// 其中多線程模型中各個線程都會執行 executeSingleThread(thread_num, num_threads).
/// 單線程中executeSingleThread(0,1)只是一個特例
while (!queue.empty() && !finished)
{
task_queue.push(queue.front(), thread_num);
queue.pop();
}
}
}
}
執行結束后,監測各個Node中是否有exception.
/// Execution can be stopped because of exception. Check and rethrow if any.
for (auto & node : graph->nodes)
if (node->exception)
std::rethrow_exception(node->exception);
Pipeline 執行
生產者/消費者模型
生產: 執行過程中如果ExecutingGraph的Node可以變為Executing,那么就可以被放進task_queue (此時開始執行時,是沿着direct_edge 執行,數據流動方向)。
消費: 被創建的線程池消費,最終調用IProcessor中的work()方法。然后更新相鄰的Node的狀態,再次產生Executing 狀態的Node。 一直到所有Node狀態變成Finished狀態,或者出現異常。
偽代碼式分析
queue中保留狀態為Executing狀態Node.
// 將Executing Node 放入到 task_queue中相應的thread需要處理的task queue中.
// task_queue: std::vector[[std::queue[Task*]],[std::queue[Task*]]], Task 模板類型,可以是Node.
task_queue.push(queue.front(), next_thread);
initializeExecution結束,
executeStepImpl()開始執行
//直到finished或者 yield.
while(!finished && !yield){
/// 准備
/// First, find any processor to execute.
/// Just traverse graph and prepare any processor.
while (!finished && node != nullptr){
...
// 從task_queue某個thread的Task隊列中移出Task(node)
node = task_queue.pop(thread_num);
// 如果是1個並行度,那么就在當前線程中執行.
// 下面代碼是指有多個Node並行執行是,會喚醒threads_queue中的其他線程一起執行。
if (node)
{
if (!task_queue.empty() && !threads_queue.empty()){
auto thread_to_wake = task_queue.getAnyThreadWithTasks()
...
}
}
}
執行
while (node && !yield){
// 當前線程執行Node任務
addJob(node);
...
// 最終調用Processor中的 work()方法處理.
node->job();
/// Try to execute neighbour processor.
{
/// Prepare processor after execution.
/// 找下個可以執行的Processors.
/// 如果有合適狀態的Node(Executing狀態),添加到queue隊列,如果沒有,結束任務.
{
auto lock = std::unique_lock<std::mutex>(node->status_mutex);
if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock)))
finish();
}
/// 執行下一批可以執行的Processors.
/// 將除queue中的第一個task之外的所有的任務搬運到task_queue,多線程執行這些Task.
/// 其中多線程模型中各個線程都會執行 executeSingleThread(thread_num, num_threads).
/// 單線程中executeSingleThread(0,1)只是一個特例
while (!queue.empty() && !finished)
{
task_queue.push(queue.front(), thread_num);
queue.pop();
}
}
}
}
執行結束后,監測各個Node中是否有exception.
/// Execution can be stopped because of exception. Check and rethrow if any.
for (auto & node : graph->nodes)
if (node->exception)
std::rethrow_exception(node->exception);
queue中保留狀態為Executing狀態Node.
// 將Executing Node 放入到 task_queue中相應的thread需要處理的task queue中.
// task_queue: std::vector[[std::queue[Task*]],[std::queue[Task*]]], Task 模板類型,可以是Node.
task_queue.push(queue.front(), next_thread);
2. initializeExecution結束,
executeStepImpl()開始執行
//直到finished或者 yield.
while(!finished && !yield){
/// 准備
/// First, find any processor to execute.
/// Just traverse graph and prepare any processor.
while (!finished && node != nullptr){
...
// 從task_queue某個thread的Task隊列中移出Task(node)
node = task_queue.pop(thread_num);
// 如果是1個並行度,那么就在當前線程中執行.
// 下面代碼是指有多個Node並行執行是,會喚醒threads_queue中的其他線程一起執行。
if (node)
{
if (!task_queue.empty() && !threads_queue.empty()){
auto thread_to_wake = task_queue.getAnyThreadWithTasks()
...
}
}
}
執行
while (node && !yield){
// 當前線程執行Node任務
addJob(node);
...
// 最終調用Processor中的 work()方法處理.
node->job();
/// Try to execute neighbour processor.
{
/// Prepare processor after execution.
/// 找下個可以執行的Processors.
/// 如果有合適狀態的Node(Executing狀態),添加到queue隊列,如果沒有,結束任務.
{
auto lock = std::unique_lock<std::mutex>(node->status_mutex);
if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock)))
finish();
}
/// 執行下一批可以執行的Processors.
/// 將除queue中的第一個task之外的所有的任務搬運到task_queue,多線程執行這些Task.
/// 其中多線程模型中各個
Private Question
如果某個pipeline出現異常excpetion,提前結束,如何通知其他pipeline的執行者終止執行?
很多繼承IProcessor的類,並沒有實現cancel方法。推測不會生效
// 如果某個node在執行時,出現異常那么就會通知所有的Processors停止執行。
//
if (node->exception)
cancel();
// 這里發生運行時異常時,會調用所有processor的cancel()方法,
// 有些processor沒有實現這個方法。可以調用IProcessor的cancel().
void PipelineExecutor::cancel()
{
cancelled = true;
finish();
// 這里的processors是PipelineExecutor中的變量,包含所有processors。
std::lock_guard guard(processors_mutex);
for (auto & processor : processors)
processor->cancel();
}
繼承Processor的Transform的work()方法調用是否從task_queue出隊后執行的? 是的
Executing Graph Node沿着 direct_edge方向運行?
首先執行MergeTreeSelector(Ready 狀態).
/src/Processors/ISource.cpp
數據和狀態變化是同步的.
output.pushData(std::move(current_chunk));
邊執行Node邊添加?
這種算法有兩種傾向:
1. 盡可能先處理Direct方向的edge,因為data被消費出Pipeline后,才能使MergeTreeSelector Processor pull 數據進Pipeline
2.DFS
PipelineExecutor::prepareProcessor(){
...
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
// DFS 過程中僅當node.last_processor_status為Ready才會被放入執行隊列.
switch (node.last_processor_status)
{
...
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
...
}
...
// 盡可能從direct 方向進行DFS搜索。
for (auto & edge : updated_direct_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
return false;
}
// 反向DFS初始化
for (auto & edge : updated_back_edges)
{
if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number))
return false;
}
}
// 僅當與此edge連接的Node狀態Idle才會繼續DFS,其他情況作為剪枝情況,停止DFS.
tryAddProcessorToStackIfUpdated() {
if (status == ExecutingGraph::ExecStatus::Idle)
{
node.status = ExecutingGraph::ExecStatus::Preparing;
return prepareProcessor(edge.to, thread_number, queue, async_queue, std::move(lock));
}
分析沿着direct_edge和back_edge 進行DFS遍歷Node 不會死循環.
1.調用LimitTransform到node id 是1的時候, 節點的last_processor_status從 NeedData --> Finished。因為LimitTransform 得到了超過limit 為1的rows.所以狀態可以變為結束.
1.1 LimitTransform 此時outport和inport都被更新,direct_edge和backward_edge方向的Node都需要被更新.由於DFS,會沿着direct_edge方形更新下一個節點ISimpleTransform Node.
// 當前Node(LimitTransform) 是接收 MergeTreeSelector輸出的節點。
// 因為MergeTreeSelector 和 當前節點Data是指向同一個數據.
// 在后面兩個循環對數據進行了依次轉移。此方法沒有重寫 IProcessor的work()方法。
// prepare 時就完成了數據的處理。主要是對limit進行限制。
// process_pair()->preparePair()-> LimitTransform::splitChunk(PortsData & data),
//對已有數據在當前Transform進行進行組裝.
for (auto pos : updated_input_ports)
process_pair(pos);
for (auto pos : updated_output_ports)
process_pair(pos);
/// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data.
/// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1
if ((!limit_is_unreachable && rows_read >= offset + limit)
&& !previous_row_chunk && !always_read_till_end)
{
for (auto & input : inputs)
input.close();
//
for (auto & output : outputs)
output.finish();
return Status::Finished;
}

2.ISimpleTransform 會將LimitTransform通過outport設置的數據,通過自己的inport處理. last_processor_status: NeedData->Ready. (這里沒有設置post_updated_out_ports原因是,這個Transform有work()方法,只有通過work()方法處理才能將inport中的數據轉移到outport中。)所以這里會將該Node放到queue上去執行。(這里不會沿着direct_edg 一直更新下去Node)
case IProcessor::Status::Ready:
{
node.status = ExecutingGraph::ExecStatus::Executing;
queue.push(&node);
break;
}
3.繼續更新LimitTransform back_edge方向的Node.因為只有一個MergeTreeSelector。 ?這樣會陷入死循環而使ISimpleTransform沒有辦法執行么? 從LimitTransform 更新到 MergeTreeSelector,然后MergeTreeSelector通過direct_edge又更新回來?不會,因為LimitTransform 在拿到limit (本次測試limit 是 1) 的數據時,會對和它公用的Data數據進行狀態更新,更新為State::IS_FINISHED.這樣就整體結束了。
- 執行完所有和LimitTransform Node相關的Node狀態,開始執行加入到queue中的ISimpleTransform. 同樣也是按照direct_edge方向執行.直到所有節點都到Finished狀態或者出現異常。
- expand_pipeline目前沒有接觸到.
參考
使用分治方法。
執行收尾時檢查否有異常(異常保存在node中).
/// Execution can be stopped because of exception. Check and rethrow if any.
for (auto & node : graph->nodes)
if (node->exception)
std::rethrow_exception(node->exception);
一些總結
- Processor的執行方式上有兩種,一種是實現了work()方法的Processor,這種Node需要加入到queue中執行。另一種,沒有work方法,在prepare此Node時,就等價於對Node進行了處理。
比如 LimitTransform 是沒有實現work()方法的,那么它的數據從(inport->outport)流動是在prepare時 就完成了。 但是IsimpleTransform 實現了work()方法,說明處理inport的數據然后輸出到outport 可能比較復雜,需要 發送到queue中,然后通過專門調用addJob(), job->work()單獨處理這個任務。
- 針對Processor的狀態更新,使用DFS算法,先direct_edge方向,back_edge方向更新🔗Node的對應狀態。
- 當Node更新某個端口數據時,會同時影響共有數據的State中的狀態。例如
當LimitTransform 設置為Finished時,會更新inport中的Data的狀態,會影響到MergeTreeSelector outport的狀態。 auto flags = state->setFlags(State::IS_FINISHED, State::IS_FINISHED);
- 在更新Node節點狀態時,有加鎖動作.
std::unique_lock lock(node.status_mutex);
- 執行Execution Graph 的架構
主線程 與 執行線程 解耦, 通過 LazyOutputFormat中的 Queue(非全局變量)進行通信.
Pipeline 執行 結束
執行Pipeline 總體模型
執行在TCPHandler 類發起.
發起者執行的邏輯,創建異步線程綁定到需要輸出的data上.
這個data就是PullingAsyncPipelineExecutor和異步執行線程需要通信的"全局變量"。
多threads Execute Graph調度執行 分析
- 如果有兩個Node(比如max_threads也是2,),那么在PipelineExecutor中如何協同線程工作的。
首先PipelineExecutor會創建max_threads的線程以及一個存放Tasks的數據結構.
queue: vector<std::deque<Task *>> // 先入先出的隊列,存放需要執行的Node的結構
[ [NodePtr0, NodePtr1, NodePtr2, ...] ]
[ [NodePtr0, NodePtr1, NodePtr2, ...] ]
....
[ [NodePtr0, NodePtr1, NodePtr2, ...] ]
1.任意thread_i都可以看到這些需要被執行的task.
2.每個 thread_i 會綁定 到 一個ExecutorContext_i,並wait ExecutorContext_i中的wake_flag, 有task_queue被更新。
3.當某個thread_i 執行掃描可運行的Node時,會將對應的task (node) push到
task_queue[thread_j](j可以不等於i) 隊列,(同時將node從當前queue中出隊)中,並set wake_flag,讓這個線程thread_j去干活。
4.這保證了pipeline找那個處於Executing狀態的Node可以充分運行。
threads: 模擬線程
std::vector<std::unique_ptr<ExecutorContext>> executor_contexts;
/***
/// Context for each thread.
struct ExecutorContext
{
/// Will store context for all expand pipeline tasks (it's easy and we don't expect many).
/// This can be solved by using atomic shard ptr.
std::list<ExpandPipelineTask> task_list;
std::queue<ExecutingGraph::Node *> async_tasks;
std::atomic_bool has_async_tasks = false;
std::condition_variable condvar;
std::mutex mutex;
bool wake_flag = false;
/// Currently processing node.
ExecutingGraph::Node * node = nullptr;
/// Exception from executing thread itself.
std::exception_ptr exception;
#ifndef NDEBUG
/// Time for different processing stages.
UInt64 total_time_ns = 0;
UInt64 execution_time_ns = 0;
UInt64 processing_time_ns = 0;
UInt64 wait_time_ns = 0;
#endif
};
***/
?怎么初始化,怎么使用?
// 初始化這些線程 (使用globalThread),並且每個thread名字都是 QueryPipelineEx.
// 初始化threads的函數是一個lambda表達式,其中capture了this指針.
for (size_t i = 0; i < num_threads; ++i)
{
threads.emplace_back([this, thread_group, thread_num = i, num_threads]
{
/// ThreadStatus thread_status;
setThreadName("QueryPipelineEx");
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT_SAFE(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
try
{
executeSingleThread(thread_num, num_threads);
}
catch (...)
{
/// In case of exception from executor itself, stop other threads.
finish();
executor_contexts[thread_num]->exception = std::current_exception();
}
});
}
線程的上下文 主要是功能是在保證在多個Node可以在多個線程執行時,保持一個Node在主線程執行之外,
其他的Node可以被其他thread執行。
這種模型還是很好的。
個人疑問? max_threads為什么會在兩個MergeTreeSelector時為1.
max_threads 會在某處被設置.
settings.max_block_size // default = 65505 bytes.
// 如果 SQL 中的LIMIT 和 OFFSET 常量 的和 小於 max_block_size,那么 會影響ExecuteGraph的並行度。
if (!query.distinct
&& !query.limit_with_ties
&& !query.prewhere()
&& !query.where()
&& !query.groupBy()
&& !query.having()
&& !query.orderBy()
&& !query.limitBy()
&& query.limitLength()
&& !query_analyzer->hasAggregation()
&& !query_analyzer->hasWindow()
&& limit_length <= std::numeric_limits<UInt64>::max() - limit_offset
&& limit_length + limit_offset < max_block_size)
{
max_block_size = std::max(UInt64(1), limit_length + limit_offset);
max_threads_execute_query = max_streams = 1;
}
由於測試select 語句中offset數量太小,導致雖然有兩個ReadFromMergeTreeSelector,仍然是1個線程在執行。
個人疑問? 執行因超時Cancelling 如何中斷PipelineExecutor執行, cancelled 是做這個使用的么? Cancelled。
當執行 processOrdinaryQueryWithProcessors()時,會piplineExecutor 會調用pull()方法。
每隔100ms 會判斷是否用戶發送過來 cancel query的命令。如果有
當執行 processOrdinaryQueryWithProcessors()時,會piplineExecutor 會調用pull()方法。
每隔100ms 會判斷是否用戶發送過來 cancel query的命令。如果有
while (executor.pull(block, interactive_delay / 1000))
{
std::lock_guard lock(task_callback_mutex);
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
executor.cancel();
break;
}
最終調用PipelineExecutor::cancel() 設置Pipeline::atomic_bool 類型為true.
實驗時沒有看到對應的pipeline被打斷執行,(sql 被cancel還是會被執行到底)。
最終調用PipelineExecutor::cancel() 設置Pipeline::atomic_bool 類型為true.
實驗時沒有看到對應的pipeline被打斷執行,(sql 被cancel還是會被執行到底)。
