第八章 高性能服務器程序框架
我們將服務器一般分為三個主要模塊,I/O處理單元、邏輯單元及存儲單元。常用的服務器模型有C/S模型和P2P模型,比較簡單。我們來看一下網絡編程中的I/O模型。首先我們要了解阻塞模型和非阻塞模型的區別,socket在創建時默認是阻塞的,可以在socket系統調用的第二個參數傳遞SOCK_NONBLOCK標志或者通過fcntl將其設置為非阻塞,針對阻塞I/O的系統調用可能因為無法立即完成而被系統掛起,直到等待的事件發生為止,而非阻塞I/O的系統調用則會立即返回,如果事件沒有立即發生,和出錯一樣會返回-1,此時我們要通過errno來區分,通常來講,accept、send和recv事件未發生errno被設置成EAGAIN或EWOULDBLOCK,對connect而言,errno則為EINPROGRESS。
而非阻塞I/O通常與其他I/O通知機制一起使用,如I/O復用和SIGIO信號。I/O是最常用的通知機制,應用程序通過I/O復用函數向內核注冊一組事件,內核通過I/O復用函數將就緒的事件通知應用程序,常用的有select、poll和epoll,I/O復用函數本身也是阻塞的,其能提高效率的原因在於能同時監聽多個I/O事件。我們來比較一下不同的I/O模型
I/O模型 |
讀寫操作和阻塞階段 |
阻塞I/O |
程序阻塞於讀寫函數 |
I/O復用 |
程序阻塞於I/O復用系統調用,但可以監聽多個I/O事件,讀寫本身非阻塞 |
SIGIO信號 |
信號觸發讀寫就緒事件,用戶程序執行讀寫操作,程序沒有阻塞階段。 |
異步I/O |
內核執行讀寫操作並觸發讀寫完成事件,程序沒有阻塞階段。 |
服務器程序有兩種高效的事件處理模式:通常使用同步I/O的Reactor和通常使用異步I/O的Proactor,但是我們也有用同步I/O實現Proactor的方法。
Reactor要求主線程只負責監聽是否有事件發生,如果有就立即將該事件通知工作線程,除此之外不進行任何實質性工作,讀寫數據,接受新連接,以及處理客戶請求都由工作線程完成。也就是說主線程只負責監聽和分發事件。以epoll為例,使用同步I/O模型實現Reactor的工作流程是:1)主線程向epoll內核事件表中注冊socket上的讀就緒事件。2)主線程調用epoll_wait等待socket上有數據可讀。3)當socket上有數據可讀時,epoll_wait通知主線程,主線程則將socket可讀事件放入請求隊列。4)睡眠在請求隊列上的某個工作線程被喚醒,從socket讀取數據,並處理客戶請求,然后向epoll內核事件表注冊該socket上的寫就緒事件。5)主線程調用epoll_wait等待socket可寫。6)當socket可寫時,epoll_wait通知主線程,主線程將可寫事件放入請求隊列。7)睡眠在請求隊列上的某個工作線程被喚醒,它往socket上寫入服務器處理客戶請求的結果。
而Proactor不同,它將所有I/O操作都交給主線程和內核來做,工作線程僅僅負責業務邏輯,其使用aio_read等函數的工作流程如下:1)主線程調用aio_read向內核注冊socket上的讀完成事件,並告訴內核用戶讀緩沖區的位置,以及讀操作完成時如何通知應用程序。2)主線程繼續處理其他邏輯。3)當socket上的數據被讀入用戶緩沖區,內核向應用程序發送一個信號,以通知應用程序。4)應用程序預先定義好的信號處理函數選擇一個工作線程處理客戶請求,工作線程處理完客戶請求后調用aio_write向內核注冊寫完成時間,並告訴內核用戶緩沖區的位置以及如果通知應用程序。5)主線程繼續處理其他邏輯。6)當用戶緩沖區的數據被寫入socket之后,內核將向應用程序發送一個信號,以通知應用程序。7)應用程序預先定義好的信號處理函數選擇一個工作線程來做善后處理,比如是否關閉socket。
在這種模式下,主線程調用的epoll_wait只能監聽socket上的連接請求,而不能檢測連接socket上的讀寫事件,讀寫事件是由信號進行通知。
前面說到我們可以用同步I/O來模擬Proactor模式,具體工作流程如下:1)主線程向epoll內核事件表中注冊socket上的讀就緒事件。2)主線程調用epoll_wait等待socket上有數據可讀。3)當socket上有數據可讀時,epoll_wait通知主線程,主線程從socket循環隊列讀取數據直到沒有更多數據可讀,然后將讀取到的數據封裝成一個請求對象並插入請求隊列。4)睡眠在請求隊列上的某個工作線程被喚醒,它獲得請求對象並處理客戶請求,然后往epoll內核事件表中注冊socket上的寫就緒事件。5)主線程調用epoll_wait等待socket可寫。6)當socket可寫時,epoll_wait通知主線程,主線程向socket上寫入服務器處理客戶請求的結果。
我們編程時采用的並發模式主要是為了讓程序“同時”執行多個任務,但如果程序是計算密集型,則並發編程並沒有優勢,反而會因為任務的切換使效率降低,如果程序是I/O密集型,由於I/O的速度遠沒有CPU的計算速度快,所以並發模式的CPU利用率會顯著提高。服務器主要使用的兩種並發編程模型是:半同步/半異步模式和領導者/追隨者模式。
在半同步/半異步模式中,同步和異步的概念與I/O模型中的同步和異步不同,在I/O模型中,同步和異步主要區分的是內核向應用程序通知的是就緒事件還是完成事件,以及該由應用程序還是內核完成I/O讀寫。而在並發模式中,同步指的是程序完全按照代碼序列的順序執行,而異步是程序的執行需要由系統事件來驅動,比如中斷、信號等。而按照同步方式執行的線程是同步線程,按異步方式執行的線程是異步線程,它們各有優缺點,所以我們采用半同步/半異步模式。其中,同步線程主要用於處理客戶邏輯,異步線程用於處理I/O事件,異步線程監聽到客戶請求就將其封裝成請求對象並插入請求隊列,請求隊列通知某個同步線程來讀取或處理該對象。
半同步/半異步模式有幾種變形,其中一種是半同步/半反應堆模式,其中,異步線程只有一個,就是主線程,其余工作線程都睡眠在請求隊列上,以競爭方式獲得任務接管權,所以只有空閑的工作線程才能處理新任務。而其缺點也很明顯,首先請求隊列是互斥資源,每次訪問需要加鎖,消耗了CPU時間;其次每個工作線程同一時間只能處理一個客戶請求,當客戶數量大時只能通過增加工作線程的方式解決問題,而工作線程的切換也將耗費大量CPU時間。
另外一種更為高效的半同步/半異步模式,每個工作線程都能處理多個客戶連接,我們考慮一個問題,既然主線程可以用epoll來對多個文件描述符進行監聽,那么工作線程呢?所以,每個工作線程都使用epoll_wait監聽多個文件描述符,當主線程監聽到連接請求,就向它和工作線程的管道中寫數據,工作線程檢測到管道有數據可讀時,就分析是否是一個新客戶連接,如果是就將其注冊到自己的內核事件表中。
領導者/追隨者模式是多個工作線程輪流獲得事件源集合,輪流監聽、分發並處理時間的一種模式,在這種模式下,沒有主線程和工作線程的區分,就好像P2P模式一樣,每個工作線程都可以負責監聽事件源集合,也可以負責事務邏輯,而半同步/半異步就好像C/S模式一樣,主線程是服務器,將工作派發給工作線程。領導者/追隨者模式在同一時刻只有一個領導者進程,負責監聽I/O事件,而其他進程為追隨者,他們處在休眠狀態等待成為新的領導者,如果當前領導者監聽到了I/O事件,則首先要從線程池中推選出新的領導者線程,然后舊領導者線程去處理I/O事件,新領導者繼續監聽I/O事件,這樣實現了並發。但是很明顯,這樣做的缺點就是沒法像高效的半同步/半異步模式那樣一個工作線程處理多個客戶連接。領導者/追隨者模式包含句柄集、線程集、事件處理器和具體事件處理器。
有限狀態機是一種很好的高效編程方法,其概念比較簡單,但建模較難,我們以一個HTTP請求的讀取和分析程序來分析一下,在服務器讀取HTTP請求時,如果沒有利用有限狀態機,就需要等讀取到表示頭部結束的空行才能對頭部進行解析,但是用有限狀態機之后可以一邊接受數據一邊進行分析,其效率更高。
1 /************************************************************************* 2 > File Name: 8-3.cpp 3 > Author: Torrance_ZHANG 4 > Mail: 597156711@qq.com 5 > Created Time: Sat 03 Feb 2018 01:49:52 AM PST 6 ************************************************************************/ 7 8 #include"head.h" 9 using namespace std; 10 #define BUFFER_SIZE 4096 11 12 //主狀態機的兩種狀態,當前正在分析請求行和正在分析頭部字段 13 enum CHECK_STATE{CHECK_STATE_REQUESTLINE = 0, CHECK_STATE_HEADER}; 14 //從狀態機的三種可能狀態,即行的讀取狀態:讀取到一個完整的行、行出錯和行數據暫且不完整 15 enum LINE_STATUS{LINE_OK = 0, LINE_BAD, LINE_OPEN}; 16 17 //服務器處理HTTP請求的結果:NO_REQUEST表示請求不完整,需要讀取客戶數據; 18 // GET_REQUEST表示獲得了一個完整的客戶請求; 19 // BAD_REQUEST表示客戶請求有語法錯誤; 20 // FORBIDDEN_REQUEST表示客戶對資源沒有足夠的訪問權限 21 // INTERNAL_ERROR表示服務器內部錯誤; 22 // CLOSED_CONNECTION表示客戶端已經關閉連接。 23 enum HTTP_CODE{NO_REQUEST, GET_REQUEST, BAD_REQUEST, FORBIDDEN_REQUEST, INTERNAL_ERROR, CLOSED_CONNECTION}; 24 25 static const char* szret[] = {"I get a correct result\n", "Something wrong\n"}; 26 27 //從狀態機,用於解析一行內容 28 LINE_STATUS parse_line(char* buffer, int &checked_index, int &read_index) { 29 //checked_id_index指向buffer的正在分析的字節,read_index指向buffer中的最后一個字節的下一個字節 30 //即從0~checked_index是已分析完畢,checked_index~read_index-1待分析 31 char temp; 32 for(; checked_index < read_index; ++ checked_index) { 33 temp = buffer[checked_index]; 34 //如果當前是回車符,則說明可能讀取到了一個完整行 35 //如果是'\n',即換行符,也說明可能讀取到了一個完整行 36 if(temp == '\r') { 37 //如果當前是本行最后一個字符,則說明不完整,需要更多數據 38 //如果下一個字符是'\n'則說明讀取到了完整的行 39 //否則說明HTTP請求存在語法問題 40 if(checked_index + 1 == read_index) { 41 return LINE_OPEN; 42 } 43 else if(buffer[checked_index + 1] == '\n') { 44 buffer[checked_index ++] = '\0'; 45 buffer[checked_index ++] = '\0'; 46 return LINE_OK; 47 } 48 else return LINE_BAD; 49 } 50 else if(temp == '\n') { 51 if((checked_index > 1) && (buffer[checked_index - 1] == '\r')) { 52 buffer[checked_index - 1] = '\0'; 53 buffer[checked_index ++] = '\0'; 54 return LINE_OK; 55 } 56 return LINE_BAD; 57 } 58 } 59 //如果到最后也沒有發現'\r'字符,則返回LINE_OPEN表示需要讀取更多數據分析 60 return LINE_OPEN; 61 } 62 63 //分析請求行 64 HTTP_CODE parse_requestline(char* temp, CHECK_STATE& checkstate) { 65 //如果請求行中沒有空格和'\t'字符則說明HTTP請求有問題 66 //strpbrk返回前面緩沖區第一個在后面字符集合中的字符位置 67 char* url = strpbrk(temp, " \t"); 68 if(!url) return BAD_REQUEST; 69 *url ++ = '\0'; 70 71 //strcasecmp與strcmp的區別就是不區分大小寫 72 char* method = temp; 73 if(strcasecmp(method, "GET") == 0) printf("The request method is GET\n"); 74 else return BAD_REQUEST; 75 76 //strspn函數統計緩沖區前面多少個連續字符在字符集合中 77 url += strspn(url, "\t"); 78 char *version = strpbrk(url, " \t"); 79 if(!version) return BAD_REQUEST; 80 81 *version ++ = '\0'; 82 version += strspn(version, " \t"); 83 84 //strchr函數返回緩沖區里第一個后面字符的位置 85 if(strcasecmp(version, "HTTP/1.1") != 0) { 86 url += 7; 87 url = strchr(url, '/'); 88 } 89 90 if(!url || url[0] != '/') return BAD_REQUEST; 91 printf("The request URL is: %s\n", url); 92 checkstate = CHECK_STATE_HEADER; 93 return NO_REQUEST; 94 } 95 96 //分析頭部 97 HTTP_CODE parse_headers(char* temp) { 98 //遇到空行說明得到了一個正確的HTTP請求 99 if(temp[0] == '\0') return GET_REQUEST; 100 else if(strncasecmp(temp, "Host:", 5) == 0) { 101 temp += 5; 102 temp += strspn(temp, " \t"); 103 printf("The request host is: %s\n", temp); 104 } 105 else printf("I can not handle this header\n"); 106 return NO_REQUEST; 107 } 108 109 //分析HTTP請求的入口函數 110 HTTP_CODE parse_content(char* buffer, int& checked_index, CHECK_STATE& checkstate, int& read_index, int &start_line) { 111 LINE_STATUS linestatus = LINE_OK; 112 HTTP_CODE retcode = NO_REQUEST; 113 while((linestatus = parse_line(buffer, checked_index, read_index)) == LINE_OK) { 114 char* temp = buffer + start_line; 115 start_line = checked_index; 116 switch(checkstate) { 117 case CHECK_STATE_REQUESTLINE: { 118 retcode = parse_requestline(temp, checkstate); 119 if(retcode == BAD_REQUEST) return BAD_REQUEST; 120 break; 121 } 122 case CHECK_STATE_HEADER: { 123 retcode = parse_headers(temp); 124 if(retcode == BAD_REQUEST) return BAD_REQUEST; 125 else if(retcode == GET_REQUEST) return GET_REQUEST; 126 break; 127 } 128 default: { 129 return INTERNAL_ERROR; 130 } 131 } 132 } 133 if(linestatus == LINE_OPEN) return NO_REQUEST; 134 else return BAD_REQUEST; 135 } 136 137 int main(int argc, char** argv) { 138 if(argc <= 2) { 139 printf("usage: %s ip_address port_number\n", basename(argv[0])); 140 return 1; 141 } 142 const char* ip = argv[1]; 143 int port = atoi(argv[2]); 144 struct sockaddr_in address; 145 bzero(&address, sizeof(address)); 146 address.sin_family = AF_INET; 147 address.sin_port = htons(port); 148 inet_pton(AF_INET, ip, &address.sin_addr); 149 150 int listenfd = socket(AF_INET, SOCK_STREAM, 0); 151 assert(listenfd >= 0); 152 153 int reuse = 1; 154 int ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)); 155 156 ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); 157 assert(ret != -1); 158 159 ret = listen(listenfd, 5); 160 assert(ret != -1); 161 162 struct sockaddr_in client_address; 163 socklen_t client_addrlength = sizeof(client_address); 164 int fd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength); 165 if(fd < 0) printf("errno is: %d\n", errno); 166 else { 167 char buffer[BUFFER_SIZE]; 168 memset(buffer, 0, sizeof(buffer)); 169 //下面的變量分別代表已經接收的字符數、已經讀取了多少字節、已經分析完了多少字節、行在buffer中的起始位置 170 int data_read = 0; 171 int read_index = 0; 172 int checked_index = 0; 173 int start_line = 0; 174 CHECK_STATE checkstate = CHECK_STATE_REQUESTLINE; 175 while(1) { 176 data_read = recv(fd, buffer + read_index, BUFFER_SIZE - read_index, 0); 177 if(data_read == -1) { 178 printf("reading failed\n"); 179 break; 180 } 181 else if(data_read == 0) { 182 printf("remote client has closed the connection\n"); 183 break; 184 } 185 read_index += data_read; 186 HTTP_CODE result = parse_content(buffer, checked_index, checkstate, read_index, start_line); 187 if(result == NO_REQUEST) continue; 188 else if(result == GET_REQUEST) { 189 send(fd, szret[0], strlen(szret[0]), 0); 190 break; 191 } 192 else { 193 send(fd, szret[1], strlen(szret[1]), 0); 194 break; 195 } 196 } 197 close(fd); 198 } 199 close(listenfd); 200 return 0; 201 }
我們模擬了正確的請求報文和錯誤的請求報文兩種情況,發現其正常工作。分析一下發現,這里面存在着兩個有限狀態機,分別是主狀態機和從狀態機,從狀態機就是一個parse_line函數,負責從buffer中解析出一個行,其初始狀態為LINE_OK,原始驅動力來源於buffer中新到達的數據,而當從狀態機讀取到了一個完成的行,就需要將這個行交給主狀態機處理,主狀態機中根據當前狀態調用不同的函數對報文進行解析,從而實現狀態轉移。
關於如何提高服務器的性能,還有其他三種方法:首先使用池的概念,由於臨時申請進程或線程等資源的CPU消耗比較大,所以我們事先申請好資源,如果不夠再臨時申請;其次就是復制數據的過程中,盡量使用零拷貝函數,也盡量少進行數據復制;最后就是減少上下文切換和鎖的使用。