服務器開發中,為了充分利用多核甚至多個cpu,或者是簡化邏輯編寫的難度,會應用多進程(比如一個進程負責一種邏輯)多線程(將不同的用戶分配到不同的進程)或者協程(不同的用戶分配不同的協程,在需要時切換到其他協程),並且往往同時利用這些技術比如多進程多線程。
一個經典的服務器框架可以說如下的框架:
而這些服務器進程之間協同配合,為用戶提供服務,其中中心服務器提供集群的調度工作,而邏輯服可以是邏輯服務器1提供登錄服務、邏輯服務器2提供購買服務;也可以是2個服務器提供相同服務,然后視負載用戶數將不同用戶分配到不同服務器。
而這些服務器為了方便開發往往會采用到多線程的技術,一個經典的模式就是多個網絡線程處理網絡事件,然后一個線程處理用戶邏輯;更進一步還可以是多線程處理用戶邏輯,在多線程下比較麻煩的一點是不同線程訪問同一個資源時需要加鎖,不過眾所周知鎖上萬惡之源,帶來了難查的死鎖和低下的問題。不過這些都有特殊的技巧可以解決,比如死鎖,對同一線程重復獲取同一把鎖導致的死鎖,我們可以采用可重入的計數鎖,當一個線程獲取了這把鎖后,如果其他線程嘗試lock這個mutex時會進入系統的調度隊列,而本線程獲取這把鎖則會引用計數+1,本線程解鎖則-1,如果引用計數為0則解鎖,實現一個這樣的鎖,具體的算法的偽代碼大致如下:
// int getthreadid() 獲取線程ID // switchtothread() 切換線程 // wakeup(int id) 喚醒線程 // bool exchange(bool * m, bool v) 原子的修改一個類型為bool變量的值,並且返回原有的值 // bool load(bool * m) 原子的獲取一個bool變量的值 struct mutex{ mutex(){ islock = false; lockid = -1; ref = 0; } bool islock; volatile int lockid; int ref; std::que<int> waitth; }; void lock(mutex & mu){ int id = getthreadid(); if (id != mu.lockid){ if (exchange(&mu.islock, true)){ waitth.push(id); switchtothread(); } else { lockid = id; } } else { ref++; } } void unlock(mutex & mu){ int id = getthreadid(); if (id == mu.lockid) { if (--ref == 0){ wakeup(mu.que.pop()); } } }
這樣的一組偽代碼,其中關於原子操作和線程調度的實現在windows大致如下:
int getthreadid(){ return (int)GetCurrentThread(); } int switchtothread(){ return SwitchToThread(); } int wakeup(int id){ return ResumeThread((HANDLE)id); }
boost中就提供了可重入的mutex: boost::recursive_mutex,windows中的Mutex和Critical Section就是可重入的,而linux的pthread_mutex_t就是不可重入的。
而因為同時訪問2個資源導致的死鎖,一個可以采用對資源的訪問采用同一順序訪問,即可回避。
對於鎖帶來的性能問題,一個取巧的做法則每個用戶數據獨立,對公共資源的訪問則采用一些高效的算法比如rcu、比如無鎖算法
比如一個多讀多寫且讀請求大於寫請求的場合,我們就可以采用rcu算法。
rcu算法是在讀的時候直接讀數據,而寫入則獲取一份讀拷貝在本地修改,並注冊一個寫入的回調函數,在讀操作全部完成后調用回調函數完成寫入。寫操作之間則使用傳統的同步機制。
而無鎖算法,比較經典的則是一些無鎖隊列,比如ms-que,optimistic-que
ms-que的原理是利用原子操作,在寫入的時候,先創建一個新的節點,完成數據的寫入,然后在尾部利用cas操作,將尾節點的next指針指向新的節點,並將尾節點指向新節點。
在讀的時候則是利用cas操作,將頭節點指向頭節點的next節點。
但是在利用這些,雖然充分利用了cpu的性能,但是編程也帶來了極大的不便,比如多線程下不利於擴展以便充分利用集群提高性能的問題,多進程下則是復雜的異步調用。
一個理想的情況就是在跨進程通信的情況下,可以在發起一次遠程的請求后,原地等待遠端的相應,然后繼續執行,比如在多線程的情況下,可以一個用戶一個線程,然后在一次send之后,調用recv在請求返回后在繼續執行,
但是多線程的調度會帶來巨大的調度開銷,這種情況下,更輕量級的協程成為了一個更好的選擇,一個用戶一個協程,然后在發起一次遠程的請求后,切換到其他被喚醒的用戶協程繼續執行,在這個協程的請求返回后則喚醒這個協程繼續執行邏輯代碼。
協程帶來的問題是協程是有用戶控制調度的,所以用戶需要自己實現調度算法,以及對應的鎖算法(因為多協程雖然可以是單線程內執行,但是因為不同協程間的資源爭用所以鎖還是需要,而且是同步執行邏輯,如果使用spinlock則意味着整個系統阻塞,在異步網絡id下即便是網絡事件也不會被響應)。
協程的調度如下:
context::context* scheduling::onscheduling(){ context::context * _context = 0; { while (!time_wait_task_que.empty()){ uint64_t t = timer::clock(); time_wait_handle top = time_wait_task_que.top(); if(t > top.handle->waittime){ time_wait_task_que.pop(); uint32_t _state = top.handle->_state_queue.front(); top.handle->_state_queue.pop(); if (_state == time_running){ continue; } _context = top.handle->_context; goto do_task; } } } { while (!in_signal_context_list.empty()){ context::context * _context_ = in_signal_context_list.front(); in_signal_context_list.pop(); actuator * _actuator = context2actuator(_context_); if (_actuator == 0){ continue; } task * _task = _actuator->current_task(); if (_task == 0){ continue; } if (_task->_state == time_wait_task){ _task->_state = running_task; _task->_wait_context._state_queue.front() = time_running; } _context = _context_; goto do_task; } } { if (!_fn_scheduling.empty()){ _context = _fn_scheduling(); goto do_task; } } { if (!low_priority_context_list.empty()){ _context = in_signal_context_list.front(); in_signal_context_list.pop(); goto do_task; } } do_task: { if(_context == 0){ actuator * _actuator = _abstract_factory_actuator.create_product(); _context = _actuator->context(); _list_actuator.push_back(_actuator); } } return _context; }
鎖算法原理recursive_mutex一樣,當獲取鎖失敗則將當前協程調度到其他用戶協程,在解鎖時喚醒等待協程
void mutex::lock(){ if (_mutex){ _mutex = true; } else { _service_handle->scheduler(); } } void mutex::unlock(){ if (_mutex){ if (!wait_context_list.empty()){ auto weak_up_ct = wait_context_list.back(); wait_context_list.pop_back(); _service_handle->wake_up_context(weak_up_ct); } _mutex = false; } }