服務器開發中,為了充分利用多核甚至多個cpu,或者是簡化邏輯編寫的難度,會應用多進程(比如一個進程負責一種邏輯)多線程(將不同的用戶分配到不同的進程)或者協程(不同的用戶分配不同的協程,在需要時切換到其他協程),並且往往同時利用這些技術比如多進程多線程。
一個經典的服務器框架可以說如下的框架:

而這些服務器進程之間協同配合,為用戶提供服務,其中中心服務器提供集群的調度工作,而邏輯服可以是邏輯服務器1提供登錄服務、邏輯服務器2提供購買服務;也可以是2個服務器提供相同服務,然后視負載用戶數將不同用戶分配到不同服務器。
而這些服務器為了方便開發往往會采用到多線程的技術,一個經典的模式就是多個網絡線程處理網絡事件,然后一個線程處理用戶邏輯;更進一步還可以是多線程處理用戶邏輯,在多線程下比較麻煩的一點是不同線程訪問同一個資源時需要加鎖,不過眾所周知鎖上萬惡之源,帶來了難查的死鎖和低下的問題。不過這些都有特殊的技巧可以解決,比如死鎖,對同一線程重復獲取同一把鎖導致的死鎖,我們可以采用可重入的計數鎖,當一個線程獲取了這把鎖后,如果其他線程嘗試lock這個mutex時會進入系統的調度隊列,而本線程獲取這把鎖則會引用計數+1,本線程解鎖則-1,如果引用計數為0則解鎖,實現一個這樣的鎖,具體的算法的偽代碼大致如下:
// int getthreadid() 獲取線程ID // switchtothread() 切換線程 // wakeup(int id) 喚醒線程 // bool exchange(bool * m, bool v) 原子的修改一個類型為bool變量的值,並且返回原有的值 // bool load(bool * m) 原子的獲取一個bool變量的值 struct mutex{ mutex(){ islock = false; lockid = -1; ref = 0; } bool islock; volatile int lockid; int ref; std::que<int> waitth; }; void lock(mutex & mu){ int id = getthreadid(); if (id != mu.lockid){ if (exchange(&mu.islock, true)){ waitth.push(id); switchtothread(); } else { lockid = id; } } else { ref++; } } void unlock(mutex & mu){ int id = getthreadid(); if (id == mu.lockid) { if (--ref == 0){ wakeup(mu.que.pop()); } } }
這樣的一組偽代碼,其中關於原子操作和線程調度的實現在windows大致如下:
int getthreadid(){ return (int)GetCurrentThread(); } int switchtothread(){ return SwitchToThread(); } int wakeup(int id){ return ResumeThread((HANDLE)id); }
boost中就提供了可重入的mutex: boost::recursive_mutex,windows中的Mutex和Critical Section就是可重入的,而linux的pthread_mutex_t就是不可重入的。
而因為同時訪問2個資源導致的死鎖,一個可以采用對資源的訪問采用同一順序訪問,即可回避。
對於鎖帶來的性能問題,一個取巧的做法則每個用戶數據獨立,對公共資源的訪問則采用一些高效的算法比如rcu、比如無鎖算法
比如一個多讀多寫且讀請求大於寫請求的場合,我們就可以采用rcu算法。
rcu算法是在讀的時候直接讀數據,而寫入則獲取一份讀拷貝在本地修改,並注冊一個寫入的回調函數,在讀操作全部完成后調用回調函數完成寫入。寫操作之間則使用傳統的同步機制。
而無鎖算法,比較經典的則是一些無鎖隊列,比如ms-que,optimistic-que
ms-que的原理是利用原子操作,在寫入的時候,先創建一個新的節點,完成數據的寫入,然后在尾部利用cas操作,將尾節點的next指針指向新的節點,並將尾節點指向新節點。
在讀的時候則是利用cas操作,將頭節點指向頭節點的next節點。
但是在利用這些,雖然充分利用了cpu的性能,但是編程也帶來了極大的不便,比如多線程下不利於擴展以便充分利用集群提高性能的問題,多進程下則是復雜的異步調用。
一個理想的情況就是在跨進程通信的情況下,可以在發起一次遠程的請求后,原地等待遠端的相應,然后繼續執行,比如在多線程的情況下,可以一個用戶一個線程,然后在一次send之后,調用recv在請求返回后在繼續執行,
但是多線程的調度會帶來巨大的調度開銷,這種情況下,更輕量級的協程成為了一個更好的選擇,一個用戶一個協程,然后在發起一次遠程的請求后,切換到其他被喚醒的用戶協程繼續執行,在這個協程的請求返回后則喚醒這個協程繼續執行邏輯代碼。
協程帶來的問題是協程是有用戶控制調度的,所以用戶需要自己實現調度算法,以及對應的鎖算法(因為多協程雖然可以是單線程內執行,但是因為不同協程間的資源爭用所以鎖還是需要,而且是同步執行邏輯,如果使用spinlock則意味着整個系統阻塞,在異步網絡id下即便是網絡事件也不會被響應)。
協程的調度如下:
context::context* scheduling::onscheduling(){
context::context * _context = 0;
{
while (!time_wait_task_que.empty()){
uint64_t t = timer::clock();
time_wait_handle top = time_wait_task_que.top();
if(t > top.handle->waittime){
time_wait_task_que.pop();
uint32_t _state = top.handle->_state_queue.front();
top.handle->_state_queue.pop();
if (_state == time_running){
continue;
}
_context = top.handle->_context;
goto do_task;
}
}
}
{
while (!in_signal_context_list.empty()){
context::context * _context_ = in_signal_context_list.front();
in_signal_context_list.pop();
actuator * _actuator = context2actuator(_context_);
if (_actuator == 0){
continue;
}
task * _task = _actuator->current_task();
if (_task == 0){
continue;
}
if (_task->_state == time_wait_task){
_task->_state = running_task;
_task->_wait_context._state_queue.front() = time_running;
}
_context = _context_;
goto do_task;
}
}
{
if (!_fn_scheduling.empty()){
_context = _fn_scheduling();
goto do_task;
}
}
{
if (!low_priority_context_list.empty()){
_context = in_signal_context_list.front();
in_signal_context_list.pop();
goto do_task;
}
}
do_task:
{
if(_context == 0){
actuator * _actuator = _abstract_factory_actuator.create_product();
_context = _actuator->context();
_list_actuator.push_back(_actuator);
}
}
return _context;
}
鎖算法原理recursive_mutex一樣,當獲取鎖失敗則將當前協程調度到其他用戶協程,在解鎖時喚醒等待協程
void mutex::lock(){
if (_mutex){
_mutex = true;
} else {
_service_handle->scheduler();
}
}
void mutex::unlock(){
if (_mutex){
if (!wait_context_list.empty()){
auto weak_up_ct = wait_context_list.back();
wait_context_list.pop_back();
_service_handle->wake_up_context(weak_up_ct);
}
_mutex = false;
}
}
