在mysqld_main函數中經過一系列的初始化后,mysql開始監聽客戶端的連接
mysqld_socket_acceptor->connection_event_loop();
查看mysqld_socket_acceptor:
Connection_acceptor<Mysqld_socket_listener> *mysqld_socket_acceptor= NULL;
這是一個類模版Connection_acceptor通過Mysqld_socket_listener類進行的實例化,下面是Connection_acceptor的定義
template <typename Listener> class Connection_acceptor
{
Listener *m_listener;
public:
Connection_acceptor(Listener *listener)
: m_listener(listener)
{ }
~Connection_acceptor()
{
delete m_listener;
}
/**
Initialize a connection acceptor.
@retval return true if initialization failed, else false.
*/
bool init_connection_acceptor()
{
return m_listener->setup_listener();
}
/**
Connection acceptor loop to accept connections from clients.
*/
void connection_event_loop()
{
Connection_handler_manager *mgr= Connection_handler_manager::get_instance();
while (!abort_loop)
{
Channel_info *channel_info= m_listener->listen_for_connection_event();
if (channel_info != NULL)
mgr->process_new_connection(channel_info);
}
}
/**
Close the listener.
*/
void close_listener()
{
m_listener->close_listener();
}
};
可以看出這個類是一個抽象接口封裝了對於監聽套接字的操作,connection_event_loop函數就是循環讀取監聽的套接字上的連接並且進行連接分配的函數。m_listener是一個指針,指向實際上獲取連接的類Mysqld_socket_listener的實例,它的成員函數listen_for_connection_event()根據不同測操作系統的情況封裝丟與套接字的監聽操作,例如在linux下就使用poll函數進行操作。
Connection_handler_manager是一個全局的單例模式,這個類用於管理獲取的新連接如何進行處理,到底是使用每個連接一個線程,還是使用其他模式。這里暫時列出該類的幾個與新建連接的關鍵成員和函數:
class Connection_handler_manager
{
static Connection_handler_manager* m_instance;
Connection_handler* m_connection_handler;
static mysql_mutex_t LOCK_connection_count;
static mysql_cond_t COND_connection_count;
public:
static uint connection_count; // Protected by LOCK_connection_count
static ulong max_used_connections; // Protected by LOCK_connection_count
static ulong max_used_connections_time;// Protected by LOCK_connection_count
//獲取單例模式指向實例的指針
static Connection_handler_manager* get_instance()
{
DBUG_ASSERT(m_instance != NULL);
return m_instance;
}
//處理新連接
void process_new_connection(Channel_info* channel_info);
bool check_and_incr_conn_count();
}
m_connection_handler是具體的連接處理者,其類型Connection_handler是一個虛基類,各種連接方式繼承這個類具體實現如何處理連接,process_new_connection函數為接收新連接后進行處理的函數:
Connection_handler_manager::process_new_connection(Channel_info* channel_info)
{
if (abort_loop || !check_and_incr_conn_count())
{
channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
delete channel_info;
return;
}
if (m_connection_handler->add_connection(channel_info))
{
inc_aborted_connects();
delete channel_info;
}
}
這個函數先靠abort_loop判斷是否停止監控,這是一個定義在mysqld.cc中的volatile全局變量,可能被其他線程改變。check_and_incr_conn_count函數用來增加連接計數,當連接數大於最大連接數時,新增連接失敗。但是值得注意的是當連接數等於最大連接時依然允許再建立一個連接,這個連接是為root用戶管理使用的,在連接后會進行驗證。因此實際上的最大連接數是:最大連接數+1。如果循環未停止且連接數未滿,則調用m_connection_handler的add_connection添加連接,比較奇怪的是這里返回false表示正常?。這里就是一個典型的多態,具體的處理要看繼承Connection_handler的類。這里以每連接每線程為例子,其實現Per_thread_connection_handler類如下:
//sql/conn_handler/connection_handler_impl.h
class Per_thread_connection_handler : public Connection_handler
{
Per_thread_connection_handler(const Per_thread_connection_handler&);
Per_thread_connection_handler&
operator=(const Per_thread_connection_handler&);
/**
Check if idle threads to handle connection in
thread cache. If so enqueue the new connection
to be picked by the idle thread in thread cache.
@retval false if idle pthread was found, else true.
*/
bool check_idle_thread_and_enqueue_connection(Channel_info* channel_info);
/**
List of pending channel info objects to be picked by idle
threads. Protected by LOCK_thread_cache.
*/
static std::list<Channel_info*> *waiting_channel_info_list;
static mysql_mutex_t LOCK_thread_cache;
static mysql_cond_t COND_thread_cache;
static mysql_cond_t COND_flush_thread_cache;
public:
// Status variables related to Per_thread_connection_handler
static ulong blocked_pthread_count; // Protected by LOCK_thread_cache.
static ulong slow_launch_threads;
// System variable
static ulong max_blocked_pthreads;
static void init();
static void destroy();
/**
Wake blocked pthreads and wait until they have terminated.
*/
static void kill_blocked_pthreads();
/**
Block until a new connection arrives.
*/
static Channel_info* block_until_new_connection();
Per_thread_connection_handler() {}
virtual ~Per_thread_connection_handler() { }
protected:
virtual bool add_connection(Channel_info* channel_info);
virtual uint get_max_threads() const;
};
這個類是用於在one thread per connection的模式下管理連接與線程,在這里可以詳細探究下。首先,mysql的one thread per connection模式下,每個連接占用一個線程,mysql會緩存一部分線程以供重用,最大數量由該類中的max_blocked_pthreads控制(這個變量在mysqld.cc中的init_common_variables中按照用戶設置進行初始化)
線程緩存的實現實際上使用了類似阻塞隊列的方式,可以將這個看成一個近似生產者消費者的模型,waiting_channel_info_list中為待消費的元素,這里就是保存連接信息的channel_info的指針。當一個線程處理完自己的任務時,調用block_until_new_connection函數:
Channel_info* Per_thread_connection_handler::block_until_new_connection()
{
Channel_info *new_conn= NULL;
mysql_mutex_lock(&LOCK_thread_cache);
if (blocked_pthread_count < max_blocked_pthreads &&
!kill_blocked_pthreads_flag)
{
/* Don't kill the pthread, just block it for reuse */
DBUG_PRINT("info", ("Blocking pthread for reuse"));
/*
mysys_var is bound to the physical thread,
so make sure mysys_var->dbug is reset to a clean state
before picking another session in the thread cache.
*/
DBUG_POP();
DBUG_ASSERT( ! _db_is_pushed_());
// Block pthread
blocked_pthread_count++;
while (!abort_loop && !wake_pthread && !kill_blocked_pthreads_flag)
mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);
blocked_pthread_count--;
if (kill_blocked_pthreads_flag)
mysql_cond_signal(&COND_flush_thread_cache);
else if (!abort_loop && wake_pthread)
{
wake_pthread--;
DBUG_ASSERT(!waiting_channel_info_list->empty());
new_conn= waiting_channel_info_list->front();
waiting_channel_info_list->pop_front();
DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
}
}
mysql_mutex_unlock(&LOCK_thread_cache);
return new_conn;
}
從最外層的if語句可以看出,當被阻塞以緩存起來的線程數量未達到最大值,且kill_blocked_pthreads_flag標志未被設置時(當關閉時,該標志被設置,用來結束所有被阻塞的線程),線程將阻塞在條件變量COND_thread_cache上:
while (!abort_loop && !wake_pthread && !kill_blocked_pthreads_flag)
mysql_cond_wait(&COND_thread_cache, &LOCK_thread_cache);
當有新連接的到來時,會檢查是否有被緩存的線程,有的話就會使用COND_thread_cache喚醒阻塞在上面的線程,由代碼可以看出若喚醒的原因是由於新連接的到來的話,則被喚醒的線程會去隊列中取出一個待處理的連接並且返回這個連接,然后進行處理。
else if (!abort_loop && wake_pthread)
{
wake_pthread--;
DBUG_ASSERT(!waiting_channel_info_list->empty());
new_conn= waiting_channel_info_list->front();
waiting_channel_info_list->pop_front();
DBUG_PRINT("info", ("waiting_channel_info_list->pop %p", new_conn));
}
接下來看如何加入新連接,從名字就可以看出,加入新連接的函數為 add_connection,上面已經提到過這個函數是繼承自虛基類Connection_handler的一個虛函數,連接管理類Connection_handler_manager的process_new_connection函數調用 add_connection 來實際添加連接,下面來看看這個函數的主要代碼
bool Per_thread_connection_handler::add_connection(Channel_info* channel_info)
{
int error= 0;
my_thread_handle id;
//省略調試信息
···
if (!check_idle_thread_and_enqueue_connection(channel_info))
DBUG_RETURN(false);
/*
There are no idle threads avaliable to take up the new
connection. Create a new thread to handle the connection
*/
channel_info->set_prior_thr_create_utime();
error= mysql_thread_create(key_thread_one_connection, &id,
&connection_attrib,
handle_connection,
(void*) channel_info);
#ifndef DBUG_OFF
handle_error:
#endif // !DBUG_OFF
if (error)
{
//錯誤處理
···
}
Global_THD_manager::get_instance()->inc_thread_created();
DBUG_PRINT("info",("Thread created"));
DBUG_RETURN(false);
}
精簡代碼后邏輯比較清晰,程序先調用check_idle_thread_and_enqueue_connection查看是否有可用的被緩存起來的線程,如果有就直接將channelinfo插入隊列並喚醒阻塞的線程,如果沒有則新建一個連接處理的線程。check_idle_thread_and_enqueue_connection代碼如下:
bool Per_thread_connection_handler::check_idle_thread_and_enqueue_connection(Channel_info* channel_info)
{
bool res= true;
mysql_mutex_lock(&LOCK_thread_cache);
if (Per_thread_connection_handler::blocked_pthread_count > wake_pthread)
{
DBUG_PRINT("info",("waiting_channel_info_list->push %p", channel_info));
waiting_channel_info_list->push_back(channel_info);
wake_pthread++;
mysql_cond_signal(&COND_thread_cache);
res= false;
}
mysql_mutex_unlock(&LOCK_thread_cache);
return res;
}
這個函數較為簡單,當有空余線程的時候,就將待處理的連接加入隊列並且向條件變量發送信號並喚醒一個線程進行處理。這里wake_pthread的含義可以認為是還需要多少個線程去取連接,wake_pthread自增后會去喚醒一個阻塞的線程,待取得一個連接后wake_pthread再自減,所以只有當blocked_pthread_count > wake_pthread的時候才會有空閑的線程,否則需要新建線程:
error= mysql_thread_create(key_thread_one_connection, &id,
&connection_attrib,
handle_connection,
(void*) channel_info);
這里就是具體處理連接的入口,下面是精簡之后的handle_connection函數:
extern "C" void *handle_connection(void *arg)
{
Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
Connection_handler_manager *handler_manager=
Connection_handler_manager::get_instance();
Channel_info* channel_info= static_cast<Channel_info*>(arg);
bool pthread_reused MY_ATTRIBUTE((unused))= false;
if (my_thread_init())
{
//錯誤處理 PS:mysql有些函數錯誤處理有點奇葩,中有的函數返回false表示正確,true表示錯誤
···
}
for (;;)
{
THD *thd= init_new_thd(channel_info);
if (thd == NULL)
{
//錯誤處理
break; // We are out of resources, no sense in continuing.
}
···
thd_manager->add_thd(thd);
if (thd_prepare_connection(thd))
handler_manager->inc_aborted_connects();
else
{//當連接存活時,循環處理命令
while (thd_connection_alive(thd))
{
if (do_command(thd))
break;
}
end_connection(thd);
}
//做關閉連接后的善后工作
close_connection(thd, 0, false, false);
thd->get_stmt_da()->reset_diagnostics_area();
thd->release_resources();
ERR_remove_state(0);
thd_manager->remove_thd(thd);
Connection_handler_manager::dec_connection_count();
delete thd;
if (abort_loop) // Server is shutting down so end the pthread.
break;
//阻塞在這里等待新連接(當被緩存的線程數未達到最大值的時候)
channel_info=Per_thread_connection_handler::block_until_new_connection();
if (channel_info == NULL)
break;
pthread_reused= true;
}
//退出線程
my_thread_end();
my_thread_exit(0);
return NULL;
}
這里首先獲取了線程管理的單例類Global_THD_manager,和連接的管理類Connection_handler_manager。Global_THD_manager將各個線程的THD結構體串聯在一個鏈表中統一管理,這里先不展開。在使用每連接每線程的模式時,一個THD結構對應一個線程,但並不總是這樣,例如使用線程池的話就並非如此。為了不混淆,之后所稱的線程其實指一個開始處理的連接,線程結構體為其對應的THD結構體。若要指代真正的線程,會使用真實線程或者物理線程的說法。
隨后進行了線程初始化,主要是初始化了一些線程局部變量,用於DEBUG。之后的for循環就是一個線程的主要邏輯所在
循環中首先創建了這個線程的結構體THD,這個結構體保存了一個線程的上下文信息,非常重要,經常作為一些函數的參數。例如一個THD對應一個客戶端連接時,THD結構體里就包含了連接的所有信息,權限,帳號,事務狀態等等。隨后這個THD被加入了Global_THD_manager管理的鏈表中。
在對連接進行驗證(例如權限驗證)后,進入循環:
while (thd_connection_alive(thd))
{
if (do_command(thd))
break;
}
end_connection(thd);
這里就是對一個連接的命令的處理,循環接收連接的指令並執行,直到連接被killed或者執行出現某些錯誤再退出。再退出循環后進行連接善后工作並如同上文所說的一樣調用Per_thread_connection_handler::block_until_new_connection()嘗試將該線程緩存起來供下一個連接使用。最后當線程退出時,進行一些清理工作。
do_command函數就是接收執行客戶端命令的函數,將網絡協議以及錯誤處理等精簡后,得到其主要邏輯的代碼如下:
//sql/sql_parse.cc
bool do_command(THD *thd)
{
NET *net= NULL;
enum enum_server_command command;
COM_DATA com_data;
rc= thd->get_protocol()->get_command(&com_data, &command);
return_value= dispatch_command(thd, &com_data, command);
}
可以看到,每一次循環從網絡讀取指令數據和類型,然后給dispatch_command進行具體的命令執行,當一個連接新創建或者沒有任何請求的時候線程就會阻塞在這里。到此,就進入了mysql的命令分發流程。dispatch_command函數體非常的長,這里只保留部分主要邏輯。
//sql/sql_parse.cc
bool dispatch_command(THD *thd, const COM_DATA *com_data,
enum enum_server_command command)
{
bool error= 0;
Global_THD_manager *thd_manager= Global_THD_manager::get_instance();
thd->set_command(command);
thd->enable_slow_log= TRUE;
thd->lex->sql_command= SQLCOM_END; /* to avoid confusing VIEW detectors */
thd->set_time();
thd_manager->inc_thread_running();
switch (command) {
case COM_INIT_DB:
···
case COM_REGISTER_SLAVE:
···
···
case COM_QUERY:
{
···
if (alloc_query(thd, com_data->com_query.query,
com_data->com_query.length))
break;
···
mysql_parse(thd, &parser_state);
···
}
}
這個函數的主要邏輯就是一個巨無霸switch case 語句,根據命令的類型來進行不同的處理。這里注意COM_QUERY,雖然他的名字有“QUERY”,看上去像處理查詢的,實際上所有對於數據庫的訪問操作都是通過這里進入的,如常見的DDL、DML語句......將代碼簡化后的邏輯為:
//sql/sql_pasrse.cc
void mysql_parse(THD *thd, Parser_state *parser_state)
{
//為查詢緩存作初始化
mysql_reset_thd_for_next_command(thd);
lex_start(thd);
if (query_cache.send_result_to_client(thd, thd->query()) <= 0)
{
···
err= parse_sql(thd, parser_state, NULL);
···
error= mysql_execute_command(thd, true);
···
}
···
}
這里只列出了主要的入口,其他的細節暫時不清楚....這個函數先判斷查詢的語句是否在查詢緩存里面(只有完全一樣的語句才能匹配,甚至包括空格和大小寫),若在則直接發送給客戶端,不然就對語句進行語法解析,然后真正的執行它。執行函數為mysql_execute_command
int
mysql_execute_command(THD *thd, bool first_level)
{
//保存解析后的語法樹
LEX *const lex= thd->lex;
//需要訪問哪些表
TABLE_LIST *all_tables;
all_tables= lex->query_tables;
···
switch (lex->sql_command)
{
case SQLCOM_SHOW_STATUS:
···
case SQLCOM_INSERT:
case SQLCOM_REPLACE_SELECT:
case SQLCOM_INSERT_SELECT:
{
DBUG_ASSERT(first_table == all_tables && first_table != 0);
DBUG_ASSERT(lex->m_sql_cmd != NULL);
res= lex->m_sql_cmd->execute(thd);
break;
}
···
case SQLCOM_SHOW_PROCESSLIST:
···
}
}
這里就是最終對於各個語句具體的操作的入口,這又是一個巨大的switch case語句,其中的每個case就是我們熟悉的指令,對於其中每個指令的執行,就不再深入了,到這里已經完成了對於連接處理的跟蹤。