客戶-服務器程序設計方法
《unix網絡編程》第一卷中將傳統的客戶服務器程序設計方法講得透徹,這篇文章將其中編碼的細節略去,通過偽代碼的形式展現,主要介紹各種方法的思想;(后面再續上一篇現代服務器的主要設計方法,基本是圍繞這Reactor做文章)
示例是一個經典的TCP回射程序:
客戶端發起連接請求,連接后發送一串數據;收到服務端的數據后輸出到終端;
服務端收到客戶端的數據后原樣回寫給客戶端;
客戶端偽代碼:
sockfd = socket(AF_INET,SOCK_STREAM,0);
//與服務端建立連接
connect(sockfd);
//連接建立后從終端讀入數據並發送到服務端;
//從服務端收到數據后回寫到終端
while(fgets(sendline,MAXLINE,fileHandler)!= NULL){
writen(sockfd,sendline,strlen(sendline));
if(readline(sockfd,recvline,MAXLINE) == 0){
cout << "recive over!";
}
fputs(recvline,stdout);
}
下面介紹服務端程序處理多個客戶請求的開發范式;
多進程處理
對於多個客戶請求,服務器端采用fork的方式創建新進程來處理;
處理流程:
- 主進程綁定ip端口后,使用accept()等待新客戶的請求;
- 每一個新的用戶請求到來,都創建一個新的子進程來處理具體的客戶請求;
- 子進程處理完用戶請求,結束本進程;
服務端偽代碼:
listenFd = socket(AF_INET,SOCK_STREAM,0);
bind(listenFd,addR);
listen(listenFD);
while(true){
//服務器端在這里阻塞等待新客戶連接
connfd = accept(listenfd);
if( fork() ==0){//子進程
close(listenfd);
while(n=read(connfd,buf,MAXLINE)>0){
writen(connfd,buf);
}
}
close(connfd);
}
這種方法開發簡單,但對操作系統而言,進程是一種昂貴的資源,對於每個新客戶請求都使用一個進程處理,開銷較大;
對於客戶請求數不多的應用適用這種方法;
預先分配進程池,accept無上鎖保護
上一種方法中,每來一個客戶都創建一個進程處理請求,完畢后再釋放;
不間斷的創建和結束進程浪費系統資源;
使用進程池預先分配進程,通過進程復用,減少進程重復創建帶來的系統消耗和時間等待;
優點:消除新客戶請求到達來創建進程的開銷;
缺點:需要預先估算客戶請求的多少(確定進程池的大小)
源自Berkeley內核的系統,有以下特性:
派生的所有子進程各自調用accep()監聽同一個套接字,在沒有用戶請求時都進入睡眠;
當有新客戶請求到來時,所有的客戶都被喚醒;內核從中選擇一個進程處理請求,剩余的進程再次轉入睡眠(回到進程池);
利用這個特性可以由操作系統來控制進程的分配;
內核調度算法會把各個連接請求均勻的分散到各個進程中;
處理流程:
- 主進程預先分配進程池,所有子進程阻塞在accept()調用上;
- 新用戶請求到來,操作系統喚醒所有的阻塞在accpet上的進程,從其中選擇一個建立連接;
- 被選中的子進程處理用戶請求,其它子進程回到睡眠;
- 子進程處理完畢,再次阻塞在accept上;
服務端偽代碼:
listenFd = socket(AF_INET,SOCK_STREAM,0);
bind(listenFd,addR);
listen(listenFD);
for(int i = 0;i< children;i++){
if(fork() == 0){//子進程
while(true){
//所有子進程監聽同一個套接字,等待用戶請求
int connfd = accept(listenfd);
close(listenfd);
//連接建立后處理用戶請求,完畢后關閉連接
while(n=read(connfd,buf,MAXLINE)>0){
writen(connfd,buf);
}
close(connfd);
}
}
}
如何從進程池中取出進程?
所有的進程都通過accept()阻塞等待,等連接請求到來后,由內核從所有等待的進程中選擇一個進程處理;
處理完的進程,如何放回到池子中?
子進程處理完客戶請求后,通過無限循環,再次阻塞在accpet()上等待新的連接請求;
注意: 多個進程accept()阻塞會產生“驚群問題”:盡管只有一個進程將獲得連接,但是所有的進程都被喚醒;這種每次有一個連接准備好卻喚醒太多進程的做法會導致性能受損;
預先分配進程池,accept上鎖(文件鎖、線程鎖)
上述不上鎖的實現存在移植性的問題(只能在源自Berkeley的內核系統上)和驚群問題,
更為通用的做法是對accept上鎖;即避免讓多個進程阻塞在accpet調用上,而是都阻塞在獲取鎖的函數中;
服務端偽代碼:
listenFd = socket(AF_INET,SOCK_STREAM,0);
bind(listenFd,addR);
listen(listenFD);
for(int i = 0;i< children;i++){
if(fork() == 0){
while(true){
my_lock_wait();//獲取鎖
int connfd = accept(listenfd);
my_lock_release();//釋放鎖
close(listenfd);
while(n=read(connfd,buf,MAXLINE)>0){
writen(connfd,buf);
}
close(connfd);
}
}
}
上鎖可以使用文件上鎖,線程上鎖;
- 文件上鎖的方式可移植到所有的操作系統,但其涉及到文件系統操作,可能比較耗時;
- 線程上鎖的方式不僅適用不同線程之間的上鎖,也適用於不同進程間的上鎖;
關於上鎖的編碼細節詳見《網絡編程》第30章;
預先分配進程池,傳遞描述符;
與上面的每個進程各自accept接收監聽請求不同,這個方法是在父進程中統一接收accpet()用戶請求,在連接建立后,將連接描述符傳遞給子進程;
處理流程:
- 主進程阻塞在accpet上等待用戶請求,所有子進程不斷輪詢探查是否有可用的描述符;
- 有新用戶請求到來,主進程accpet建立連接后,從進程池中取出一個進程,通過字節流管道將連接描述符傳遞給子進程;
- 子進程收到連接描述符,處理用戶請求,處理完成后向父進程發送一個字節的內容(無實際意義),告知父進程我任務已完成;
- 父進程收到子進程的單字節數據,將子進程放回到進程池;
服務端偽代碼:
listenFd = socket(AF_INET,SOCK_STREAM,0);
bind(listenFd,addR);
listen(listenFD);
//預先建立子進程池
for(int i = 0;i< children;i++){
//使用Unix域套接字創建一個字節流管道,用來傳遞描述符
socketpair(AF_LOCAL,SOCK_STREAM,0,sockfd);
if(fork() == 0){//預先創建子進程
//子進程字節流到父進程
dup2(sockfd[1],STDERR_FILENO);
close(listenfd);
while(true){
//收到連接描述符
if(read_fd(STDERR_FILENO,&connfd) ==0){;
continue;
}
while(n=read(connfd,buf,MAXLINE)>0){ //處理用戶請求
writen(connfd,buf);
}
close(connfd);
//通知父進程處理完畢,本進程可以回到進程池
write(STDERR_FILENO,"",1);
}
}
}
while(true){
//監聽listen套接字描述符和所有子進程的描述符
select(maxfd+1,&rset,NULL,NULL,NULL);
if(FD_ISSET(listenfd,&rset){//有客戶連接請求
connfd = accept(listenfd);//接收客戶連接
//從進程池中找到一個空閑的子進程
for(int i = 0 ;i < children;i++){
if(child_status[i] == 0)
break;
}
child_status[i] = 1;//子進程從進程池中分配出去
write_fd(childfd[i],connfd);//將描述符傳遞到子進程中
close(connfd);
}
//檢查子進程的描述符,有數據,表明已經子進程請求已處理完成,回收到進程池
for(int i = 0 ;i < children;i++){
if(FD_ISSET(childfd[i],&rset)){
if(read(childfd[i])>0){
child_status[i] = 0;
}
}
}
}
多線程處理
為每個用戶創建一個線程,這種方法比為每個用戶創建一個進程要快出許多倍;
處理流程:
- 主線程阻塞在accpet上等待用請求;
- 有新用戶請求時,主線程建立連接,然后創建一個新的線程,將連接描述符傳遞過去;
- 子線程處理用戶請求,完畢后線程結束;
服務端偽代碼:
listenFd = socket(AF_INET,SOCK_STREAM,0);
bind(listenFd,addR);
listen(listenFD);
while(true){
connfd = accept(listenfd);
//連接建立后,創建新線程處理具體的用戶請求
pthread_create(&tid,NULL,&do_function,(void*)connfd);
close(connfd);
}
--------------------
//具體的用戶請求處理函數(子線程主體)
void * do_function(void * connfd){
pthread_detach(pthread_self());
while(n=read(connfd,buf,MAXLINE)>0){
writen(connfd,buf);
close((int)connfd);
}
預先創建線程池,每個線程各自accept
處理流程:
- 主線程預先創建線程池,第一個創建的子線程獲取到鎖,阻塞在accept()上,其它子線程阻塞在線程鎖上;
- 用戶請求到來,第一個子線程建立連接后釋放鎖,然后處理用戶請求;完成后進入線程池,等待獲取鎖;
- 第一個子線程釋放鎖之后,線程池中等待的線程有一個會獲取到鎖,阻塞在accept()等待用戶請求;
listenFd = socket(AF_INET,SOCK_STREAM,0);
bind(listenFd,addR);
listen(listenFD);
//預先創建線程池,將監聽描述符傳給每個新創建的線程
for(int i = 0 ;i <threadnum;i++){
pthread_create(&tid[i],NULL,&thread_function,(void*)connfd);
}
--------------------
//具體的用戶請求處理
//通過鎖保證任何時刻只有一個線程阻塞在accept上等待新用戶的到來;其它的線程都
//在等鎖;
void * thread_function(void * connfd){
while(true){
pthread_mutex_lock(&mlock); // 線程上鎖
connfd = accept(listenfd);
pthread_mutex_unlock(&mlock);//線程解鎖
while(n=read(connfd,buf,MAXLINE)>0){
writen(connfd,buf);
close(connfd);
}
}
使用源自Berkeley的內核的Unix系統時,我們不必為調用accept而上鎖,
去掉上鎖的兩個步驟后,我們發現沒有上鎖的用戶時間減少(因為上鎖是在用戶空間中執行的線程函數完成的),而系統時間卻增加很多(每一個accept到達,所有的線程都變喚醒,引發內核的驚群問題,這個是在線程內核空間中完成的);
而我們的線程都需要互斥,讓內核執行派遣還不讓自己通過上鎖來得快;
這里沒有必要使用文件上鎖,因為單個進程中的多個線程,總是可以通過線程互斥鎖來達到同樣目的;(文件鎖更慢)
預先創建線程池,主線程accept后傳遞描述符
處理流程:
- 主線程預先創建線程池,線程池中所有的線程都通過調用pthread_cond_wait()而處於睡眠狀態(由於有鎖的保證,是依次進入睡眠,而不會發生同時調用pthread_cond_wait引發競爭)
- 主線程阻塞在acppet調用上等待用戶請求;
- 用戶請求到來,主線程accpet建立建立,將連接句柄放入約定位置后,發送pthread_cond_signal激活一個等待該條件的線程;
- 線程激活后從約定位置取出連接句柄處理用戶請求;完畢后再次進入睡眠(回到線程池);
激活條件等待的方式有兩種:pthread_cond_signal()激活一個等待該條件的線程,存在多個等待線程時按入隊順序激活其中一個;而pthread_cond_broadcast()則激活所有等待線程。
注:一般應用中條件變量需要和互斥鎖一同使用;
在調用pthread_cond_wait()前必須由本線程加鎖(pthread_mutex_lock()),而在更新條件等待隊列以前,mutex保持鎖定狀態,並在線程掛起進入等待前解鎖。在條件滿足從而離開pthread_cond_wait()之前,mutex將被重新加鎖,以與進入pthread_cond_wait()前的加鎖動作對應。
服務端偽代碼:
listenFd = socket(AF_INET,SOCK_STREAM,0);
bind(listenFd,addR);
listen(listenFD);
for(int i = 0 ;i <threadnum;i++){
pthread_create(&tid[i],NULL,&thread_function,(void*)connfd);
}
while(true){
connfd = accept(listenfd);
pthread_mutex_lock(&mlock); // 線程上鎖
childfd[iput] = connfd;//將描述符的句柄放到數組中傳給獲取到鎖的線程;
if(++iput == MAX_THREAD_NUM)
iput= 0;
if(iput == iget)
err_quit("thread num not enuough!");
pthread_cond_signal(&clifd_cond);//發信號,喚醒一個睡眠線程(輪詢喚醒其中的一個)
pthread_mutex_unlock(&mlock);//線程解鎖
}
--------------------
void * thread_function(void * connfd){
while(true){
pthread_mutex_lock(&mlock); // 線程上鎖
//當無沒有收到連接句柄時,睡眠在條件變量上,並釋放mlock鎖
//滿足條件被喚醒后,重新加mlock鎖
while(iget == iput)
pthread_cond_wait(&clifd_cond,&mlock);
connfd = childfd[iget];
if(++iget == MAX_THREAD_NUM)
iget = 0;
pthread_mutex_unlock(&mlock);//線程解鎖
//處理用戶請求
while(n=read(connfd,buf,MAXLINE)>0){
writen(connfd,buf);
close(connfd);
}
}
測試表明這個版本的服務器要慢於每個線程各自accpet的版本,原因在於這個版本同時需要互斥鎖和條件變量,而上一個版本只需要互斥鎖;
線程描述符的傳遞和進程描述符的傳遞的區別?
在一個進程中打開的描述符對該進程中的所有線程都是可見的,引用計數也就是1;
所有線程訪問這個描述符都只需要通過一個描述符的值(整型)訪問;
而進程間的描述符傳遞,傳遞的是描述符的引用;(好比一個文件被2個進程打開,相應的這個文件的描述符引用計數增加2);
總結
- 當系統負載較輕時,每個用戶請求現場派生一個子進程為之服務的傳統並發服務器模型就足夠了;
- 相比傳統的每個客戶fork一次的方式,預先創建一個子進程池或線程池能夠把進程控制cpu時間降低10倍以上;當然,程序會相應復雜一些,需要監視子進程個數,隨着客戶用戶數的動態變化而增加或減少進程池;
- 讓所有子進程或線程自行調用accept通常比讓父進程或主線程獨自調用accpet並發描述符傳遞給子進程或線程要簡單和快速;
- 使用線程通常要快於使用進程;
參考資料
《unix網絡編程》第一卷 套接字聯網API
Posted by: 大CC | 05APR,2015
博客:blog.me115.com [訂閱]
微博:新浪微博