ClickHouse源碼閱讀筆記(一)之主要流程
入口main函數在dbms/programs/main.cpp
int main(int argc_, char ** argv_)
{
...
/// Print a basic help if nothing was matched
MainFunc main_func = printHelp;//這里根據啟動時傳入的參數來確定后面執行哪個func,對於server來說,對應的函數為mainEntryClickHouseServer
for (auto & application : clickhouse_applications)
{
if (isClickhouseApp(application.first, argv))
{
main_func = application.second;
break;
}
}
return main_func(static_cast<int>(argv.size()), argv.data());//對於server,這里調用mainEntryClickHouseServer后,轉到dbms/programs/server/server.cpp
}
在dbms/programs/server/server.cpp中,提供三類接口,按照源碼的描述,說明如下:
/** Server provides three interfaces:
* 1. HTTP - simple interface for any applications.
適用於任何應用程序的HTTP接口。
* 2. TCP - interface for native clickhouse-client and for server to server internal communications.
用於本地client和server之間通信的TCP接口。
* More rich and efficient, but less compatible
豐富高效,但兼容性不好
* - data is transferred by columns;
數據按列傳輸
* - data is transferred compressed;
數據壓縮后傳輸
* Allows to get more information in response.
允許在響應消息中獲取更多信息
* 3. Interserver HTTP - for replication.
用於復制的內部HTTP。
*/
dbms/programs/server/server.cpp中的main函數會解析參數配置,初始化server,啟動服務監聽端口。
int mainEntryClickHouseServer(int argc, char ** argv)
{
DB::Server app;
try
{
return app.run(argc, argv);//這里調用run。
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}
clickhouse使用poco這個網絡庫來處理網絡請求,每個client連接的處理邏輯在dbms/programs/server//TCPHandler.cpp的run()方法中。
void TCPHandler::run()
{
try
{
runImpl();//這里調用 runImpl函數。
LOG_INFO(log, "Done processing connection.");
}
catch (Poco::Exception & e)
{
/// Timeout - not an error.
if (!strcmp(e.what(), "Timeout"))
{
LOG_DEBUG(log, "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
}
else
throw;
}
}
在TCPHandler::runImpl()函數中,除去握手,初始化上下文,異常處理等代碼,主要邏輯如下:
void TCPHandler::runImpl()
{
receivePacket();//接收請求
executeQuery(state.query, *query_context, false, state.stage, may_have_embedded_data);//處理請求
/// Does the request require receive data from client?
if (state.need_receive_data_for_insert)
processInsertQuery(connection_settings);//負責將結果返回給客戶端
else if (state.need_receive_data_for_input)
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
/// state.io.in is NullAndDoCopyBlockInputStream so read it once.
state.io.in->read();
state.io.onFinish();
}
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);//負責將結果返回給客戶端
else
processOrdinaryQuery();//負責將結果返回給客戶端
}
接下來,我們繼續看executeQuery處理請求的邏輯,在dbms/src/Interpreters/executeQuery.cpp中,主要邏輯如下:
BlockIO executeQuery(
const String & query,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool may_have_embedded_data,
bool allow_processors)
{
std::tie(ast, streams) = executeQueryImpl(query.data(), query.data() + query.size(), context,
internal, stage, !may_have_embedded_data, nullptr, allow_processors);//這里調用executeQueryImpl
}
接下來再看executeQueryImpl的主要處理邏輯:
static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
const char * begin,
const char * end,
Context & context,
bool internal,
QueryProcessingStage::Enum stage,
bool has_query_tail,
ReadBuffer * istr,
bool allow_processors)
{
ast = parseQuery(parser, begin, end, "", max_query_size, settings.max_parser_depth);//解析查詢語句
if (use_processors)//使用pipeline
pipeline = interpreter->executeWithProcessors();
else//不使用pipiline
res = interpreter->execute();//根據interpreter的類型來調用對應類型的execute函數執行
}
下一篇文章會介紹interpreter,未完待續。。。