服務器開發中的多進程,多線程及多協程


服務器開發中,為了充分利用多核甚至多個cpu,或者是簡化邏輯編寫的難度,會應用多進程(比如一個進程負責一種邏輯)多線程(將不同的用戶分配到不同的進程)或者協程(不同的用戶分配不同的協程,在需要時切換到其他協程),並且往往同時利用這些技術比如多進程多線程。

一個經典的服務器框架可以說如下的框架:

而這些服務器進程之間協同配合,為用戶提供服務,其中中心服務器提供集群的調度工作,而邏輯服可以是邏輯服務器1提供登錄服務、邏輯服務器2提供購買服務;也可以是2個服務器提供相同服務,然后視負載用戶數將不同用戶分配到不同服務器。

而這些服務器為了方便開發往往會采用到多線程的技術,一個經典的模式就是多個網絡線程處理網絡事件,然后一個線程處理用戶邏輯;更進一步還可以是多線程處理用戶邏輯,在多線程下比較麻煩的一點是不同線程訪問同一個資源時需要加鎖,不過眾所周知鎖上萬惡之源,帶來了難查的死鎖和低下的問題。不過這些都有特殊的技巧可以解決,比如死鎖,對同一線程重復獲取同一把鎖導致的死鎖,我們可以采用可重入的計數鎖,當一個線程獲取了這把鎖后,如果其他線程嘗試lock這個mutex時會進入系統的調度隊列,而本線程獲取這把鎖則會引用計數+1,本線程解鎖則-1,如果引用計數為0則解鎖,實現一個這樣的鎖,具體的算法的偽代碼大致如下:

// int getthreadid() 獲取線程ID
// switchtothread() 切換線程
// wakeup(int id) 喚醒線程
// bool exchange(bool * m, bool v) 原子的修改一個類型為bool變量的值,並且返回原有的值
// bool load(bool * m) 原子的獲取一個bool變量的值

struct mutex{
    mutex(){
        islock = false;
        lockid = -1;
        ref = 0;
    }

    bool islock;
    volatile int lockid;
    int ref;
    std::que<int> waitth;
};

void lock(mutex & mu){
     int id = getthreadid();
     if (id != mu.lockid){
         if (exchange(&mu.islock, true)){
             waitth.push(id);
             switchtothread();
         }
         else
         {
              lockid = id;
         }
    }
    else
    {
        ref++;
    }
}

void unlock(mutex & mu){
    int id = getthreadid();
    if (id == mu.lockid)
    {
        if (--ref == 0){
            wakeup(mu.que.pop());
        }
    }
}

這樣的一組偽代碼,其中關於原子操作和線程調度的實現在windows大致如下:

int getthreadid(){
    return (int)GetCurrentThread();
}

int switchtothread(){
    return SwitchToThread();
}

int wakeup(int id){
    return ResumeThread((HANDLE)id);
}

boost中就提供了可重入的mutex: boost::recursive_mutex,windows中的Mutex和Critical Section就是可重入的,而linux的pthread_mutex_t就是不可重入的。

而因為同時訪問2個資源導致的死鎖,一個可以采用對資源的訪問采用同一順序訪問,即可回避。

 

對於鎖帶來的性能問題,一個取巧的做法則每個用戶數據獨立,對公共資源的訪問則采用一些高效的算法比如rcu、比如無鎖算法

比如一個多讀多寫且讀請求大於寫請求的場合,我們就可以采用rcu算法。

rcu算法是在讀的時候直接讀數據,而寫入則獲取一份讀拷貝在本地修改,並注冊一個寫入的回調函數,在讀操作全部完成后調用回調函數完成寫入。寫操作之間則使用傳統的同步機制。

而無鎖算法,比較經典的則是一些無鎖隊列,比如ms-que,optimistic-que

ms-que的原理是利用原子操作,在寫入的時候,先創建一個新的節點,完成數據的寫入,然后在尾部利用cas操作,將尾節點的next指針指向新的節點,並將尾節點指向新節點。

在讀的時候則是利用cas操作,將頭節點指向頭節點的next節點。

 

但是在利用這些,雖然充分利用了cpu的性能,但是編程也帶來了極大的不便,比如多線程下不利於擴展以便充分利用集群提高性能的問題,多進程下則是復雜的異步調用。

一個理想的情況就是在跨進程通信的情況下,可以在發起一次遠程的請求后,原地等待遠端的相應,然后繼續執行,比如在多線程的情況下,可以一個用戶一個線程,然后在一次send之后,調用recv在請求返回后在繼續執行,

但是多線程的調度會帶來巨大的調度開銷,這種情況下,更輕量級的協程成為了一個更好的選擇,一個用戶一個協程,然后在發起一次遠程的請求后,切換到其他被喚醒的用戶協程繼續執行,在這個協程的請求返回后則喚醒這個協程繼續執行邏輯代碼。

協程帶來的問題是協程是有用戶控制調度的,所以用戶需要自己實現調度算法,以及對應的鎖算法(因為多協程雖然可以是單線程內執行,但是因為不同協程間的資源爭用所以鎖還是需要,而且是同步執行邏輯,如果使用spinlock則意味着整個系統阻塞,在異步網絡id下即便是網絡事件也不會被響應)。

協程的調度如下:

context::context* scheduling::onscheduling(){
	context::context * _context = 0;
	
	{
		while (!time_wait_task_que.empty()){
			uint64_t t = timer::clock();
			time_wait_handle top = time_wait_task_que.top();
			if(t > top.handle->waittime){
				time_wait_task_que.pop();

				uint32_t _state = top.handle->_state_queue.front();
				top.handle->_state_queue.pop();
				if (_state == time_running){
					continue;
				}

				_context = top.handle->_context;
				goto do_task;
			}
		}
	}
		
	{
		while (!in_signal_context_list.empty()){
			context::context * _context_ = in_signal_context_list.front();
			in_signal_context_list.pop();

			actuator * _actuator = context2actuator(_context_);
			if (_actuator == 0){
				continue;
			}

			task * _task = _actuator->current_task();
			if (_task == 0){
				continue;
			}

			if (_task->_state == time_wait_task){
				_task->_state = running_task;
				_task->_wait_context._state_queue.front() = time_running;
			}

			_context = _context_;
			goto do_task;
		}
	}

	{
		if (!_fn_scheduling.empty()){
			_context = _fn_scheduling();
			goto do_task;
		}
	}

	{
		if (!low_priority_context_list.empty()){
			_context = in_signal_context_list.front();
			in_signal_context_list.pop();
			goto do_task;
		}
	}

do_task:
	{
		if(_context == 0){
			actuator * _actuator = _abstract_factory_actuator.create_product();
			_context = _actuator->context();
			_list_actuator.push_back(_actuator);
		}
	}

	return _context;
}

鎖算法原理recursive_mutex一樣,當獲取鎖失敗則將當前協程調度到其他用戶協程,在解鎖時喚醒等待協程

void mutex::lock(){
	if (_mutex){
		_mutex = true;
	} else {
		_service_handle->scheduler();
	}
}

void mutex::unlock(){
	if (_mutex){
		if (!wait_context_list.empty()){
			auto weak_up_ct = wait_context_list.back();
			wait_context_list.pop_back();
			_service_handle->wake_up_context(weak_up_ct);
		}
		_mutex = false;
	}
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM