1、網絡編程概述
1.1 套接字socket
網絡程序設計主要依靠套接字接受和發送信息來實現。Socket實質上提供了進程通信的端點。進程通信之前,雙方首先必須各自創建一個端點,每一個Socket都用一個半相關描述:
{協議,本地地址,本地端口}
一個完整的Socket則用一個相關描述:
{協議,本地地址,本地端口,遠程地址,遠程端口}
每一個Socket有一個本地的唯一Socket號,由操作系統分配。
1.2 套接字的三種類型
套接字有三種類型:流式套接字(SOCK_STREAM),數據報套接字(SOCK_DGRAM)及原始套接字。
1.流式套接字(SOCK_STREAM)
流式的套接字可以提供可靠的、面向連接的通訊流。如果你通過流式套接字發送了順序的數據:“1”、“2”。那么數據到達遠程時候的順序也是“1”、“2”。
流式套接字使用了TCP(Transmission Control Protocol)協議,保證數據傳輸是正確的,並且是順序的。
2.數據報套接字(SOCK_DGRAM)
數據報套接字定義了一種無連接的服務,數據通過相互獨立的報文進行傳輸,是無序的,並且不保證可靠,無差錯。
數據報套接字使用使用者數據報協議UDP(User Datagram Protocol)協議。
3.原始套接字
原始套接字主要用於一些協議的開發,可以進行比較底層的操作。它功能強大,但是沒有上面介紹的兩種套接字使用方便,一般的程序也涉及不到原始套接字。
1.3 套接字的相關知識
1.3.1 struct in_addr結構體
該結構體用於存儲IP地址,其定義如下:
| /* 因特網地址 (a structure for historical reasons) */ struct in_addr { unsigned long s_addr; /*4個字節的IP 地址(按網絡字節順序排放)*/ }; |
該結構體因為歷史原因,出現在下一節的struct sockaddr_in結構體中。
1.3.2 struct sockaddr結構體
這個結構用來存儲套接字地址,定義為:
| struct sockaddr { unsigned short sa_family; /* address族, AF_xxx */ char sa_data[14]; /* 14 bytes的協議地址 */ }; |
l sa_family 一般來說,都是 “AFINET”。
l sa_data 包含了一些遠程電腦的地址、端口和套接字的數目,它里面的數據是雜溶在一切的。
為了處理struct sockaddr,
程序員建立了另外一個相似的結構struct sockaddr_in:
struct sockaddr_in (“in” 代表 “Internet”)
| struct sockaddr_in { short int sin_family; /* Internet地址族 */ unsigned short int sin_port; /* 端口號 */ struct in_addr sin_addr; /* Internet地址 */ //見1.3.1節 unsigned char sin_zero[8]; /* 添0(和struct sockaddr一樣大小)*/ }; |
一個指向struct sockaddr_in 的指針可以聲明指向一個struct sockaddr 的結構。所以雖然socket() 函數需要一個struct sockaddr *,你也可以給他一個sockaddr_in *。
注意:在struct sockaddr_in中,sin_family 相當於在struct sockaddr中的sa_family,需要設成 “AF_INET”。
1.3.3 常用的轉換函數
1、字節序轉換
每一個機器內部對變量的字節存儲順序不同(有的系統是高位在前,底位在后,而有的系統是底位在前,高位在后),網絡傳輸的數據大家是一定要統一順序的。
套接字字節轉換程序的列表:
l htons() ——“Host to Network Short” 主機字節順序轉換為網絡字節順序(對無符號短型進行操作4 bytes)
l htonl() ——“Host to Network Long” 主機字節順序轉換為網絡字節順序(對無符號長型進行操作8 bytes)
l ntohs() ——“Network to Host Short” 網絡字節順序轉換為主機字節順序(對無符號短型進行操作4 bytes)
l ntohl() ——“Network to Host Long” 網絡字節順序轉換為主機字節順序(對無符號長型進行操作8 bytes)
2、地址轉換
IP地址既可以用點分十進制的字符串來表示,也可以用一個長整形數字來表示。
l inet_addr() ――“Address” 點分十進制的表示IP地址的字符串轉換成無符號長整型。
l inet_ntoa() ――“Network to ASCII” 把一個無符號長整型的IP地址轉換成點分十進制的字符串。
例如: ina.sin_addr.s_addr = inet_addr(“166.111.69.52”);
printf(“%s”, inet_ntoa(ina.sin_addr));
注意:(1)inet_addr() 返回的地址已經是網絡字節順序了
(2)如果inet_addr() 函數執行錯誤,它將會返回–1,相當於255.255.255.255廣播用的IP地址;
(3)inet_ntoa() 返回一個字符指針,它指向一個定義在函數inet_ntoa() 中的static 類型字符串。
1.3.4 頭文件即函數聲明
| #include <netinet/in.h>
struct sockaddr_in; |
| #include <sys/types.h> #include <sys/socket.h>
int socket(int domain , int type , int protocol); int bind (int sockfd , struct sockaddr *my_addr , int addrlen) ; int connect (int sockfd, struct sockaddr *serv_addr, int addrlen); int send(int sockfd, const void *msg, int len, int flags); int recv(int sockfd, void *buf, int len, unsigned int flags); int sendto(int sockfd, const void *msg, int len, unsigned int flags,const struct sockaddr *to, int tolen); int recvfrom(int sockfd, void *buf, int len, unsigned int flags,struct sockaddr *from, int *fromlen); |
| #include <sys/socket.h>
int listen(int sockfd, int backlog); int accept(int sockfd, void *addr, int *addrlen); int shutdown(int sockfd, int how); void close(int sockfd); |
1.3.5 socket庫的錯誤
問題描述:Linux所提供的socket 庫含有一個錯誤(bug)。此錯誤表現為你不能為一個套接字重新啟用同一個端口號,即使在你正常關閉該套接字以后。例如,比方說,你編寫一個服務器在一個套接字上等待的程序.服務器打開套接字並在其上偵聽是沒有問題的。無論如何,總有一些原因(不管是正常還是非正常的結束程序)使你的程序需要重新啟動。然而重啟動后你就不能把它綁定在原來那個端口上了。從bind()系統調用返回的錯誤代碼總是報告說你試圖連接的端口已經被別的進程所綁定。
解決辦法:當套接字已經打開但尚未有連接的時候用setsockopt()系統調用在其上設定選項(options)。即:
opt = 1; len = sizeof(opt); /* 設定參數數值 */
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&opt,&len); /* 設置套接字屬性 */
1.3.6 其他常用函數
(1)setsockopt()與getsockopt()
setsockopt()調用設置選項,getsockopt()從給定的套接字取得選項。
| #include<sys/types.h> #include<sys/socket.h>
int getsockopt(int sockfd, int level, int name, char *value, int *optlen); int setsockopt(int sockfd, int level, int name, char *value, int *optlen); |
(2)getpeername()
取得一個已經連接上的套接字的遠程信息(比如IP 地址和端口)。
| #include <sys/socket.h>
int getpeername(int sockfd, struct sockaddr *addr, int *addrlen); |
(3)gethostname()
取得正在執行它的計算機的名字。返回的這個名字可以被gethostbyname()函數使用,由此可以得到本地主機的IP地址。
| #include <unistd.h>
int gethostname(char *hostname, size_t size); |
(4)gethostbyname()
取得本地主機的信息.
| #include <netdb.h>
struct hostent { char *h_name; //主機的正式名稱 char **h_aliases; //主機的備用名稱 int h_addrtype; //返回地址的類型,一般來說是“AF_INET”。 int h_length; //地址的字節長度 char **h_addr_list; //一個以0結尾的數組,存儲了主機的網絡地址 }; #define h_addr h_addr_list[0]
struct hostent *gethostbyname(const char *name); |
注意:使用gethostbyname()函數,你不能使用perror()來輸出錯誤信息(因為錯誤代碼存儲在h_errno中而不是errno中。所以,你需要調用herror()函數。
1.4 五種I/O模式
在Linux/UNIX下,有下面這五種I/O 操作方式:
l 阻塞I/O
l 非阻塞I/O
l I/O多路復用
l 信號驅動I/O(SIGIO)
l 異步I/O
1.4.1阻塞I/O
缺省的,一個套接字建立后所處於的模式就是阻塞I/O模式。
1.4.2非阻塞I/O
可以使用fcntl()函數將一個套接字設置為非阻塞模式,如:
| #include <unistd.h> #include <fcntl.h>
int fcntl(int fd, int cmd, long arg); |
int ret = fcntl(sockfd, F_SETFL, fcntl(sockfd,F_GETFD, 0)|O_NONBLOCK);
當一個應用程序使用了非阻塞模式的套接字,它需要使用一個循環來不聽的測試是否一個文件描述符有數據可讀(稱做polling)。應用程序不停的polling 內核來檢查I/O操作是否已經就緒。
|
||||
|
1.4.3 I/O多路復用
在使用I/O 多路技術的時候,我們調用select()函數和poll()函數,在調用它們的時候阻塞,而不是我們來調用accept()、recv()等函數的時候阻塞。
多路復用的高級之處在於,它能同時等待多個文件描述符,而這些文件描述符(套接字描述符)其中的任意一個進入讀就緒狀態,select()/poll()函數就可以返回。
此部分具體見第2、3、4節。
1.4.4信號驅動I/O(SIGIO)
信號驅動I/O模式是使用信號,讓內核在文件描述符就緒的時候使用SIGIO信號來通知我們。
1.4.5異步I/O(AIO)
當我們運行在異步I/O 模式下時,我們如果想進行I/O操作,只需要告訴內核我們要進行I/O 操作,然后內核會馬上返回。具體的I/O 和數據的拷貝全部由內核來完成,我們的程序可以繼續向下執行。當內核完成所有的I/O 操作和數據拷貝后,內核將通知我們的程序。
2、I/O多路復用——select模型
select系統調用時可以讓我們的程序監視多個文件句柄的狀態變化的。程序會停在select這里等待,直到被監視的文件句柄有一個或多個發生了狀態改變。
1.1 函數聲明
關於文件句柄,其實就是一個整數, socket函數的聲明如:
int socket(int domain, int type,int protocol);
| 我們最熟悉的句柄是0、1、2三個,0是標准輸入,1是標准輸出,2是標准錯誤輸出。0、1、2是整數表示的,對應的FILE *結構的表示就是stdin、stdout、stderr。 |
select函數就是用來監視某個或某些句柄的狀態變化的。select函數原型如下:
int select (int nfds, fd_set *readfds, fd_set *writefds, fd_set*exceptfds, struct timeval *timeout);
函數參數:
◆ 參數nfds是需要監視的最大的文件描述符值+1;
◆ 第2、3、4三個參數是一樣的類型;fd_set *,即我們在程序里要申請幾個fd_set類型的變量,比如rdfds,wtfds,exfds,然后把這個變量的地址&rdfds,&wtfds,&exfds傳遞給select函數。這三個參數都是一個句柄的集合,第一個rdfds是用來保存這樣的句柄的:當句柄的狀態變成可讀時系統就告訴select函數返回,同理第二個函數是指向有句柄狀態變成可寫時系統就會告訴select函數返回,同理第三個參數exfds是特殊情況,即句柄上有特殊情況發生時系統會告訴select函數返回。
◆ 函數的最后一個參數timeout是一個超時時間值。其類型是struct timeval *,即一個struct timeval結構的變量的指針,所以我們在程序里要聲明一個struct timeval tv;然后把變量tv的地址&tv傳遞給select函數。該結構體的定義如下:
struct timeval
{
long tv_sec; //seconds
long tv_usec; //microseconds
};
返回值:
執行成功則返回文件描述詞狀態已改變的個數,如果返回0代表在描述詞狀態改變前已超過timeout時間,沒有返回;當有錯誤發生時則返回-1,錯誤原因存於errno,此時參數readfds,writefds,exceptfds和timeout的值變成不可預測。錯誤值可能為:
EBADF 文件描述詞為無效的或該文件已關閉
EINTR 此調用被信號所中斷
EINVAL 參數n 為負值。
ENOMEM 核心內存不足
下面的宏提供了處理這三種描述詞組的方式:
FD_CLR(inr fd,fd_set* set);用來清除描述詞組set中相關fd 的位
FD_ISSET(int fd,fd_set *set);用來測試描述詞組set中相關fd 的位是否為真
FD_SET(int fd,fd_set*set);用來設置描述詞組set中相關fd的位
FD_ZERO(fd_set *set);用來清除描述詞組set的全部位
1.2 示例程序
| /** * FileName:main.cpp * Description:練習使用select模型 */ #include <stdio.h> #include <assert.h> #include <netinet/in.h> #include <sys/types.h> #include <sys/socket.h> #include <unistd.h> #include <string.h> #include <vector> using std::vector;
int main(int argc,char *argv[]) { int sfd=0; int ret=0;
sfd=socket(AF_INET,SOCK_STREAM,0); assert(-1!=sfd);
int optValue=1; setsockopt(sfd,SOL_SOCKET,SO_REUSEADDR,&optValue,sizeof(optValue));
struct sockaddr_in myAddr; myAddr.sin_family=AF_INET; myAddr.sin_addr.s_addr=htonl(INADDR_ANY); myAddr.sin_port=htons(4000); bzero(&(myAddr.sin_zero),sizeof(myAddr.sin_zero));//string.h
ret=bind(sfd,(struct sockaddr *)&myAddr,sizeof(myAddr)); assert(-1!=ret);
ret=listen(sfd,SOMAXCONN); assert(-1!=ret);
vector<int> acceptList; int maxfd=sfd+1; fd_set sockets; fd_set readfds,writefds; FD_ZERO(&sockets); FD_SET(sfd,&sockets); vector<int>::iterator it;
while(1) { FD_ZERO(&readfds); FD_ZERO(&writefds); readfds=sockets; writefds=sockets;
ret=select(maxfd,&readfds,&writefds,NULL,NULL); assert(-1!=ret);
if(FD_ISSET(sfd,&readfds)) {//accept client connection struct sockaddr_in clientAddr; socklen_t addrlen=sizeof(clientAddr); int cfd=accept(sfd,(struct sockaddr *)&clientAddr,&addrlen); assert(-1!=cfd); FD_SET(cfd,&sockets); maxfd=cfd+1; acceptList.push_back(cfd); } else { //check read && write for(it=acceptList.begin();it!=acceptList.end();++it) { int s=*it;
//check recieve if(FD_ISSET(s,&readfds)) {//recieve data from client char buffer[1024]; int buflen=1024; int recvlen=recv(s,buffer,buflen,0); if(-1==recvlen) //error accurred continue; else if(0==recvlen) //client disconnect { close(s);
it=acceptList.erase(it); it--;
FD_CLR(s,&sockets); } else // print recieve data { buffer[buflen]='\0'; printf(buffer); } }
//check send if(FD_ISSET(s,&writefds)) { //char sendbuf[128]="hello client"; //int sendlen=send(s,sendbuf,strlen(sendbuf),0); } } } }
close(sfd);
return 0; } |
3、I/O多路復用——poll模型
和select()不一樣,poll()沒有使用低效的三個基於位的文件描述符set,而是采用了一個單獨的結構體pollfd數組,由fds指針指向這個組。poll實現的效率要比select高。
3.1函數聲明
# include < sys/poll.h>
int poll ( struct pollfd * fds, unsigned int nfds, int timeout);
函數參數:
◆ pollfd結構體定義如下:
| # include < sys/poll.h> struct pollfd { int fd; /* 文件描述符 */ short events; /* 等待的事件 */ short revents; /* 實際發生了的事件 */ } ; |
每一個pollfd結構體指定了一個被監視的文件描述符,可以傳遞結構體數組,指示poll()監視多個文件描述符。每個結構體的events域是監視該文件描述符的事件掩碼,由用戶來設置這個域。revents域是文件描述符的操作結果事件掩碼。內核在調用返回時設置這個域。events域中請求的任何事件都可能在revents域中返回。合法的事件如下:
POLLIN 有數據可讀。
POLLRDNORM 有普通數據可讀。
POLLRDBAND 有優先數據可讀。
POLLPRI 有緊迫數據可讀。
POLLOUT 寫數據不會導致阻塞。
POLLWRNORM 寫普通數據不會導致阻塞。
POLLWRBAND 寫優先數據不會導致阻塞。
POLLMSG SIGPOLL消息可用。
此外,revents域中還可能返回下列事件:
POLLER 指定的文件描述符發生錯誤。
POLLHUP 指定的文件描述符掛起事件。
POLLNVAL 指定的文件描述符非法。
這些事件在events域中無意義,因為它們在合適的時候總是會從revents中返回。使用poll()和select()不一樣,你不需要顯式地請求異常情況報告。POLLIN | POLLPRI等價於select()的讀事件,POLLOUT |POLLWRBAND等價於select()的寫事件。POLLIN等價於POLLRDNORM |POLLRDBAND,而POLLOUT則等價於POLLWRNORM。
例如,要同時監視一個文件描述符是否可讀和可寫,我們可以設置 events為POLLIN |POLLOUT。在poll返回時,我們可以檢查revents中的標志,對應於文件描述符請求的events結構體。如果POLLIN事件被設置,則文件描述符可以被讀取而不阻塞。如果POLLOUT被設置,則文件描述符可以寫入而不導致阻塞。這些標志並不是互斥的:它們可能被同時設置,表示這個文件描述符的讀取和寫入操作都會正常返回而不阻塞。
◆ nfds參數用於標記數組fds中的結構體元素的總數量;
◆ timeout參數指定等待的毫秒數,無論I/O是否准備好,poll都會返回。timeout指定為負數值表示無限超時,使poll()一直掛起直到一個指定事件發生;timeout為0指示poll調用立即返回並列出准備好I/O的文件描述符,但並不等待其它的事件。這種情況下,poll()就像它的名字那樣,一旦選舉出來,立即返回。
返回值和錯誤代碼
成功時,poll()返回結構體中revents域不為0的文件描述符個數;如果在超時前沒有任何事件發生,poll()返回0;失敗時,poll()返回-1,並設置errno為下列值之一:
EBADF 一個或多個結構體中指定的文件描述符無效。
EFAULT fds指針指向的地址超出進程的地址空間。
EINTR 請求的事件之前產生一個信號,調用可以重新發起。
EINVAL fds參數超出PLIMIT_NOFILE值。
ENOMEM 可用內存不足,無法完成請求。
3.2示例程序
| /** * File:main.cpp * Description:練習使用poll模型 */ #include <stdio.h> #include <assert.h> #include <netinet/in.h> #include <sys/types.h> #include <sys/socket.h> #include <poll.h> #include <unistd.h> #include <string.h> #include <vector> #include <algorithm> using std::vector;
int main(int argc,char *argv[]) { int sfd=0; int ret=0;
sfd=socket(AF_INET,SOCK_STREAM,0); assert(-1!=sfd);
int optValue=1; setsockopt(sfd,SOL_SOCKET,SO_REUSEADDR,&optValue,sizeof(optValue));
struct sockaddr_in myAddr; myAddr.sin_family=AF_INET; myAddr.sin_addr.s_addr=htonl(INADDR_ANY); myAddr.sin_port=htons(4000); bzero(&(myAddr.sin_zero),sizeof(myAddr.sin_zero));//string.h
ret=bind(sfd,(struct sockaddr *)&myAddr,sizeof(myAddr)); assert(-1!=ret);
ret=listen(sfd,SOMAXCONN); assert(-1!=ret);
vector<int> acceptList; vector<int>::iterator it;
acceptList.push_back(sfd); struct pollfd *fds=NULL;
while(1) { int pollsize=acceptList.size(); if(fds) delete []fds; fds=new struct pollfd[pollsize]; memset(fds,0,pollsize*sizeof(struct pollfd)); size_t index=0;
for(it=acceptList.begin();it!=acceptList.end();++it) { fds[index].fd=*it; fds[index].events |=POLLIN|POLLOUT; index++; }
ret=poll(fds,index,-1); assert(-1!=ret);//0 means timeout
for(size_t i=0;i<index;++i) { if(fds[i].revents & POLLIN)//can read { if(fds[i].fd==sfd) //can accept { struct sockaddr_in clientAddr={0}; socklen_t addrlen=sizeof(clientAddr); int cfd=accept(sfd,(struct sockaddr *)&clientAddr,&addrlen); assert(-1!=cfd);
acceptList.push_back(cfd); } else // can receive { char buffer[1024]; int recvLen=recv(fds[i].fd,buffer,sizeof(buffer),0); if(recvLen<0) continue; else if(0==recvLen) { close(fds[i].fd); it=std::find(acceptList.begin(),acceptList.end(),fds[i].fd); acceptList.erase(it); } else { buffer[recvLen]='\0'; printf(buffer); } } }
if(fds[i].revents & POLLOUT) //can send { //char sendbuf[128]="hello client"; //int sendlen=send(s,sendbuf,strlen(sendbuf),0); } }
}
acceptList.clear(); close(sfd);
return 0; } |
4、I/O多路復用——epoll模型
epoll是Linux內核為處理大批句柄而作改進的poll,是Linux下多路復用IO接口select/poll的增強版本,它能顯著的減少程序在大量並發連接中只有少量活躍的情況下的系統CPU利用率。因為它會復用文件描述符集合來傳遞結果而不是迫使開發者每次等待事件之前都必須重新准備要被偵聽的文件描述符集合,另一個原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。epoll除了提供select/poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態,減少epoll_wait/epoll_pwait的調用,提供應用程序的效率。
LT(level triggered):水平觸發,缺省方式,同時支持block和no-block socket,在這種做法中,內核告訴我們一個文件描述符是否被就緒了,如果就緒了,你就可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你的,所以,這種模式編程出錯的可能性較小。傳統的select\poll都是這種模型的代表。
ET(edge-triggered):邊沿觸發,高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變為就緒狀態時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,並且不會再為那個描述符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了(比如:你在發送、接受或者接受請求,或者發送接受的數據少於一定量時導致了一個EWOULDBLOCK錯誤)。但是請注意,如果一直不對這個fs做IO操作(從而導致它再次變成未就緒狀態),內核不會發送更多的通知。
區別:LT事件不會丟棄,而是只要讀buffer里面有數據可以讓用戶讀取,則不斷的通知你。而ET則只在事件發生之時通知。
4.1、使用流程
1、創建一個epoll句柄
#include <sys/epoll.h>
int epoll_create(int size)
int epoll_create1(int flag)
2、注冊epoll事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
函數參數
epfd為epoll的句柄;
◆ op表示動作,用3個宏來表示:
EPOLL_CTL_ADD(注冊新的fd到epfd)
EPOLL_CTL_MOD(修改已經注冊的fd的監聽事件)
EPOLL_CTL_DEL(從epfd刪除一個fd);
◆ fd為需要監聽的標示符;
◆ event告訴內核需要監聽的事件,event的結構如下:
| struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ }; |
其中events可以用以下幾個宏的集合:
EPOLLIN :表示對應的文件描述符可以讀(包括對端SOCKET正常關閉)
EPOLLOUT:表示對應的文件描述符可以寫
EPOLLPRI:表示對應的文件描述符有緊急的數據可讀(這里應該表示有帶外數據到來)
EPOLLERR:表示對應的文件描述符發生錯誤
EPOLLHUP:表示對應的文件描述符被掛斷;
EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的
EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之后,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里
3、等待事件的產生
int epoll_wait(int epfd, struct epoll_event * events, int maxevents,int timeout)
類似於select()調用。參數events用來從內核得到事件的集合,maxevents告之內核這個events有多大,這個maxevents的值不能大於創建epoll_create()時的size(此次待定,具體需要查相關文檔),參數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函數返回需要處理的事件數目,如返回0表示已超時。
4.2、示例程序
| /* * FileName:main.cpp * Description:練習使用epoll模型 */ #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <sys/epoll.h> #include <unistd.h> #include <string.h> #include <stdio.h> #include <errno.h> #include <assert.h> #include <list> using std::list;
int main(int argc,char *argv[]) { int listener; int epfd; int ret; list<int> clients;
listener=socket(AF_INET,SOCK_STREAM,0); assert(-1!=listener);
struct sockaddr_in myAddr; myAddr.sin_family=AF_INET; myAddr.sin_addr.s_addr=htonl(INADDR_ANY); myAddr.sin_port=htons(4000); bzero(&(myAddr.sin_zero),sizeof(myAddr.sin_zero));
ret=bind(listener,(struct sockaddr *)&myAddr,sizeof(myAddr)); assert(-1!=ret);
ret=listen(listener,SOMAXCONN); assert(-1!=ret);
epfd=epoll_create1(0); assert(-1!=epfd);
struct epoll_event event; event.events=EPOLLIN | EPOLLOUT; event.data.fd=listener;
ret=epoll_ctl(epfd,EPOLL_CTL_ADD,listener,&event); assert(-1!=ret);
struct epoll_event events[1024]; int maxsize=1024; while(1) { ret=epoll_wait(epfd,events,maxsize,-1); assert(-1!=ret); assert(ret>0); int event_counts=ret;
for(int i=0;i<event_counts;++i) { if(events[i].data.fd==listener) //can accept { struct sockaddr_in clientAddr; socklen_t addrLen=sizeof(clientAddr); int cfd=accept(listener,(struct sockaddr *)&clientAddr,&addrLen); assert(-1!=cfd);
event.events=EPOLLIN | EPOLLOUT; event.data.fd=cfd; ret=epoll_ctl(epfd,EPOLL_CTL_ADD,cfd,&event); assert(-1!=ret);
clients.push_back(cfd); } else if(events[i].events & EPOLLIN)//can receive { char buffer[1024]; int recvLen=recv(events[i].data.fd,buffer,sizeof(buffer),0); if(-1==recvLen) {//error accurred! perror("error when recv()\n"); } else if(0==recvLen) {//client disconnect ret=epoll_ctl(epfd,EPOLL_CTL_DEL,events[i].data.fd,&events[i]); assert(-1!=ret);
clients.remove(events[i].data.fd); close(events[i].data.fd); } else {//print receive data buffer[recvLen]='\0'; printf(buffer); } } else if(events[i].events & EPOLLOUT) //can send { //char sendBuf[]="hello client"; //int sendLen=send(events[i].data.fd,sendBuf,strlen(sendBuf)+1,0); } }
}
close(epfd); close(listener); clients.clear();
return 0; } |
5、select/poll/epoll比較
5.1 參數及實現對比
1. select的第一個參數nfds為fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制為__FD_SETSIZE(1024),位數組的每一位代表其對應的描述符是否需要被檢查。
select的第二三四個參數表示需要關注讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用於標示哪些描述符上發生了關注的事件。所以每次調用select前都需要重新初始化fdset。
timeout參數為超時時間,該結構會被內核修改,其值為超時剩余的時間。
select對應於內核中的sys_select調用,sys_select首先將第二三四個參數指向的fd_set拷貝到內核,然后對每個被SET的描述符調用進行poll,並記錄在臨時結果中(fdset),如果有事件發生,select會將臨時結果寫到用戶空間並返回;當輪詢一遍后沒有任何事件發生時,如果指定了超時時間,則select會睡眠到超時,睡眠結束后再進行一次輪詢,並將臨時結果寫到用戶空間,然后返回。
select返回后,需要逐一檢查關注的描述符是否被SET(事件是否發生)。
2. poll與select不同,通過一個pollfd數組向內核傳遞需要關注的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關注的事件和發生的事件,故pollfd數組只需要被初始化一次。
poll的實現機制與select類似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,然后對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。
poll返回后,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。
3. epoll通過epoll_create創建一個用於epoll輪詢的描述符,通過epoll_ctl添加/修改/刪除事件,通過epoll_wait檢查事件,epoll_wait的第二個參數用於存放結果。
epoll與select、poll不同,首先,其不用每次調用都向內核拷貝事件描述信息,在第一次調用后,事件信息就會與對應的epoll描述符關聯起來。另外epoll不是通過輪詢,而是通過在等待的描述符上注冊回調函數,當事件發生時,回調函數負責把發生的事件存儲在就緒事件鏈表中,最后寫到用戶空間。
epoll返回后,該參數指向的緩沖區中即為發生的事件,對緩沖區中每個元素進行處理即可,而不需要像poll、select那樣進行輪詢檢查。
5.2 性能對比
(無)。
5.3 連接數對比
對於select,感觸最深的是linux下select最大數目限制(windows下似乎沒有限制),每個進程的select最多能處理FD_SETSIZE個FD(文件句柄),
如果要處理超過1024個句柄,只能采用多進程了。
常見的使用slect的多進程模型是這樣的: 一個進程專門accept,成功后將fd通過unix socket傳遞給子進程處理,父進程可以根據子進程負載分派。曾經用過1個父進程+4個子進程承載了超過4000個的負載。
這種模型在我們當時的業務運行的非常好。epoll在連接數方面沒有限制,當然可能需要用戶調用API重現設置進程的資源限制。
6、信號驅動I/O(SIGIO)
信號驅動I/O是讓內核在描述符就緒時發送SIGIO信號通知我們。首先開啟套接口的信號驅動1/O功能,sigaction系統調用安裝一個信號處理函數,當內核數據包准備好時,會為該進程產生一個SIGIO信號。應用可以在信號處理時接收數據。
具體步驟:
1,建立SIGIO信號處理函數。
2,設置該套接口的屬主,通常使用fcntl的F_SETOWN命令設置。
3,開啟該套接口的信號驅動I/O,通常使用fcntl的F_SETFL命令打開O_ASYNC標志完成。
6.1 UDP套接口上的SIGIO信號
UDP上使用信號驅動I/O是簡單的。當下述事件發生時產生SIGIO信號:
1. 數據報到達套接口
2. 套接口上發生異步錯誤
6.2 TCP套接口上的SIGIO信號
不幸的是,信號驅動I/O對TCP套接口幾乎是沒用的,原因是該信號產生得過於頻繁,並且該信號的出現並沒有告訴我們發生了什么事情。
下列條件均可在TCP套接口上產生SIGIO信號(假設信號驅動I/O是使能的):
1. 在監聽套接口上有一個連接請求已經完成
2. 發起了一個連接拆除請求
3. 一個連接拆除請求已經完成
4. 一個連接的一半已經關閉
5. 數據到達了套接口
6. 數據已從套接口上發出(即輸出緩沖區有空閑時間)
7. 發生了一個異步錯誤
inet_ntoa(sockaddr_in addr)包含在arpa/inet.h
int fcntl(int nFd,int nCmd,...),該函數包含在頭文件fcntl.h中
信號處理函數的設置采用函數sighandler_t signal(int nSig,void(*DealFun)(intnSig)),該函數包含在頭文件signal.h中
7、異步非阻塞 I/O(AIO)
Linux aio是Linux下的異步讀寫模型。Linux 異步 I/O 是 Linux 內核中提供的一個相當新的增強。它是 2.6 版本內核的一個標准特性。
在深入介紹 AIOAPI 之前,讓我們先來探索一下 Linux 上可以使用的不同 I/O 模型。下圖給出了同步和異步模型,以及阻塞和非阻塞的模型。
異步非阻塞 I/O 模型是一種處理I/O 重疊的模型。讀請求會立即返回,說明read/write請求已經成功發起了。在后台完成讀操作時,應用程序然后會執行其他處理操作。當 read/write 的響應到達時,就會產生一個信號或執行一個基於線程的回調函數來完成這次 I/O 處理過程。
linux下主要有兩套異步IO,一套是由glibc實現的(以下稱之為glibc版本)、一套是由linux內核實現的,並由libaio來封裝調用接口(以下稱之為linux版本)。
7.1 glibc版本AIO接口
7.1.1 接口介紹
| 函數聲明 |
函數說明 |
| int aio_read(struct aiocb *aiocbp); |
提交一個異步讀 |
| int aio_write(struct aiocb *aiocbp); |
提交一個異步寫 |
| int aio_cancel(int fildes, struct aiocb *aiocbp); |
取消一個異步請求(或基於一個fd的所有異步請求,aiocbp==NULL) |
| int aio_error(const struct aiocb *aiocbp); |
查看一個異步請求的狀態(進行中EINPROGRESS?還是已經結束或出錯 |
| ssize_t aio_return(struct aiocb *aiocbp); |
查看一個異步請求的返回值(跟同步讀寫定義的一樣) |
| int aio_suspend(const struct aiocb * const list[], int nent, const struct timespec *timeout); |
阻塞等待請求完成 |
其中,struct aiocb主要包含以下字段:
struct aiocb
{
int aio_fildes; /* 要被讀寫的fd */
void * aio_buf; /* 讀寫操作對應的內存buffer */
__off64_t aio_offset; /* 讀寫操作對應的文件偏移 */
size_t aio_nbytes; /* 需要讀寫的字節長度 */
int aio_reqprio; /* 請求的優先級 */
structsigevent aio_sigevent; /* 異步事件,定義異步操作完成時的通知信號或回調函數 */
}
7.1.2 接口使用
要使用aio的功能,需要include頭文件aio.h,在編譯連接的時候需要加入POSIX實時擴展庫rt.
(1) int aio_read(struct aiocb *aiocbp);
異步讀操作,向內核發出讀的命令,傳入的參數是一個aiocb的結構,比如
| struct aiocb myaiocb; memset(&aiocb , 0x00 , sizeof(myaiocb)); myaiocb.aio_fildes = fd; myaiocb.aio_buf = new char[1024]; myaiocb.aio_nbytes = 1024; if (aio_read(&myaiocb) != 0) { printf("aio_read error:%s/n" , strerror(errno)); return false; }
|
(2)int aio_write(structaiocb *aiocbp);
異步寫操作,向內核發出寫的命令,傳入的參數仍然是一個aiocb的結構,當文件描述符的O_APPEND標志位設置后,異步寫操作總是將數據添加到文件末尾。如果沒有設置,則添加到aio_offset指定的地方,比如:
| struct aiocb myaiocb; memset(&aiocb , 0x00 , sizeof(myaiocb)); myaiocb.aio_fildes = fd; myaiocb.aio_buf = new char[1024]; myaiocb.aio_nbytes = 1024; myaiocb.aio_offset = 0; if (aio_write(&myaiocb) != 0) { printf("aio_read error:%s/n" , strerror(errno)); return false; } |
(3) int aio_error(const struct aiocb *aiocbp);
如果該函數返回0,表示aiocbp指定的異步I/O操作請求完成。
如果該函數返回EINPROGRESS,表示aiocbp指定的異步I/O操作請求正在處理中。
如果該函數返回ECANCELED,表示aiocbp指定的異步I/O操作請求已經取消。
如果該函數返回-1,表示發生錯誤,檢查errno。
(4)ssize_t aio_return(struct aiocb *aiocbp);
這個函數的返回值相當於同步I/O中,read/write的返回值。只有在aio_error調用后才能被調用。
(5)int aio_cancel(int fd, struct aiocb *aiocbp);
取消在文件描述符fd上的aiocbp所指定的異步I/O請求。
如果該函數返回AIO_CANCELED,表示操作成功。
如果該函數返回AIO_NOTCANCELED,表示取消操作不成功,使用aio_error檢查一下狀態。
如果返回-1,表示發生錯誤,檢查errno.
(6)int lio_listio(int mode, struct aiocb *restrict constlist[restrict],int nent, struct sigevent *restrict sig);
使用該函數,在很大程度上可以提高系統的性能,因為再一次I/O過程中,OS需要進行用戶態和內核態的切換,如果我們將更多的I/O操作都放在一次用戶太和內核太的切換中,減少切換次數,換句話說在內核盡量做更多的事情。這樣可以提高系統的性能。
用戶程序提供一個structaiocb的數組,每個元素表示一次AIO的請求操作。需要設置struct aiocb中的aio_lio_opcode數據成員的值,有LIO_READ,LIO_WRITE和LIO_NOP。nent表示數組中元素的個數。最后一個參數是對AIO操作完成后的通知機制的設置。
(7)設置AIO的通知機制,有兩種通知機制:信號和回調
(a).信號機制
首先我們應該捕獲SIGIO信號,對其作處理:
| struct sigaction sig_act; sigempty(&sig_act.sa_mask); sig_act.sa_flags = SA_SIGINFO; sig_act.sa_sigaction = aio_handler;
struct aiocb myaiocb; bzero( (char *)&myaiocb, sizeof(struct aiocb) ); myaiocb.aio_fildes = fd; myaiocb.aio_buf = malloc(BUF_SIZE+1); myaiocb.aio_nbytes = BUF_SIZE; myaiocb.aio_offset = next_offset;
myaiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL; myaiocb.aio_sigevent.sigev_signo = SIGIO; myaiocb.aio_sigevent.sigev_value.sival_ptr = &myaiocb;
ret = sigaction( SIGIO, &sig_act, NULL ); |
信號處理函數的實現:
| void aio_handler( int signo, siginfo_t *info, void *context ) { struct aiocb *req;
if (info->si_signo == SIGIO) { req = (struct aiocb *)info->si_value.sival_ptr;
if (aio_error( req ) == 0) { ret = aio_return( req ); } } return; } |
(b). 回調機制
需要設置:
myaiocb.aio_sigevent.sigev_notify= SIGEV_THREAD
my_aiocb.aio_sigevent.notify_function= aio_handler;
回調函數的原型:
typedef void (*FUNC_CALLBACK)(sigval_t sigval);
7.2 linux的AIO接口
7.2.1 接口介紹
linux下有aio封裝的接口API(也成為libaio)通常以io_開頭,libaio提供下面五個主要API函數:
int io_setup(int maxevents, io_context_t *ctxp);
int io_destroy(io_context_t ctx);
int io_submit(io_context_t ctx, long nr, struct iocb *ios[]);
int io_cancel(io_context_t ctx, struct iocb *iocb, struct io_event*evt);
int io_getevents(io_context_t ctx_id, long min_nr, long nr, structio_event *events, struct timespec *timeout);
和五個宏定義:
void io_set_callback(struct iocb *iocb, io_callback_t cb);
void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_tcount, long long offset);
void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_tcount, long long offset);
void io_prep_pwritev(struct iocb *iocb, int fd, const struct iovec*iov, int iovcnt, long long offset);
void io_prep_preadv(struct iocb *iocb, int fd, const struct iovec*iov, int iovcnt, long long offset);
這五個宏定義都是操作structiocb的結構體。struct iocb是libaio中很重要的一個結構體,用於表示IO,但是其結構略顯復雜,為了保持封裝性不建議直接操作其元素而用上面五個宏定義操作。
7.2.2 接口使用
(1)libaio的初始化和銷毀
觀察libaio五個主要API,都用到類型為io_context的變量,這個變量為libaio的工作空間。不用具體去了解這個變量的結構,只需要了解其相關操作。創建和銷毀libaio分別用到io_setup(也可以用io_queue_init,區別只是名字不一樣而已)和io_destroy。
int io_setup(int maxevents, io_context_t *ctxp);
int io_destroy(io_context_t ctx);
(2)libaio讀寫請求的下發和回收
a). 請求下發
libaio的讀寫請求都用io_submit下發。下發前通過io_prep_pwrite和io_prep_pread生成iocb的結構體,做為io_submit的參數。這個結構體中指定了讀寫類型、起始扇區、長度和設備標志符。
libaio的初始化不是針對一個具體設備進行初始,而是創建一個libaio的工作環境。讀寫請求下發到哪個設備是通過open函數打開的設備標志符指定。
b). 請求返回
讀寫請求下發之后,使用io_getevents函數等待io結束信號:
int io_getevents(io_context_t ctx_id, long min_nr, long nr, structio_event *events, struct timespec *timeout);
io_getevents返回events的數組,其參數events為數組首地址,nr為數組長度(即最大返回的event數),min_nr為最少返回的events數。timeout可填NULL表示無等待超時。io_event結構體的聲明為:
struct io_event
{
PADDEDptr(void *data,__pad1);
PADDEDptr(struct iocb*obj, __pad2);
PADDEDul(res, __pad3);
PADDEDul(res2, __pad4);
};
其中,res為實際完成的字節數;res2為讀寫成功狀態,0表示成功;obj為之前下發的structiocb結構體。這里有必要了解一下struct iocb這個結構體的主要內容:
iocbp->iocb.u.c.nbytes 字節數
iocbp->iocb.u.c.offset 偏移
iocbp->iocb.u.c.buf 緩沖空間
iocbp->iocb.u.c.flags 讀寫
c). 自定義字段
struct iocb除了自帶的元素外,還留有供用戶自定義的元素,包括回調函數和void *的data指針。如果在請求下發前用io_set_callback綁定用戶自定義的回調函數,那么請求返回后就可以顯示的調用該函數。回調函數的類型為:
void callback_function(io_context_t ctx, struct iocb *iocb, longres, long res2);
另外,還可以通過iocbp->data指針掛上用戶自己的數據。
注意:實際使用中發現回調函數和data指針不能同時用,可能回調函數本身就是使用的data指針。
7.2.2 示例程序
| #include <stdlib.h> #include <stdio.h> #include <libaio.h> #include <sys/stat.h> #include <fcntl.h> #include <libaio.h>
int srcfd=-1; int odsfd=-1;
#define AIO_BLKSIZE 1024 #define AIO_MAXIO 64
static void wr_done(io_context_t ctx, struct iocb *iocb, long res, long res2) { if(res2 != 0) { printf(“aio write error\n”); } if(res != iocb->u.c.nbytes) { printf( “write missed bytes expect %d got %d\n”, iocb->u.c.nbytes, res); exit(1); }
free(iocb->u.c.buf); free(iocb); }
static void rd_done(io_context_t ctx, struct iocb *iocb, long res, long res2) { /*library needs accessors to look at iocb*/ int iosize = iocb->u.c.nbytes; char *buf = (char *)iocb->u.c.buf; off_t offset = iocb->u.c.offset; int tmp; char *wrbuff = NULL;
if(res2 != 0) { printf(“aio read\n”); } if(res != iosize) { printf( “read missing bytes expect %d got %d”, iocb->u.c.nbytes, res); exit(1); }
/*turn read into write*/ tmp = posix_memalign((void **)&wrbuff, getpagesize(), AIO_BLKSIZE); if(tmp < 0) { printf(“posix_memalign222\n”); exit(1); }
snprintf(wrbuff, iosize + 1, “%s”, buf);
printf(“wrbuff-len = %d:%s\n”, strlen(wrbuff), wrbuff); printf(“wrbuff_len = %d\n”, strlen(wrbuff)); free(buf);
io_prep_pwrite(iocb, odsfd, wrbuff, iosize, offset); io_set_callback(iocb, wr_done);
if(1!= (res=io_submit(ctx, 1, &iocb))) printf(“io_submit write error\n”);
printf(“\nsubmit %d write request\n”, res); }
void main(int args,void * argv[]) { int length = sizeof(“abcdefg”); char * content = (char * )malloc(length); io_context_t myctx; int rc; char * buff=NULL; int offset=0; int num,i,tmp;
if(args<3) { printf(“the number of param is wrong\n”); exit(1); }
if((srcfd=open(argv[1],O_RDWR))<0) { printf(“open srcfile error\n”); exit(1); }
printf(“srcfd=%d\n”,srcfd);
lseek(srcfd,0,SEEK_SET); write(srcfd,”abcdefg”,length);
lseek(srcfd,0,SEEK_SET); read(srcfd,content,length);
printf(“write in the srcfile successful,content is %s\n”,content);
if((odsfd=open(argv[2],O_RDWR))<0) { close(srcfd); printf(“open odsfile error\n”); exit(1); }
memset(&myctx, 0, sizeof(myctx)); io_queue_init(AIO_MAXIO, &myctx);
struct iocb *io = (struct iocb*)malloc(sizeof(struct iocb)); int iosize = AIO_BLKSIZE; tmp = posix_memalign((void **)&buff, getpagesize(), AIO_BLKSIZE); if(tmp < 0) { printf(“posix_memalign error\n”); exit(1); } if(NULL == io) { printf( “io out of memeory\n”); exit(1); }
io_prep_pread(io, srcfd, buff, iosize, offset); io_set_callback(io, rd_done); printf(“START…\n\n”);
rc = io_submit(myctx, 1, &io); if(rc < 0) printf(“io_submit read error\n”);
printf(“\nsubmit %d read request\n”, rc);
//m_io_queue_run(myctx);
struct io_event events[AIO_MAXIO]; io_callback_t cb;
num = io_getevents(myctx, 1, AIO_MAXIO, events, NULL); printf(“\n%d io_request completed\n\n”, num);
for(i=0;i<num;i++) { cb = (io_callback_t)events[i].data; struct iocb *io = events[i].obj;
printf(“events[%d].data = %x, res = %d, res2 = %d\n”, i, cb, events[i].res, events[i].res2); cb(myctx, io, events[i].res, events[i].res2); }
} |
