信號驅動式I/O是指進程預先告知內核,使得當某個描述符上發生某事時,內核使用信號通知相關進程。
異步I/O是進程執行I/O系統調用(讀或寫)告知內核啟動某個I/O操作,內核啟動I/O操作后立刻返回到進程,進程在I/O操作發生期間繼續執行,當操作完成或遭遇錯誤時,內核以進程在I/O系統調用中指定的某種方式通知進程,
對一個套接字使用信號驅動式I/O
- 建立SIGIO信號的信號處理函數。
- 設置該套接字的屬主,通常使用fcntl的F_SETOWN命令設置。(因該在設置套接字屬主之前建立信號處理函數,因為在調用fcntl后調用signal之前有較小的機會產生SIGIO信號,此時信號被丟棄)
- 開啟該套接字的信號驅動式I/O,通常通過使用fcntl的F_SETFL命令打開O_ASYNC標志完成。
UDP套接字的SIGIO信號
在UDP上使用信號驅動式I/O是簡單的。SIGIO信號在發生以下事件時產生:
- 數據報到達套接字
- 套接字上發生異步錯誤
因此當捕獲對於某個UDP套接字的SIGIO信號時,我們調用recvfrom或者讀入到達的數據報,或者獲取發生的異步錯誤(發生異步錯誤的前提是udp套接字已連接)
TCP套接字的SIGIO信號
不幸的是,信號驅動式I/O對於TCP套接字近乎無用。問題在於該信號產生的過於頻繁,並且它的出現並沒有告訴我們發生了什么事情。下列條件均導致對於一個TCP套接字產生SIGIO信號(假設該套接字的信號驅動式I/O已經開啟):
- 監聽套接字上某個連接請求已經完成
- 某個斷連請求已經發起
- 某個斷連請求已經完成
- 某個連接之半已經關閉
- 數據到達套接字
- 數據已經從套接字發送走
- 發生某個異步錯誤
如果一個進程既讀又寫一個tcp套接字(此時應該設置成非阻塞套接字,防止read或write阻塞),那么當有新數據到達時或數據寫出前SIGIO信號均會產生,而且信號處理函數無法區分這兩種情況。
應該只對監聽TCP套接字使用SIGIO,因為對於監聽套接字產生SIGIO的唯一條件是某個新的連接已完成。
使用SIGIO的UDP回射服務器程序
當一個新數據報到達時,SIGIO處理函數讀入該數據報,同時記錄它到達的時刻,然后將它置於進程內核的另一個隊列中,以便服務器循環移走並處理(下圖)
client:
#include<stdio.h> #include<sys/types.h> #include<sys/socket.h> #include<unistd.h> #include<stdlib.h> #include<errno.h> #include<arpa/inet.h> #include<netinet/in.h> #include<string.h> #include<signal.h> #include <fcntl.h> #define MAXLINE 1024 #define SERV_PORT 3333 #define ERR_EXIT(m) \ do { \ perror(m); \ exit(EXIT_FAILURE); \ } while (0) typedef struct sockaddr SA; void dg_cli(FILE *fp, int sockfd, const SA *pservaddr, socklen_t servlen) { int n; char sendline[MAXLINE], recvline[MAXLINE + 1]; while (fgets(sendline, MAXLINE, fp) != NULL) { sendto(sockfd, sendline, strlen(sendline), 0, pservaddr, servlen); n = recvfrom(sockfd, recvline, MAXLINE, 0, NULL, NULL); recvline[n] = 0; /* null terminate */ fputs(recvline, stdout); } } int main(int argc, char **argv) { int sockfd; struct sockaddr_in servaddr; if (argc != 2) ERR_EXIT("usage: udpcli <IPaddress>"); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(SERV_PORT); inet_pton(AF_INET, argv[1], &servaddr.sin_addr); sockfd = socket(AF_INET, SOCK_DGRAM, 0); dg_cli(stdin, sockfd, (SA *) &servaddr, sizeof(servaddr)); exit(0); }
server:
#include "unp.h" static int sockfd; #define QSIZE 8 /* size of input queue */ #define MAXDG 4096 /* max datagram size */ typedef struct { void *dg_data; /* ptr to actual datagram */ size_t dg_len; /* length of datagram */ struct sockaddr *dg_sa; /* ptr to sockaddr{} w/client's address */ socklen_t dg_salen; /* length of sockaddr{} */ } DG; static DG dg[QSIZE]; /* queue of datagrams to process */ static long cntread[QSIZE+1]; /* diagnostic counter */ static int iget; /* next one for main loop to process */ static int iput; /* next one for signal handler to read into */ static int nqueue; /* # on queue for main loop to process */ static socklen_t clilen;/* max length of sockaddr{} */ static void sig_io(int); static void sig_hup(int); void dg_echo(int sockfd_arg, SA *pcliaddr, socklen_t clilen_arg) { int i; const int on = 1; sigset_t zeromask, newmask, oldmask; sockfd = sockfd_arg; clilen = clilen_arg; for (i = 0; i < QSIZE; i++) { /* init queue of buffers */ dg[i].dg_data = malloc(MAXDG); dg[i].dg_sa = malloc(clilen); dg[i].dg_salen = clilen; } iget = iput = nqueue = 0; signal(SIGHUP, sig_hup); signal(SIGIO, sig_io); fcntl(sockfd, F_SETOWN, getpid()); ioctl(sockfd, FIOASYNC, &on); ioctl(sockfd, FIONBIO, &on); sigemptyset(&zeromask); /* init three signal sets */ sigemptyset(&oldmask); sigemptyset(&newmask); sigaddset(&newmask, SIGIO); /* signal we want to block */ sigprocmask(SIG_BLOCK, &newmask, &oldmask); for ( ; ; ) { while (nqueue == 0) sigsuspend(&zeromask); /* wait for datagram to process */ /* 4unblock SIGIO */ sigprocmask(SIG_SETMASK, &oldmask, NULL); sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0, dg[iget].dg_sa, dg[iget].dg_salen); if (++iget >= QSIZE) iget = 0; /* 4block SIGIO */ sigprocmask(SIG_BLOCK, &newmask, &oldmask); nqueue--; } } static void sig_io(int signo) { ssize_t len; int nread; DG *ptr; for (nread = 0; ; ) { if (nqueue >= QSIZE) ERR_EXIT("receive overflow"); ptr = &dg[iput]; ptr->dg_salen = clilen; len = recvfrom(sockfd, ptr->dg_data, MAXDG, 0, ptr->dg_sa, &ptr->dg_salen); if (len < 0) { if (errno == EWOULDBLOCK) break; /* all done; no more queued to read */ else ERR_EXIT("recvfrom error"); } ptr->dg_len = len; nread++; nqueue++; if (++iput >= QSIZE) iput = 0; } cntread[nread]++; /* histogram of # datagrams read per signal */ } static void sig_hup(int signo) { int i; for (i = 0; i <= QSIZE; i++) printf("cntread[%d] = %ld\n", i, cntread[i]); }
已收取數據報隊列
SIGIO信號處理函數把到達的數據報放入一個隊列。該隊列是一個DG結構數組,我們把它作為一個環形緩沖區處理。每個DG結構包括指向所收取數據報的一個指針,該數據報的長度,指向含有客戶協議地址的某個套接字地址結構的一個指針,該協議地址的大小。靜態分配QSIZE個DG結構,dg_echo函數調用malloc動態分配所有數據報和套接字地址結構的內存空間。我們還分配一個診斷用計數器cntread。下圖展示了這個DG結構數組,其中假設第一個元素指向一個150字節的數據報,與它關聯的套接字地址結構長度為16.
數組下標
iget是主循環將處理的下一個數組元素的下標,iput是信號處理函數將存放到的下一個數組元素的下標,nqueue是隊列中供主循環處理的數據報的總數。
初始化已接收數據報隊列
把套接字描述符保存在一個全局變量中,因為信號處理函數需要它。初始化已接收數據報隊列。
建立信號處理函數並設置套接字標志
為SIGHUP(用於診斷目的)和SIGIO建立信號處理函數。使用fcntl設置套接字的屬主,使用ioctl設置信號驅動和非阻塞式I/O標志。
初始化信號集
初始化三個信號集:zeromask(從不改變),oldmask(記錄我們阻塞SIGIO時原來的信號掩碼)和newmask。使用sigaddset打開newmask中與SIGIO對應的位。
阻塞SIGIO並等待有事可做
調用sigprocmask把進程的當前信號掩碼保存到oldmask中,然后把newmask邏輯或到當前信號掩碼。這將阻塞SIGIO並返回當前信號掩碼。接着進入for循環,並測試nqueue計數器。只要該計數器為0,進程就無事可做,這時我們可以調用sigsuspend。該POSIX函數先內部保存當前信號掩碼,再把當前信號掩碼設置為它的參數(zeromask)。既然zeromask是一個空信號集,因而所有信號都被開通。sigsuspend在進程捕獲一個信號並且該信號的處理函數返回之后才返回。(它是一個不尋常的函數,因為它總是返回EINTR錯誤)。在返回之前sigsuspend總是把當前信號掩碼恢復為調用時刻的值,在本例中就是newmask的值,從而確保sigsuspend返回之后SIGIO繼續被阻塞。這時我們可以測試計數器nqueue的理由,因為我們知道測試它時SIGIO信號不可能被遞交。
解阻塞SIGIO並發送應答
調用sigprocmask把進程的信號掩碼設置為先前保存的值(oldmask),從而解除SIGIO的阻塞。然后調用sendto發送應答。遞增iget下標,若其值等於DG結構數組元素數目則將其值置回0。因為我們把該數組作為環形緩沖區對待。注意:修改iget時我們不必阻塞SIGIO,因為只有主循環使用這個下標,信號處理函數從不改動它。
阻塞SIGIO
阻塞SIGIO,遞減nqueue。修改nqueue時我們必須阻塞SIGIO,因為它是主循環和信號處理函數共同使用的變量。我們在循環頂部測試nqueue時也需要SIGIO阻塞着。
我們也可以去掉for循環內的兩個sigprocmask調用,整個循環期間SIGIO一直阻塞,從而降低了信號處理函數的及時性,數據報不應該因此變動而丟失(假設套接字緩沖區足夠大),但是SIGIO信號向進程的遞交將在整個阻塞期間一直被拖延。
然而當一個數據報到達導致SIGIO被遞交,它的信號處理函數讀入該數據報並把它放到供主循環讀取的隊列中,然而在信號處理函數執行期間,另有兩個數據包到達,這一點意味着如果我們在信號處理函數執行(期間確保該信號被阻塞),期間該信號又發生了2次,那么它實際只被遞交1次。讓我們考慮下述情形。一個數據報到達導致SIGIO被遞交。它的信號處理函數讀入該數據報並把它放到供主循環讀取的隊列中。然而在信號處理函數執行期間,另有兩個數據報到達,導致SIGIO再產生兩次。由於SIGIO被阻塞,當他的信號處理函數返回時,該處理函數僅僅再被調用一次。該信號處理函數的第二次執行讀入第二個數據報,第三個數據報則仍然留在套接字接收隊列中。第三個數據報被讀入的前提條件時由第四個數據報到達。當第四個數據報到達時,被讀入並放到供主循環讀取的隊列中的是第三個而不是第四個數據報。
既然信號時不排隊的,開啟信號驅動式I/O的描述符通常也被設置為非阻塞式。這個前提下,我們把SIGIO信號處理函數編寫成在一個循環中執行讀入操作,知道該操作返回EWOULDBLOCK時菜結束循環。
檢查隊列溢出
如果DG結構數組隊列已滿,進程就終止。
讀入數據報
在非阻塞套接字上調用recvfrom。下標為iput的數組元素用於存放讀入的數據報。如果沒有可讀的數據報,那就break出for循環。
遞增計數器和下標
nread是一個計量每次信號遞交讀入數據報數目的診斷計數器。nqueue是有待主循環處理的數據報數目。
在信號處理函數返回之前,遞增與每次信號遞交讀入數據報數目對應的計數器。當SIGHUP信號被遞交時。