Processor 簡述
Processor是處理SQL中各步驟數據的基本單元。數據,從Processor流入,經過Processor處理,處理后從Processor流出。
從功能上主要分為三類,
1.輸入數據,抽象ISource
2.過程處理,Transform
3.結果輸出(一般是常見是寫磁盤),ISink
以select * from table1 語句為例,數據首先從磁盤讀入ISource類型Processor0,流入Transform類型的Processor1,結果輸出給Client,SQL執行結束。
Clickhouse 中的 Processor關系圖
Processor的結構 (靜態展示)
單個Processor的功能模塊
2.數據流出端口 Outport
3.中間處理,Processor中的transform方法從Inport中讀取數據,然后寫入Outport。

例子 ConcatProcessor
這個Processor可多個Inport讀取數據,然后輸出結果到一個Outport

類比現實中水管聯通基本單元 (圖片源自網絡)

鏈接水管設備圖
引入Pipeline
Pipeline 本質
將Processor(小段水管pipe)串聯構成管道。
類比現實管道 (圖片源自網絡)

管道
SQL中的pipeline
通過 SQL語法展示 某個執行SQL的pipeline。
例子,SELECT 查詢 : SELECT id from table_map LIMIT 10, 2。
EXPLAIN PIPELINE
SELECT id
FROM table_map
LIMIT 10, 2
Query id: 078bc729-38f4-4788-8db7-875d00e74487
┌─explain────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (SettingQuotaAndLimits) │
│ (Limit) │
│ Limit │
│ (ReadFromMergeTree) │
│ Concat 2 → 1 │
│ MergeTreeInOrder × 2 0 → 1 │
└────────────────────────────────────┘
↑ Progress: 8.00 rows, 230.00 B (1.43 thousand rows/s., 41.21 KB/
8 rows in set. Elapsed: 0.006 sec.
Pipeline 輸出解讀
- 整體語義
- Select 語句 的 pipeline,以遞進關系表示依賴,上層依賴下層的輸入。
- 其中pipeline ()里面的內容就是QueryPlan的節點名。 請見explain pipeline 中藍色括號的內容與 下面query plan的內容。
- 數字語義
- 每個pipeline單元 后面都會標識並行度(默認是1沒有標識), 'x' 后面的數據代表並行度(Processor的數量), n1 -> n2, '->',代表數據流向, n1代表Inport數量, n2代表OutPort數量。MergeTreeInOrder × 2 0 → 1,代表有兩個並行度,0個Inport, 1個Outport。
查看SQL的執行計划 QueryPlan,我們發現表達的語義是一致的。
queryPlan.
MacBook.local :) explain select id from table_map limit 200 offset 0
Query id: 7666532a-6a3e-43c5-8160-a4539edc984d
┌─explain───────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY)) │
│ SettingQuotaAndLimits (Set limits and quota after reading from storage) │
│ Limit (preliminary LIMIT (without OFFSET)) │
│ ReadFromMergeTree │
└───────────────────────────────────────────────────────────────────────────┘
↙ Progress: 4.00 rows, 221.00 B (2.73 thousand rows/s., 150.65 KB
4 rows in set. Elapsed: 0.002 sec.
Pipeline 三部曲
- Pipeline的構建
- Pipeline的執行准備
- Pipeline的執行
pipeline的構建 [開始]
局部構建Pipeline
上述SQL的例子: 以MergeTreeInOrder到Concat構建為例。
│ Concat 2 → 1 │
│ MergeTreeInOrder × 2 0 → 1 │
局部構建的Pipeline圖片

圖ReadFromMergeTree& ConcatProcessor 組裝后的pipeline.
構建完整的Pipeline 圖

組裝端口相關代碼(ReadFromMergeTree 部分的Pipeline):
- DFS 根據QueryPlan 構建Pipeline (從沒有children 的node開始構建第一個pipeline)。然后不斷構建直到Pipeline都構建完畢。
QueryPipelinePtr QueryPlan::buildQueryPipeline(..){ QueryPipelinePtr last_pipeline; std::stack<Frame> stack; stack.push(Frame{.node = root}); while (!stack.empty()) { auto & frame = stack.top(); if (last_pipeline) { frame.pipelines.emplace_back(std::move(last_pipeline)); last_pipeline = nullptr; //-V1048 } size_t next_child = frame.pipelines.size(); // 當一個Node中的children都組裝成了Pipeline時,那么就可以當前Node與children組裝好的 // pipeline 構造新的Pipeline. if (next_child == frame.node->children.size()) { bool limit_max_threads = frame.pipelines.empty(); last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings); if (limit_max_threads && max_threads) last_pipeline->limitMaxThreads(max_threads); stack.pop(); } else stack.push(Frame{.node = frame.node->children[next_child]}); }
打通processor到對應Step.(暫時不明目的)
即設置
IProcessor 中的query_plan_step為當前的Step. (可能多個Processor對應一個Step).
detachProcessors() 方法: for (auto & processor : processors) processor->setQueryPlanStep(step, group); 即設置 IProcessor 中的query_plan_step為當前的Step. (可能多個Processor對應一個Step).
Pipeline構建 [結束]
所有的processors都存放在了last_pipeline,返回給調用者。

Pipeline重要數據結構分析
Pipeline中繼承IProcessor的類的端口使用IProcessor中的成員變量。
ISource的 output端口中的Input_port就是 IProcessor的outputs[0]的input_port.
Pipeline中 Inport與OutPort端口間公用State。
即上游Processor在Outport流出數據(寫入某個地方),就能被下游Processor 從InPort讀取。

端口間數據
驗證 端口間公用State.
查看State的地址.
MergeTreeOrderSelectProcessor 的 OutPort 的State

ConcatProcessor 的InPort 的State

賦值給更高級的抽象 BlockIO

Pipeline的執行准備 [開始] 下一個Stage
構建Executing Graph
Executing Graph Node 和 edge
ExecutingGraph::ExecutingGraph(const Processors & processors) { uint64_t num_processors = processors.size(); nodes.reserve(num_processors); /// Create nodes. for (uint64_t node = 0; node < num_processors; ++node) { IProcessor * proc = processors[node].get(); processors_map[proc] = node; nodes.emplace_back(std::make_unique<Node>(proc, node)); } /// Create edges. for (uint64_t node = 0; node < num_processors; ++node) addEdges(node); }
Excuting Graph edge 更新相鄰Port狀態
當前Processor的Port處理數據完畢后,通過edge到對下一個Processor的對應Port進行處理。
if (from_output < outputs.size()) { was_edge_added = true; for (auto it = std::next(outputs.begin(), from_output); it != outputs.end(); ++it, ++from_output) { const IProcessor * to = &it->getInputPort().getProcessor(); auto input_port_number = to->getInputPortNumber(&it->getInputPort()); Edge edge(0, false, input_port_number, from_output, &nodes[node]->post_updated_output_ports); // 將edge添加到當前node的direct_edges中. 使用move語義. auto & added_edge = addEdge(nodes[node]->direct_edges, std::move(edge), from, to); // Port中的update_info 和 edge的update_info 指向同一個地方. it->setUpdateInfo(&added_edge.update_info); } }
構建Executing Graph 結束
更新Execute Graph Node狀態
創建另外線程執行

創建線程池,可並行執行Execute Graph.
Pull 模式 (控制方向類比圖)

Node中的edge 方向(back vs direct)
- Node A 的direct_edge
// direct_edges 是指的 [ ] 節點A output 到child節點B // | // \|/ (direct_edge) 數據流指出去的edge. // [ ] 節點B
- NodeA 的back_edges
// back_edges 是指 [ ] 節點 A 從節點B 流入數據 // /|\ // | (back_edge) 數據流指向自己的edge [ ] 節點B
例子: ConcatProcessor 相應的Node
2個back_edges: 有兩個Inport
1個direct_edge: 一個outport

Pull模式初始化
挑選所有沒有OutPort的Node. (當前SQL例子為 lazyOutputFormat)
void PipelineExecutor::addChildlessProcessorsToStack(Stack & stack) { UInt64 num_processors = processors.size(); for (UInt64 proc = 0; proc < num_processors; ++proc) { // 從root指向child方向的邊, 在Select查詢語句中,lazyOutputFormat是沒有output到其他節點的Node. if (graph->nodes[proc]->direct_edges.empty()) { stack.push(proc); /// do not lock mutex, as this function is executed in single thread graph->nodes[proc]->status = ExecutingGraph::ExecStatus::Preparing; } } }
初始化所有Node的狀態. 參考 控制方向類比圖
遞歸調用 prepareProcessor(),根據Graph Node中的edge更新所有Node狀態。

如圖所示,如果是backward為true,那么當前端口就是InputPort, 下一個節點Port是OutputPort。共同組成edge (InputPort <---- OutpurtPort).
Private question.
代碼分析
// tryAddProcessorToStackIfUpdated. // 通過edge.to得到關聯節點,並做初始化 auto & node = *graph->nodes[edge.to]; std::unique_lock lock(node.status_mutex); ExecutingGraph::ExecStatus status = node.status; if (status == ExecutingGraph::ExecStatus::Finished) return true; // 如果當前邊的方向是backward,那么將edge中相應的port number添加到updated_output_ports中 if (edge.backward) node.updated_output_ports.push_back(edge.output_port_number); else node.updated_input_ports.push_back(edge.input_port_number); if (status == ExecutingGraph::ExecStatus::Idle) { node.status = ExecutingGraph::ExecStatus::Preparing; // 這里調用prepareProcessor是PipelineExecutor. return prepareProcessor(edge.to, thread_number, queue, async_queue, std::move(lock)); }
如何從Node中得到 下一個需要處理的edge.
// 當執行完 當前node.processor的prepare方法以后,我們就得到了下一個edge. node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
Port.h中 // InPort.setNeeded(). 如字面意思,開始設置Node的Inport端口信息.而Port和Edge關心很緊密, 在Port中的UpdateInfo中存儲的id就是與當前Port關聯的edge的地址. 見下圖 void ALWAYS_INLINE setNeeded() { assumeConnected(); if ((state->setFlags(State::IS_NEEDED, State::IS_NEEDED) & State::IS_NEEDED) == 0) updateVersion(); } void inline ALWAYS_INLINE updateVersion() { if (likely(update_info)) update_info->update(); } }; void inline ALWAYS_INLINE update() { if (version == prev_version && update_list) update_list->push_back(id); ++version; }
update_info 中的 Id 是 edge的地址.

對應Graph Node中的back_edge中的update_info

id 指向擁有這個update_info的edge.
ExecutingGraph::Edge & ExecutingGraph::addEdge(Edges & edges, Edge edge, const IProcessor * from, const IProcessor * to) { auto it = processors_map.find(to); if (it == processors_map.end()) { String msg = "Processor " + to->getName() + " was found as " + (edge.backward ? "input" : "output") + " for processor " + from->getName() + ", but not found in list of processors."; throw Exception(msg, ErrorCodes::LOGICAL_ERROR); } edge.to = it->second; auto & added_edge = edges.emplace_back(std::move(edge)); added_edge.update_info.id = &added_edge; return added_edge; }
Pipeline::prepareProcessor()作用
-
初始化所有相關的Processors/Nodes的狀態
- 獲取當前 node_x 的Inport, 初始化,通過當前 node_x InPort中的update_info結構中的id 得到edge_e 地址.
- 根據edge_e中的to字段(下一個Node的index),在graph中獲得下一個node_y。
- 根據edge_e中的output_number信息對node_y進行初始化。
- 執行node_y中的processor.prepare()方法
- 如果能執行到 input.setNeeded(),那么就得到了下一批edges. 回到a.
- 如果執行MergeTreeSelector時,沒有Input.setNeeded(). 此時,設置node.status為Ready,將此Node加入到Queue,並將此Node狀態置為Executing,表明此Node已經可以被執行。
case IProcessor::Status::Ready: { node.status = ExecutingGraph::ExecStatus::Executing; queue.push(&node); break; }
並且這個Node,沒有需要更新的edge,所以就可以返回。
初始化Node結束,MergeTreeSelect對應的Node作為准備好的Node放到queue中,這些Node作為SQL執行開始的Task發送給task_queue給threads執行。

Processor的狀態(會決定Node的狀態)
enum class Status { /// Processor needs some data at its inputs to proceed. /// You need to run another processor to generate required input and then call 'prepare' again. NeedData, /// Processor cannot proceed because output port is full or not isNeeded(). /// You need to transfer data from output port to the input port of another processor and then call 'prepare' again. PortFull, /// All work is done (all data is processed or all output are closed), nothing more to do. Finished, /// No one needs data on output ports. /// Unneeded, /// You may call 'work' method and processor will do some work synchronously. Ready, /// You may call 'schedule' method and processor will return descriptor. /// You need to poll this descriptor and call work() afterwards. Async, /// Processor wants to add other processors to pipeline. /// New processors must be obtained by expandPipeline() call. ExpandPipeline, };
參與執行的所有Node是不是都會進入Executing狀態?
不是,只有正常執行,且實現了work()方法的Node,會從Preparing狀態->Executing狀態,然后交給 queue,再去執行。
Excute Graph Node更新結束
狀態圖 : MergeTreeSelector [status: Executing, last_processor_status: Ready] OtherSelectors [status: Idle, last_processor_status: NeedData]
總結
為什么需要構建ExecuteGraph?
個人理解就是為了完成一個class單獨完成自己的事情。
Execute Graph 本質就是包含了Processor,有狀態,可以被調度執行的Graph。
Pipeline是解決數據通過端口流通的結構,本身並不關心如何被調度執行。
准備Pipeline執行時,有一個Node失敗,會很快返回。
1. MergeTreeSelector的狀態 從Executing -> PortFull.
2.根據寫的DFS規則,更新並添加相關節點. MergeTreeSelector只有direct_edge.
MergeTreeSelector [status: Executing, last_processor_status: Ready]
OtherSelectors [status: Idle, last_processor_status: NeedData]
queue中保留狀態為Executing狀態Node. // 將Executing Node 放入到 task_queue中相應的thread需要處理的task queue中. // task_queue: std::vector[[std::queue[Task*]],[std::queue[Task*]]], Task 模板類型,可以是Node. task_queue.push(queue.front(), next_thread); 2. initializeExecution結束, executeStepImpl()開始執行 //直到finished或者 yield. while(!finished && !yield){ /// 准備 /// First, find any processor to execute. /// Just traverse graph and prepare any processor. while (!finished && node != nullptr){ ... // 從task_queue某個thread的Task隊列中移出Task(node) node = task_queue.pop(thread_num); // 如果是1個並行度,那么就在當前線程中執行. // 下面代碼是指有多個Node並行執行是,會喚醒threads_queue中的其他線程一起執行。 if (node) { if (!task_queue.empty() && !threads_queue.empty()){ auto thread_to_wake = task_queue.getAnyThreadWithTasks() ... } } } 執行 while (node && !yield){ // 當前線程執行Node任務 addJob(node); ... // 最終調用Processor中的 work()方法處理. node->job(); /// Try to execute neighbour processor. { /// Prepare processor after execution. /// 找下個可以執行的Processors. /// 如果有合適狀態的Node(Executing狀態),添加到queue隊列,如果沒有,結束任務. { auto lock = std::unique_lock<std::mutex>(node->status_mutex); if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock))) finish(); } /// 執行下一批可以執行的Processors. /// 將除queue中的第一個task之外的所有的任務搬運到task_queue,多線程執行這些Task. /// 其中多線程模型中各個線程都會執行 executeSingleThread(thread_num, num_threads). /// 單線程中executeSingleThread(0,1)只是一個特例 while (!queue.empty() && !finished) { task_queue.push(queue.front(), thread_num); queue.pop(); } } } } 執行結束后,監測各個Node中是否有exception. /// Execution can be stopped because of exception. Check and rethrow if any. for (auto & node : graph->nodes) if (node->exception) std::rethrow_exception(node->exception);
Pipeline 執行
生產者/消費者模型
生產: 執行過程中如果ExecutingGraph的Node可以變為Executing,那么就可以被放進task_queue (此時開始執行時,是沿着direct_edge 執行,數據流動方向)。
消費: 被創建的線程池消費,最終調用IProcessor中的work()方法。然后更新相鄰的Node的狀態,再次產生Executing 狀態的Node。 一直到所有Node狀態變成Finished狀態,或者出現異常。
偽代碼式分析
queue中保留狀態為Executing狀態Node. // 將Executing Node 放入到 task_queue中相應的thread需要處理的task queue中. // task_queue: std::vector[[std::queue[Task*]],[std::queue[Task*]]], Task 模板類型,可以是Node. task_queue.push(queue.front(), next_thread); initializeExecution結束, executeStepImpl()開始執行 //直到finished或者 yield. while(!finished && !yield){ /// 准備 /// First, find any processor to execute. /// Just traverse graph and prepare any processor. while (!finished && node != nullptr){ ... // 從task_queue某個thread的Task隊列中移出Task(node) node = task_queue.pop(thread_num); // 如果是1個並行度,那么就在當前線程中執行. // 下面代碼是指有多個Node並行執行是,會喚醒threads_queue中的其他線程一起執行。 if (node) { if (!task_queue.empty() && !threads_queue.empty()){ auto thread_to_wake = task_queue.getAnyThreadWithTasks() ... } } } 執行 while (node && !yield){ // 當前線程執行Node任務 addJob(node); ... // 最終調用Processor中的 work()方法處理. node->job(); /// Try to execute neighbour processor. { /// Prepare processor after execution. /// 找下個可以執行的Processors. /// 如果有合適狀態的Node(Executing狀態),添加到queue隊列,如果沒有,結束任務. { auto lock = std::unique_lock<std::mutex>(node->status_mutex); if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock))) finish(); } /// 執行下一批可以執行的Processors. /// 將除queue中的第一個task之外的所有的任務搬運到task_queue,多線程執行這些Task. /// 其中多線程模型中各個線程都會執行 executeSingleThread(thread_num, num_threads). /// 單線程中executeSingleThread(0,1)只是一個特例 while (!queue.empty() && !finished) { task_queue.push(queue.front(), thread_num); queue.pop(); } } } } 執行結束后,監測各個Node中是否有exception. /// Execution can be stopped because of exception. Check and rethrow if any. for (auto & node : graph->nodes) if (node->exception) std::rethrow_exception(node->exception);
queue中保留狀態為Executing狀態Node.
// 將Executing Node 放入到 task_queue中相應的thread需要處理的task queue中.
// task_queue: std::vector[[std::queue[Task*]],[std::queue[Task*]]], Task 模板類型,可以是Node.
task_queue.push(queue.front(), next_thread);
2. initializeExecution結束,
executeStepImpl()開始執行 //直到finished或者 yield. while(!finished && !yield){ /// 准備 /// First, find any processor to execute. /// Just traverse graph and prepare any processor. while (!finished && node != nullptr){ ... // 從task_queue某個thread的Task隊列中移出Task(node) node = task_queue.pop(thread_num); // 如果是1個並行度,那么就在當前線程中執行. // 下面代碼是指有多個Node並行執行是,會喚醒threads_queue中的其他線程一起執行。 if (node) { if (!task_queue.empty() && !threads_queue.empty()){ auto thread_to_wake = task_queue.getAnyThreadWithTasks() ... } } } 執行 while (node && !yield){ // 當前線程執行Node任務 addJob(node); ... // 最終調用Processor中的 work()方法處理. node->job(); /// Try to execute neighbour processor. { /// Prepare processor after execution. /// 找下個可以執行的Processors. /// 如果有合適狀態的Node(Executing狀態),添加到queue隊列,如果沒有,結束任務. { auto lock = std::unique_lock<std::mutex>(node->status_mutex); if (!prepareProcessor(node->processors_id, thread_num, queue, async_queue, std::move(lock))) finish(); }
/// 執行下一批可以執行的Processors.
/// 將除queue中的第一個task之外的所有的任務搬運到task_queue,多線程執行這些Task.
/// 其中多線程模型中各個
Private Question
如果某個pipeline出現異常excpetion,提前結束,如何通知其他pipeline的執行者終止執行?
很多繼承IProcessor的類,並沒有實現cancel方法。推測不會生效
// 如果某個node在執行時,出現異常那么就會通知所有的Processors停止執行。 // if (node->exception) cancel(); // 這里發生運行時異常時,會調用所有processor的cancel()方法, // 有些processor沒有實現這個方法。可以調用IProcessor的cancel(). void PipelineExecutor::cancel() { cancelled = true; finish(); // 這里的processors是PipelineExecutor中的變量,包含所有processors。 std::lock_guard guard(processors_mutex); for (auto & processor : processors) processor->cancel(); }
繼承Processor的Transform的work()方法調用是否從task_queue出隊后執行的? 是的
Executing Graph Node沿着 direct_edge方向運行?
首先執行MergeTreeSelector(Ready 狀態).
/src/Processors/ISource.cpp 數據和狀態變化是同步的. output.pushData(std::move(current_chunk));
邊執行Node邊添加?
這種算法有兩種傾向:
1. 盡可能先處理Direct方向的edge,因為data被消費出Pipeline后,才能使MergeTreeSelector Processor pull 數據進Pipeline
2.DFS
PipelineExecutor::prepareProcessor(){ ... node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports); // DFS 過程中僅當node.last_processor_status為Ready才會被放入執行隊列. switch (node.last_processor_status) { ... case IProcessor::Status::Ready: { node.status = ExecutingGraph::ExecStatus::Executing; queue.push(&node); break; } ... } ... // 盡可能從direct 方向進行DFS搜索。 for (auto & edge : updated_direct_edges) { if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number)) return false; } // 反向DFS初始化 for (auto & edge : updated_back_edges) { if (!tryAddProcessorToStackIfUpdated(*edge, queue, async_queue, thread_number)) return false; } } // 僅當與此edge連接的Node狀態Idle才會繼續DFS,其他情況作為剪枝情況,停止DFS. tryAddProcessorToStackIfUpdated() { if (status == ExecutingGraph::ExecStatus::Idle) { node.status = ExecutingGraph::ExecStatus::Preparing; return prepareProcessor(edge.to, thread_number, queue, async_queue, std::move(lock)); }
分析沿着direct_edge和back_edge 進行DFS遍歷Node 不會死循環.
1.調用LimitTransform到node id 是1的時候, 節點的last_processor_status從 NeedData --> Finished。因為LimitTransform 得到了超過limit 為1的rows.所以狀態可以變為結束.
1.1 LimitTransform 此時outport和inport都被更新,direct_edge和backward_edge方向的Node都需要被更新.由於DFS,會沿着direct_edge方形更新下一個節點ISimpleTransform Node.
// 當前Node(LimitTransform) 是接收 MergeTreeSelector輸出的節點。
// 因為MergeTreeSelector 和 當前節點Data是指向同一個數據. // 在后面兩個循環對數據進行了依次轉移。此方法沒有重寫 IProcessor的work()方法。 // prepare 時就完成了數據的處理。主要是對limit進行限制。 // process_pair()->preparePair()-> LimitTransform::splitChunk(PortsData & data), //對已有數據在當前Transform進行進行組裝. for (auto pos : updated_input_ports) process_pair(pos); for (auto pos : updated_output_ports) process_pair(pos); /// If we reached limit for some port, then close others. Otherwise some sources may infinitely read data. /// Example: SELECT * FROM system.numbers_mt WHERE number = 1000000 LIMIT 1 if ((!limit_is_unreachable && rows_read >= offset + limit) && !previous_row_chunk && !always_read_till_end) { for (auto & input : inputs) input.close(); // for (auto & output : outputs) output.finish(); return Status::Finished; }
2.ISimpleTransform 會將LimitTransform通過outport設置的數據,通過自己的inport處理. last_processor_status: NeedData->Ready. (這里沒有設置post_updated_out_ports原因是,這個Transform有work()方法,只有通過work()方法處理才能將inport中的數據轉移到outport中。)所以這里會將該Node放到queue上去執行。(這里不會沿着direct_edg 一直更新下去Node)
case IProcessor::Status::Ready: { node.status = ExecutingGraph::ExecStatus::Executing; queue.push(&node); break; }
3.繼續更新LimitTransform back_edge方向的Node.因為只有一個MergeTreeSelector。 ?這樣會陷入死循環而使ISimpleTransform沒有辦法執行么? 從LimitTransform 更新到 MergeTreeSelector,然后MergeTreeSelector通過direct_edge又更新回來?不會,因為LimitTransform 在拿到limit (本次測試limit 是 1) 的數據時,會對和它公用的Data數據進行狀態更新,更新為State::IS_FINISHED.這樣就整體結束了。
- 執行完所有和LimitTransform Node相關的Node狀態,開始執行加入到queue中的ISimpleTransform. 同樣也是按照direct_edge方向執行.直到所有節點都到Finished狀態或者出現異常。
- expand_pipeline目前沒有接觸到.
參考

使用分治方法。


執行收尾時檢查否有異常(異常保存在node中).
/// Execution can be stopped because of exception. Check and rethrow if any. for (auto & node : graph->nodes) if (node->exception) std::rethrow_exception(node->exception);
一些總結
- Processor的執行方式上有兩種,一種是實現了work()方法的Processor,這種Node需要加入到queue中執行。另一種,沒有work方法,在prepare此Node時,就等價於對Node進行了處理。
比如 LimitTransform 是沒有實現work()方法的,那么它的數據從(inport->outport)流動是在prepare時 就完成了。 但是IsimpleTransform 實現了work()方法,說明處理inport的數據然后輸出到outport 可能比較復雜,需要 發送到queue中,然后通過專門調用addJob(), job->work()單獨處理這個任務。
- 針對Processor的狀態更新,使用DFS算法,先direct_edge方向,back_edge方向更新🔗Node的對應狀態。
- 當Node更新某個端口數據時,會同時影響共有數據的State中的狀態。例如
當LimitTransform 設置為Finished時,會更新inport中的Data的狀態,會影響到MergeTreeSelector outport的狀態。 auto flags = state->setFlags(State::IS_FINISHED, State::IS_FINISHED);
- 在更新Node節點狀態時,有加鎖動作.
std::unique_lock lock(node.status_mutex);
- 執行Execution Graph 的架構
主線程 與 執行線程 解耦, 通過 LazyOutputFormat中的 Queue(非全局變量)進行通信.
Pipeline 執行 結束
執行Pipeline 總體模型
執行在TCPHandler 類發起.
發起者執行的邏輯,創建異步線程綁定到需要輸出的data上.
這個data就是PullingAsyncPipelineExecutor和異步執行線程需要通信的"全局變量"。

多threads Execute Graph調度執行 分析
- 如果有兩個Node(比如max_threads也是2,),那么在PipelineExecutor中如何協同線程工作的。
首先PipelineExecutor會創建max_threads的線程以及一個存放Tasks的數據結構. queue: vector<std::deque<Task *>> // 先入先出的隊列,存放需要執行的Node的結構 [ [NodePtr0, NodePtr1, NodePtr2, ...] ] [ [NodePtr0, NodePtr1, NodePtr2, ...] ] .... [ [NodePtr0, NodePtr1, NodePtr2, ...] ] 1.任意thread_i都可以看到這些需要被執行的task. 2.每個 thread_i 會綁定 到 一個ExecutorContext_i,並wait ExecutorContext_i中的wake_flag, 有task_queue被更新。 3.當某個thread_i 執行掃描可運行的Node時,會將對應的task (node) push到 task_queue[thread_j](j可以不等於i) 隊列,(同時將node從當前queue中出隊)中,並set wake_flag,讓這個線程thread_j去干活。 4.這保證了pipeline找那個處於Executing狀態的Node可以充分運行。 threads: 模擬線程 std::vector<std::unique_ptr<ExecutorContext>> executor_contexts; /*** /// Context for each thread. struct ExecutorContext { /// Will store context for all expand pipeline tasks (it's easy and we don't expect many). /// This can be solved by using atomic shard ptr. std::list<ExpandPipelineTask> task_list; std::queue<ExecutingGraph::Node *> async_tasks; std::atomic_bool has_async_tasks = false; std::condition_variable condvar; std::mutex mutex; bool wake_flag = false; /// Currently processing node. ExecutingGraph::Node * node = nullptr; /// Exception from executing thread itself. std::exception_ptr exception; #ifndef NDEBUG /// Time for different processing stages. UInt64 total_time_ns = 0; UInt64 execution_time_ns = 0; UInt64 processing_time_ns = 0; UInt64 wait_time_ns = 0; #endif }; ***/ ?怎么初始化,怎么使用? // 初始化這些線程 (使用globalThread),並且每個thread名字都是 QueryPipelineEx. // 初始化threads的函數是一個lambda表達式,其中capture了this指針. for (size_t i = 0; i < num_threads; ++i) { threads.emplace_back([this, thread_group, thread_num = i, num_threads] { /// ThreadStatus thread_status; setThreadName("QueryPipelineEx"); if (thread_group) CurrentThread::attachTo(thread_group); SCOPE_EXIT_SAFE( if (thread_group) CurrentThread::detachQueryIfNotDetached(); ); try { executeSingleThread(thread_num, num_threads); } catch (...) { /// In case of exception from executor itself, stop other threads. finish(); executor_contexts[thread_num]->exception = std::current_exception(); } }); } 線程的上下文 主要是功能是在保證在多個Node可以在多個線程執行時,保持一個Node在主線程執行之外, 其他的Node可以被其他thread執行。 這種模型還是很好的。
個人疑問? max_threads為什么會在兩個MergeTreeSelector時為1.
max_threads 會在某處被設置.

settings.max_block_size // default = 65505 bytes. // 如果 SQL 中的LIMIT 和 OFFSET 常量 的和 小於 max_block_size,那么 會影響ExecuteGraph的並行度。 if (!query.distinct && !query.limit_with_ties && !query.prewhere() && !query.where() && !query.groupBy() && !query.having() && !query.orderBy() && !query.limitBy() && query.limitLength() && !query_analyzer->hasAggregation() && !query_analyzer->hasWindow() && limit_length <= std::numeric_limits<UInt64>::max() - limit_offset && limit_length + limit_offset < max_block_size) { max_block_size = std::max(UInt64(1), limit_length + limit_offset); max_threads_execute_query = max_streams = 1; }

由於測試select 語句中offset數量太小,導致雖然有兩個ReadFromMergeTreeSelector,仍然是1個線程在執行。
個人疑問? 執行因超時Cancelling 如何中斷PipelineExecutor執行, cancelled 是做這個使用的么? Cancelled。
當執行 processOrdinaryQueryWithProcessors()時,會piplineExecutor 會調用pull()方法。
每隔100ms 會判斷是否用戶發送過來 cancel query的命令。如果有
當執行 processOrdinaryQueryWithProcessors()時,會piplineExecutor 會調用pull()方法。 每隔100ms 會判斷是否用戶發送過來 cancel query的命令。如果有 while (executor.pull(block, interactive_delay / 1000)) { std::lock_guard lock(task_callback_mutex); if (isQueryCancelled()) { /// A packet was received requesting to stop execution of the request. executor.cancel(); break; } 最終調用PipelineExecutor::cancel() 設置Pipeline::atomic_bool 類型為true. 實驗時沒有看到對應的pipeline被打斷執行,(sql 被cancel還是會被執行到底)。
最終調用PipelineExecutor::cancel() 設置Pipeline::atomic_bool 類型為true.
實驗時沒有看到對應的pipeline被打斷執行,(sql 被cancel還是會被執行到底)。