ClickHouse Source Code Reading Notes (1): The Main Flow



The entry point, the main function, is in dbms/programs/main.cpp:

int main(int argc_, char ** argv_)
{
    ...

    /// Print a basic help if nothing was matched
    MainFunc main_func = printHelp; // The function to run is chosen from the startup arguments; for the server it is mainEntryClickHouseServer

    for (auto & application : clickhouse_applications)
    {
        if (isClickhouseApp(application.first, argv))
        {
            main_func = application.second;
            break;
        }
    }

    return main_func(static_cast<int>(argv.size()), argv.data()); // For the server this calls mainEntryClickHouseServer, which lives in dbms/programs/server/server.cpp
}
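The dispatch above is a table lookup: each entry pairs an application name with an entry function of the same signature, the first match wins, and printHelp is the fallback. The sketch below reproduces that shape with hypothetical stand-in entry points (mainEntryServer, mainEntryClient) and an assumed matching rule (the application name is argv[1], as in `clickhouse server`); the real isClickhouseApp may accept other forms that this sketch ignores.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Every entry point shares this signature, so they can live in one table.
using MainFunc = int (*)(int, char **);

int printHelp(int, char **) { return 1; }         // fallback when nothing matches
int mainEntryServer(int, char **) { return 10; }  // hypothetical stand-in
int mainEntryClient(int, char **) { return 20; }  // hypothetical stand-in

// Name -> entry point, in the spirit of clickhouse_applications.
const std::vector<std::pair<const char *, MainFunc>> applications = {
    {"server", mainEntryServer},
    {"client", mainEntryClient},
};

// Assumed matching rule: the first argument names the application.
bool isApp(const std::string & name, int argc, char ** argv)
{
    assert(argv != nullptr);
    return argc > 1 && name == argv[1];
}

// Scan the table in order; the first matching entry wins.
MainFunc selectMain(int argc, char ** argv)
{
    MainFunc main_func = printHelp;
    for (const auto & application : applications)
    {
        if (isApp(application.first, argc, argv))
        {
            main_func = application.second;
            break;
        }
    }
    return main_func;
}
```

Because every entry point has the same signature, adding a new tool only means adding one row to the table; main itself never changes.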

dbms/programs/server/server.cpp provides three kinds of interfaces, which the source code describes as follows:

/** Server provides three interfaces:
  * 1. HTTP - simple interface for any applications.
  * 2. TCP - interface for native clickhouse-client and for server to server internal communications.
  *    More rich and efficient, but less compatible
  *     - data is transferred by columns;
  *     - data is transferred compressed;
  *    Allows to get more information in response.
  * 3. Interserver HTTP - for replication.
  */
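Each of the three interfaces is a separate listener with its own port setting. As a rough summary, the table below records the configuration keys and the default ports found in the stock config.xml (http_port 8123, tcp_port 9000, interserver_http_port 9009); the Listener struct and findListener helper are hypothetical, and the ports should be checked against the configuration actually deployed.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class Interface { HTTP, TCP, InterserverHTTP };

// Hypothetical description of one listening endpoint.
struct Listener
{
    Interface kind;
    std::string config_key;       // key in config.xml that sets the port
    unsigned short default_port;  // default in the stock config.xml
    const char * purpose;
};

const std::vector<Listener> listeners = {
    {Interface::HTTP, "http_port", 8123, "simple interface for any application"},
    {Interface::TCP, "tcp_port", 9000, "native client and server-to-server; columnar, compressed"},
    {Interface::InterserverHTTP, "interserver_http_port", 9009, "replication"},
};

// Look up the listener description for one interface.
const Listener & findListener(Interface kind)
{
    for (const auto & listener : listeners)
        if (listener.kind == kind)
            return listener;
    assert(false && "unknown interface");
    return listeners.front();
}
```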

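The server's entry function in dbms/programs/server/server.cpp, mainEntryClickHouseServer, creates a DB::Server application and calls its run() method; from there the server parses its arguments and configuration, initializes itself, and starts listening on its ports.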

int mainEntryClickHouseServer(int argc, char ** argv)
{
    DB::Server app;
    try
    {
        return app.run(argc, argv); // Calls run() here
    }
    catch (...)
    {
        std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
        auto code = DB::getCurrentExceptionCode();
        return code ? code : 1;
    }
}
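mainEntryClickHouseServer follows a common pattern: run the application, and if anything escapes, print the message and turn the exception into a process exit code, falling back to 1 when there is no code. A minimal sketch, assuming a hypothetical CodedError type in place of DB::Exception and a hypothetical currentExceptionCode helper that plays the role of DB::getCurrentExceptionCode by rethrowing the in-flight exception:

```cpp
#include <cassert>
#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>

// Hypothetical exception carrying an error code, standing in for DB::Exception.
struct CodedError : std::runtime_error
{
    int code;
    CodedError(const std::string & msg, int code_) : std::runtime_error(msg), code(code_) {}
};

// Inspect the exception currently being handled; must be called inside a catch block.
int currentExceptionCode()
{
    try
    {
        throw;  // rethrow the active exception to inspect its type
    }
    catch (const CodedError & e)
    {
        return e.code;
    }
    catch (...)
    {
        return 0;  // no known code
    }
}

// Run the application and map any escaping exception to an exit code.
template <typename App>
int runApp(App & app, int argc, char ** argv)
{
    try
    {
        return app.run(argc, argv);
    }
    catch (...)
    {
        int code = currentExceptionCode();
        std::cerr << "fatal error, code " << code << "\n";
        return code ? code : 1;  // never report success after an exception
    }
}
```

Calling currentExceptionCode outside a catch block would execute a bare `throw;` with no active exception, which calls std::terminate.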

ClickHouse uses the Poco networking library to handle network requests. The processing logic for each client connection is in the run() method in dbms/programs/server/TCPHandler.cpp:

void TCPHandler::run()
{
    try
    {
        runImpl(); // Calls runImpl() here

        LOG_INFO(log, "Done processing connection.");
    }
    catch (Poco::Exception & e)
    {
        /// Timeout - not an error.
        if (!strcmp(e.what(), "Timeout"))
        {
            LOG_DEBUG(log, "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
                << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
        }
        else
            throw;
    }
}
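The point of TCPHandler::run() is that a timeout is treated as a normal end of the connection and only logged, while every other Poco::Exception propagates. In Poco, what() returns the exception's name, which is why the code compares it against "Timeout". The sketch below isolates that control flow with a hypothetical NetError type standing in for Poco::Exception and std::clog standing in for the logger.

```cpp
#include <cassert>
#include <cstring>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-in for Poco::Exception: what() holds a short name such as "Timeout".
struct NetError : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

// A timeout ends the connection quietly; any other network error is rethrown.
template <typename Impl>
void runConnection(Impl && runImpl)
{
    try
    {
        runImpl();
        std::clog << "Done processing connection.\n";
    }
    catch (const NetError & e)
    {
        if (!std::strcmp(e.what(), "Timeout"))
            std::clog << "timeout, closing connection\n";  // not an error
        else
            throw;  // let the caller deal with real failures
    }
}
```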


Leaving aside the handshake, context initialization, exception handling and similar code, the main logic of TCPHandler::runImpl() is as follows:

void TCPHandler::runImpl()
{
    ...

    receivePacket(); // Receive the request

    state.io = executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data); // Process the request

    /// Does the request require receive data from client?
    if (state.need_receive_data_for_insert)
        processInsertQuery(connection_settings); // Receives the data for an INSERT from the client
    else if (state.need_receive_data_for_input)
    {
        /// It is special case for input(), all works for reading data from client will be done in callbacks.
        /// state.io.in is NullAndDoCopyBlockInputStream so read it once.
        state.io.in->read();
        state.io.onFinish();
    }
    else if (state.io.pipeline.initialized())
        processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads); // Sends the results back to the client
    else
        processOrdinaryQuery(); // Sends the results back to the client

}
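After executeQuery returns, runImpl picks exactly one way to move data, and the order of the checks matters: an INSERT that still needs client data takes precedence, then input(), then the processors pipeline, with the ordinary stream path as the fallback. The hypothetical QueryState and chooseBranch below model only that decision, not the data transfer itself.

```cpp
#include <cassert>

// Hypothetical subset of TCPHandler's per-query state.
struct QueryState
{
    bool need_receive_data_for_insert = false;
    bool need_receive_data_for_input = false;
    bool pipeline_initialized = false;
};

enum class Branch { Insert, Input, Processors, Ordinary };

// Mirrors the if / else-if chain in runImpl(): the first matching condition wins.
Branch chooseBranch(const QueryState & state)
{
    if (state.need_receive_data_for_insert)
        return Branch::Insert;      // processInsertQuery: read data from the client
    if (state.need_receive_data_for_input)
        return Branch::Input;       // input(): reading happens in callbacks
    if (state.pipeline_initialized)
        return Branch::Processors;  // processOrdinaryQueryWithProcessors
    return Branch::Ordinary;        // processOrdinaryQuery
}
```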

Next, let's look at how executeQuery processes the request. It is in dbms/src/Interpreters/executeQuery.cpp, and its main logic is as follows:

BlockIO executeQuery(
    const String & query,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool may_have_embedded_data,
    bool allow_processors)
{
    ASTPtr ast;
    BlockIO streams;
    std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
        internal, stage, !may_have_embedded_data, nullptr, allow_processors); // Delegates to executeQueryImpl
    return streams;
}
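executeQuery is a thin wrapper: it hands executeQueryImpl the query as a [begin, end) character range and unpacks the returned (AST, BlockIO) tuple into two existing variables with std::tie. The sketch below shows only that calling convention; Ast, BlockIOStub, executeImpl and execute are hypothetical stand-ins, not ClickHouse types.

```cpp
#include <cassert>
#include <string>
#include <tuple>

// Hypothetical stand-ins for ASTPtr and BlockIO.
struct Ast { std::string text; };
struct BlockIOStub { bool ready = false; };

// Like executeQueryImpl, works on a [begin, end) range instead of a std::string,
// so a caller can pass any slice of a larger buffer.
std::tuple<Ast, BlockIOStub> executeImpl(const char * begin, const char * end)
{
    Ast ast{std::string(begin, end)};
    BlockIOStub io;
    io.ready = begin != end;  // pretend only a non-empty query produces a result
    return std::make_tuple(ast, io);
}

BlockIOStub execute(const std::string & query)
{
    Ast ast;
    BlockIOStub streams;
    // Unpack both results into existing variables, as executeQuery does.
    std::tie(ast, streams) = executeImpl(query.data(), query.data() + query.size());
    return streams;
}
```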

Now let's look at the main processing logic of executeQueryImpl:

static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
    const char * begin,
    const char * end,
    Context & context,
    bool internal,
    QueryProcessingStage::Enum stage,
    bool has_query_tail,
    ReadBuffer * istr,
    bool allow_processors)
{
    ...

    ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth); // Parse the query into an AST

    ...

    if (use_processors) // Execute with the processors pipeline
        pipeline = interpreter->executeWithProcessors();
    else // Execute without the pipeline
        res = interpreter->execute(); // Calls the execute() of whichever interpreter type was created for this query

    ...
}
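The last step relies on polymorphism: once the query has been parsed, an interpreter matching the statement type is created, and execute() (or executeWithProcessors()) is a virtual call on it. In ClickHouse this selection is done by InterpreterFactory based on the AST node type; the sketch below is a simplified, hypothetical version that dispatches on the statement's leading keyword instead, just to show the factory-plus-virtual-call shape.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical base class in the spirit of ClickHouse's interpreter interface.
struct Interpreter
{
    virtual ~Interpreter() = default;
    virtual std::string execute() = 0;
};

struct SelectInterpreter : Interpreter
{
    std::string execute() override { return "select"; }
};

struct InsertInterpreter : Interpreter
{
    std::string execute() override { return "insert"; }
};

// Hypothetical factory: picks an interpreter from the leading keyword.
// A real factory would inspect the parsed AST node type, not raw text.
std::unique_ptr<Interpreter> makeInterpreter(const std::string & query)
{
    if (query.rfind("SELECT", 0) == 0)  // starts with "SELECT"
        return std::make_unique<SelectInterpreter>();
    if (query.rfind("INSERT", 0) == 0)  // starts with "INSERT"
        return std::make_unique<InsertInterpreter>();
    throw std::invalid_argument("unsupported query: " + query);
}
```

The caller never needs to know which concrete interpreter it holds; adding a new statement type means adding a subclass and one branch in the factory.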

The next article will cover interpreters. To be continued...

