摘要:
本文將討論UDP的並發實現機制。給出了兩種實現方法。第一種是最為常見的,TFTP傳輸的方式。
第二種是對UDP進一步封裝,以達到並發的可能。主要是采用隊列、多線程的方法。后面會給出一個簡單的實現例子,以供大家參考。功能方面較為簡單,以后會慢慢完善。
眾所周知,通常所見的的TCP服務器都是並發實現的,即服務同時處理多個請求,
而不是等待前一個完成再處理下一個請求,這個實現得益於TCP的listen()與connect()的分工處理機制。
而對於 UDP 沒有這種監聽和連接機制,所以它必須等待前一處理完成才能繼續處理下一個客戶的請求。
但並不是說UDP實現並發服務器是不可能的,只是與上面的實現稍有不同。
UDP服務器並發的兩種方法:
一、比較常用的處理方法是:
服務器(知名端口)等待一下客戶的到來,當一個客戶到來后,記下其IP和port,然后同理,
服務器fork一個子進程,建立一個socket再bind一個隨機端口,然后建立與客戶的連接,
並處理該客戶的請求。父進程繼續循環,等待下一個客戶的到來。在tftpd中就是使用這種技術的。
大概的實現如下:
for ( ; ; )
{
/* 等待新的客戶端連接 */
recvform( &from_addr)
/* 創建一個新的進程,由該進程去處理 */
if (fork() == 0)
break; //子進程跳出循環
}
//child now here
peer = socket(AF_INET, SOCK_DGRAM, 0);
//綁定一個隨機端口
myaddr.sin_port = htons(0);
bind(peer,(struct sockaddr *)&myaddr, /
sizeof myaddr)
/*
把這個套接字跟客戶端的地址連接起來
這也就意味之后之后套接字使用 send recv這些函數時
都是直接跟指定的客戶端進行通信的
*/
connect(peer, (struct sockaddr *)&from, sizeof from)
以上方式簡單實用,但是每來個客戶端都需要創建一個新的 socket,為每個客戶端分配一個新的臨時端口,然后客戶端,之后的通信需要跟新的端口進行數據傳輸。
二、如果對上述不滿意。我們可以采用新的策略。對UDP進行封裝,以此實現類型TCP的功能。 我們來看下一個簡單 TCP 服務器的原型:12
int main()
{
/* 初始化socket套接字 */
sockfd = init_socket(); /* 開始監聽 */
if(listen(sock_fd, BACKLOG) == -1)
{ perror("listen is error\r\n");
exit(1);
} while(1)
{ /* 等待新的客戶端連接 */
if((new_fd = accept(sock_fd, (struct sockaddr *)&their_addr, &sin_size)) == -1)
{ perror("accept");
continue;
} /* fork出一個進程,由該進程去處理這個連接 */
if(!fork())
{ } } }
我們封裝出幾個跟上面的TCP相似的函數接口。使用這些接口,可以很簡單寫出一個UDP並發服務器。例如:
/* 主函數 */
int main(int argc, char *argv[])
{
/* 定義一個listen指針。該結構體是自己定義的 */
struct listen *_listen;
/* 初始化socket,這個初始化過程跟普通的UDP初始化 socket套接字一樣 */
sockfd = init_socket(); /*
開始監聽這個socket. 最大的連接數為10,也就是說最多只有10個客戶端
封裝好的一個函數,功能有點類似於 TCP協議中的 listen 函數
*/
server_listen(&sockfd, 10);
while(1)
{ /*
獲得一個連接。類似於TCP的 accept 函數
需要注意的是,如果沒有連接, server_accept 函數將進入休眠狀態,直到有一個新的客戶端數據
客戶端只有在第一次發生數據過來的時候,才會創建一個新的 listen ,並喚醒 server_accept 函數
之后,這個客戶端的所有數據都將發送到 這個新的 listen 的數據隊列中。
所以。通過這個 listen ,我們可以創建一個進程,由該進程去處理這個客戶端之后的請求
這里,listen 有點像 TCP 協議中的 accept 函數新建的 sockfd
*/
_listen = server_accept(); /*
雖然說 server_accept 會進入休眠,但是仍然會被其它信號喚醒,所以要做個判斷
判斷下是否為 NULL 。為 NULL 則說明沒有新的連接
*/
if(_listen == NULL){
continue;
} printf("new client \r\n");
/*
啟動一個 listen_phread 線程,並且,由該線程去處理這個連接
類似於TCP 的fork
*/
listen_pthread(_listen, listen_phread); } }
listen_phread 線程簡單實現:
void *listen_phread(void *pdata)
{
int ret;
char buf[1204];
struct sockaddr_in clientaddr;
/* 獲得 listen */
struct listen *_listen;
_listen = (struct listen *)pdata; while(1)
{ /*
recv_from_listen 也是一個封裝好的函數,功能是從這個 lsiten 中獲取數據
最后一個參數表示無數據時休眠的時間
-1 表示永久休眠。知道有數據為止
*/
ret = recv_from_listen(_listen, &clientaddr, buf, 1204, -1);
if(ret == -1)
{ printf("%p recv is err \r\n", _listen);
}else{
printf("%p recv %d byte data is [%s]\r\n", _listen, ret, buf);
if((ret = sendto(sockfd, buf, ret, 0, (struct sockaddr *)(&(_listen->addr)),
sizeof(struct sockaddr))) == -1)
{ perror("sendto :");
} printf("sento [%s]\r\n", buf);
} } /* 關閉連接,會釋放內存,注意,一個listen 被創建后,需要使用這個函數釋放內存 */
listen_close(_listen); }
lsiten 結構體原型:
12
struct listen{
struct sockaddr addr; /* 數據包地址信息 */
int data_num; /* 數據包數量 */
int list_flg; /* 是否已經被監聽了 */
pthread_mutex_t mutex; /* 線程鎖 */
/* 這兩個條件變量相關的 */
pthread_mutex_t recv_mtx;
pthread_cond_t recv_cond;
struct list_head head; /* 數據包隊列 */
struct list_head listen_list; /*接收的線程隊列 */
};實現原理: 這個接口函數是基於隊列、多線程實現的。這里簡單地說下原理,稍后有時間我會對代碼進一步分析1. listen 隊列:
系統會創建一個隊列,該隊列的成員為一個 listen ,每個 listen 的 addr 元素會記錄下自己要接收的 客戶端。 之后,server_listen 創建一個線程,由該線程去接收數據。 接收到網絡數據后,會遍歷 listen 鏈表,找到一個想要接收這個數據的 listen 。 如果沒有,會創建一個新的 listen ,並將這個 listen 加入到 listen 隊列中去2 數據包隊列
找到 listen 后,每個 listen 其實就是一個 數據包隊列頭。系統會把數據放到 這個 listen 數據包隊列中去 然后喚醒 recv_from_listen 也就是說,系統的隊列結構如下listen 隊列 listen(1) -> listen(2) -> listen(3) -> listen(4) -> .......
| | | data(1) data data
| | data(1) data
每個listen本身就是一個數據包隊列頭recv_from_listen 函數會試圖去從一個 listen 的數據包隊列中獲取數據,如果沒有數據,則進入休眠狀態。
lsiten 結構體原型:
12
struct listen{
struct sockaddr addr; /* 數據包地址信息 */
int data_num; /* 數據包數量 */
int list_flg; /* 是否已經被監聽了 */
pthread_mutex_t mutex; /* 線程鎖 */
/* 這兩個條件變量相關的 */
pthread_mutex_t recv_mtx;
pthread_cond_t recv_cond;
struct list_head head; /* 數據包隊列 */
struct list_head listen_list; /*接收的線程隊列 */
};實現原理: 這個接口函數是基於隊列、多線程實現的。這里簡單地說下原理,稍后有時間我會對代碼進一步分析1. listen 隊列:
系統會創建一個隊列,該隊列的成員為一個 listen ,每個 listen 的 addr 元素會記錄下自己要接收的 客戶端。 之后,server_listen 創建一個線程,由該線程去接收數據。 接收到網絡數據后,會遍歷 listen 鏈表,找到一個想要接收這個數據的 listen 。 如果沒有,會創建一個新的 listen ,並將這個 listen 加入到 listen 隊列中去2 數據包隊列
找到 listen 后,每個 listen 其實就是一個 數據包隊列頭。系統會把數據放到 這個 listen 數據包隊列中去 然后喚醒 recv_from_listen 也就是說,系統的隊列結構如下listen 隊列 listen(1) -> listen(2) -> listen(3) -> listen(4) -> .......
| | | data(1) data data
| | data(1) data
每個listen本身就是一個數據包隊列頭recv_from_listen 函數會試圖去從一個 listen 的數據包隊列中獲取數據,如果沒有數據,則進入休眠狀態。
主要函數接口:
void listen_head_init(struct list_head *head)
初始化一個 鏈表頭
int listen_add(struct list_head *head, listen_t *listen)
將要監聽的 listen 添加到這個鏈表頭
recv_from_listen_head
從鏈表中獲取數據12345678
示例:
//我們創建兩個 listen_head
struct list_head poll_head_1, poll_head_2; int main(int argc, char *argv[])
{ int poll_num = 0;
struct listen *_listen;
/* 初始化socket */
sockfd = init_socket(); /*
開始監聽這個socket. 運行最大的連接數為10
該函數類似於TCP協議中的 int listen(SOCKET sockfd, int backlog)
*/
server_listen(&sockfd, 10);
/* 初始化這個poll 機制 */
listen_head_init(&poll_head_1); listen_head_init(&poll_head_2); while(1)
{ /* 獲得一個連接。類似於TCP的
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
*/
_listen = server_accept(); if(_listen == NULL){
continue;
} printf("new client \r\n");
if(poll_num < 5)
{ /* 前面五個連接者添加到 poll_head_1 */
poll_num ++; listen_add(&poll_head_1, _listen); }else{
/* 添加到 poll_head_2 */
poll_num ++; listen_add(&poll_head_2, _listen); } } }```
然后我們就可以從中兩個 listen_head 中讀取數據了
```
while(1)
{ /*
從 poll_head_1 中讀取數據。
此時,前面五個 listen 被掛鈎到這個 poll_head_1,所以這五個listen中任何一個有了數據
recv_from_listen_head 都會返回,而且將 _listen 指向這個 listen
這樣,我們就可以知道是哪個listen有數據了
*/
ret = recv_from_listen_head(poll_head_1, &_listen, (struct sockaddr *)&clientaddr, buf, 1204, -1);
if(ret == -1)
{ printf("%p recv is err \r\n", _listen);
}else{
printf("__ poll %p recv %d byte data is [%s]\r\n", _listen, ret, buf);
if((ret = sendto(sockfd, buf, ret, 0, (struct sockaddr *)(&(_listen->addr)),
sizeof(struct sockaddr))) == -1)
{ perror("sendto :");
} printf("sento [%s]\r\n", buf);
} }