linux epoll機制對TCP 客戶端和服務端的監聽C代碼通用框架實現


1 TCP簡介

tcp是一種基於流的應用層協議,其“可靠的數據傳輸”實現的原理就是,“擁塞控制”的滑動窗口機制,該機制包含的算法主要有“慢啟動”,“擁塞避免”,“快速重傳”。

 2 TCP socket建立和epoll監聽實現

數據結構設計

linux環境下,應用層TCP消息體定義如下:

typedef struct TcpMsg_s
{
     TcpMsgHeader head;
     void* msg;
}TcpMsg;

其中,head表示自定義的TCP消息頭,它的定義如下:

 

//TCP消息類型,根據業務需求定義
typedef enum MSGTYPE _e { EP_REG_REQ = 0, EP_REQ_RSP = 1, }MSGTYPE;
//TCP消息頭定義的通用框架 typedef
struct TcpMsgHead_s { int len;//消息長度(用作TCP粘包處理) MSGTYPE type;//消息類型(用作接收端消息的解析) }TcpMsgHead;

socket建立C代碼

TCP客戶端和服務端都采用linux提供的epoll機制(epoll_create(),epoll_wait(),epoll_ctl())對socket實現監聽(可讀,可寫事件等)。

開源事件驅動庫lievent對socket事件的監聽也是通過對epoll事件的封裝實現的。

(1)TCP服務端socket建立C代碼

基本原理:利用linux網絡通信API(scoket(),bind(),listen())來創建服務器端socket;

代碼如下:輸入參數:localip,本地ip;port:服務端本地的監聽端口號;輸出:返回-1,表示失敗;返回>0的fd,表示socket建立成功;

 1    int TcpServer(uint32_t lcoalip, int port)
 2    {
 3        int fd;
 4        struct sockaddr_in addr;
 5 
 6        //socket建立
 7        if ((fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)  
 8        {
 9            printf("IN TcpServer() scoket created failed,errno is %d, strerror is %s\n", errno, strerror(errno));
10            return -1;
11        }  
12 
13        //設置socket為非阻塞模式
14        int flags = fcntl(fd, F_GETFL, 0);
15        fcntl(fd, F_SETFL, flags | O_NONBLOCK);
16 
17        memset(&addr, 0 , sizeof(addr));
18        addr.sin_family = AF_INET;
19        addr.sin_addr.s_addr = localip;
20        addr.sin_port = port;
21 
22        //綁定本地端口和IP
23        if (bind(fd, (struct sockaddr_in)&addr, sizeof(addr) < 0))
24        {
25            printf("IN TcpServer() bind failed,fd is%d, errno is %d, strerror is %s\n", fd, errno, strerror(errno));
26            return -1;
27        }
28 
29        if (listen(fd, 20< 0))
30        {
31            printf("IN TcpServer() listen failed,fd is%d, errno is %d, strerror is %s\n", fd, errno, strerror(errno));
32            return -1;
33        }
34     
35        //add the socket to epoll event
36        if (SubscribeFd(fd, SOCKET_EV) != 0)
{
return -1;
}
37 return fd; 38 }

 

而SubscribeFd函數功能是將socket添加到epoll的監聽事件中

實現如下:

輸入參數:fd,待監聽的fd;type,枚舉型變量,表明TCP類型,是客戶端還是服務端;port:服務端的監聽端口號;輸出:返回-1,表示監聽失敗;返回0,表示將該socket成功添加到維護在全局變量g_epoll(TCP_EPOLL類型結構體)中的監聽事件中;其中TCP_TYPE枚舉變量和TCP_EPOLL結構體的定義如下:

typedef enum
{
     CLIENT = 0,
     SERVER = 1,
}TCP_TYPE;
 
#define MAX_NUM_EPOLL 1000//最多可監聽的socket數目
typedef struct TCP_EPOLL_s
{
     struct epoll_event* p_event;
     int nb_evnet;
int nb_client;//for tcp server
int epoll_fd; int sock_listen;//for tcp server int sock[MAX_NUM_EPOLL]; TCP_NL_MSG* p_tcp_nl_msg;//TCP粘包處理數據結構 }TCP_EPOLL;

SubscribeFd函數實現如下:

int SubscribeFd (int fd, TCP_TYPE type)
{
    struct epoll_event event;
     
    if (CLIENT == type)
    {
        event.events = EPOLLOUT | EPOLLET;//監聽類型為可寫事件
    }
    else if (SERVER == type)
    {
        event.events = EPOLLIN | EPOLLET;//監聽類型為可讀事件
    }

    event.date.u64 = 0;
    evnet.data.fd = fd;

    g_epoll.nb_event++;
    g_epoll.p_event = realloc(g_epoll.p_event, g_epoll.nb_event * sizeof(struct epoll_event));

    //add epoll control event 
    if (epoll_ctl(g_epoll.epoll_fd, EPOLL_CTL_ADD, fd, &event) != 0)
    {
        printf("epoll_ctl failed for fd %d, errno is %d, strerror is %s\n", fd, errno, strerror(errno));
        return -1;
    }

    printf("successfully subscribe fd %d\n", fd);

    return 0;
}

 

(2)TCP客戶端socket建立C代碼

基本原理:利用linux網絡通信API(scoket(),connect())來創建客戶端socket;

代碼如下:輸入參數:peerip,服務端IP;localip,本地ip;port:服務端的監聽端口號;輸出:返回-1,表示失敗;返回>0的fd,表示socket建立成功;

 1 int TCPClient(uint32_t peerip,uint32_t localip,uint16_t port)
 2 {
 3      int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
 4      if (fd < 0)
 5      {
 6        printf("TCPClient() socket failed");
 7        return -1;
 8      }
 9 
10     struct sockaddr_in localaddr = {0};
11     localaddr.sin_family = AF_INET;
12     localaddr.sin_addr.s_addr = localip;
13     //localaddr.sin_port = htons(port);
14     
15     int ret = bind(fd, (struct sockaddr *)&localaddr, sizeof(localaddr));
16     if (ret < 0)
17     {
18         printf("TCPClient() bind failed localip %u", localip);
19         return -1;
20     }
21     
22     int flags = fcntl(fd, F_GETFL, 0);
23     fcntl(fd, F_SETFL, flags | O_NONBLOCK);
24 
25     struct sockaddr_in servaddr = {0};
26     servaddr.sin_family = AF_INET;
27     servaddr.sin_addr.s_addr = peerip;
28     servaddr.sin_port = htons(port);
29     
30     ret = connect(fd, (sockaddr *)&servaddr, sizeof(servaddr));
31     if(ret < 0)
32     {
33         if (errno != EINPROGRESS)
34         {
35             printf("TCPClient() connect failed, peerip %u, port %u", peerip, port);
36             return -1;
37         }
38     }
39     
40     printf("TCPClient() connect success, fd = %u,peerip %u, port %u",fd, peerip, port);
41     
42     return fd;
43 }

 

(3) TCP客戶端和服務端利用epoll_wait()實現對socket的監聽和消息的接收的通用框架

TCP服務端監聽到監聽socket有EPOLLIN事件到來時,調用int accept_fd = accept();接收此連接請求,然后服務端要利用epoll_create()為accept_fd創建新的監聽事件;

 

linux利用epoll機制實現socket事件的消息接收的C代碼(TCP接收線程的入口)如下:

 1 void tcp_thread()
 2 { 3  CreateEpoll(); 4  CreateSocketFdEpoll(g_tcp_type); 5 6 while (1) 7  { 8 //wait for a message 9  EpollRecvMsg(); 10  } 11 }


CreateEpoll函數是調用epoll_create來創建epoll事件:

1 TCP_EPOLL g_epoll;//全局Epoll變量
2 
3 //EPOLL事件的建立
4 void CreateEpoll() 5 { 6 g_epoll.epoll_fd = epoll_create1(0); 7 g_epoll.nb_event = 0; 8 }

 

CreateSocketFdEpoll函數功能為創建TCP socket和TCP粘連處理數據結構初始化:

 1 int CreateSocketFdEpoll(TCP_TYPE type)
 2 {
 3     uint32_t server_ip = inet_addr(SERVER_IP);
 4     uint32_t local_ip = inet_addr(LOCAL_IP);
 5 
 6     int fd;
 7     if (CLIENT == type)
 8     {
 9         fd = TcpClient(server_ip, SERVER_PORT, local_ip);
10         g_epoll.sock = fd;
11     }
12     else if (SERVER == type) 13 { 14 fd = TcpServer(local_ip, LOCAL_PORT); 15 g_epoll.sock_listen = fd; 16 } 17 18 g_epoll.p_tcpNLMsg = (TCP_NL_MSG)malloc(sizeof(TCP_NL_MSG)); 19 20 InitTcpNLMsg(g_epoll.p_tcpNLMsg); 21 }

InitTcpNLMsg函數是對TCP粘連處理數據結構的初始化:

1 void InitTcpNLMsg(TCP_NL_MSG* pTcpNLMsg)
2 { 3 pTcpNLMsg->g_recv_len = 0; 4 pTcpNLMsg->flag_in_NL_proc = FALSE; 5 memset(pTcpNLMsg->g_recv_buff, 0, MAX_MSG_LEN); 6 }

 其中,TCP粘包處理的數據結構設計和處理邏輯分析詳見另一篇博文:

TCP粘包處理通用框架--C代碼

EpollRecvMsg函數是調用epoll_wait()實現對Socket事件的監聽和消息的接收:

 1 void EpollRecvMsg()
 2 { 3 int epoll_ret = 0; 4 int epoll_timeout = -1; 5 6 do 7  { 8 epoll_ret = epoll_wait(g_epoll.epoll_fd, g_epoll.p_event, g_epoll.nb_event, epoll_timeout); 9 }while(epoll_ret < 0 && errno == EINTR); 10 11 if (epoll_ret < 0) 12  { 13 printf("epoll_wait failed: %s\n", strerror(errno)); 14 return; 15  } 16 17 //遍歷處理每一個當前監聽到的事件 18 for (int i=0;i<epoll_ret;++i) 19  { 20 int fd = g_epoll.p_event[i].data.fd; 21 22 if (CLIENT == g_tcp_type) 23  { 24 if (g_epoll.p_event[i].events & EPOLLOUT) //the socket is writable,socket可寫,表明服務端已accept該客戶端的connect請求 25  { 26 if (JudgeIfConnSucc(fd) == 0)//判斷TCP連接是否建立成功 27  { 28 struct epoll_event* p_ev = &(g_epoll.p_event[i]); 29 p_ev ->events = EPOLLIN | EPOLLET; 30 31  epoll_ctl(g_epoll.epoll_fd, EPOLL_CTL_MOD,fd, p_ev );//對TCP客戶端socket修改其監聽類型,由可寫改為可讀 32 33 printf("tcp_fd_client %d can be written\n", fd); 34  } 35  } 36 else if(g_epoll.p_event[i].events & EPOLLIN) //the socket is readable 37  { 38  RecvTcpMsg(fd); 39  } 40  } 41 else if (SERVER== g_tcp_type) 42 { if (g_epoll.p_event[i].events & EPOLLIN) //the socket is readable,服務端socket可讀 43  { 44 if (fd == g_epoll.sock_listen)//服務端接收到一個TCP連接請求 45  { 46 struct sockaddr s_addr; 47 socklen_t length = sizeof(struct sockaddr); 48 49 int conn_fd = accept(fd, &s_addr, &length);//服務端接收來自客戶端的連接請求 50 51 int flags = fcntl(conn_fd, F_GETFL, 0); 52 fcmt(conn_fd, F_SETFL, flags | O_NONBLOCK); 53 54 g_epoll.sock[g_epoll.nb_client++] = conn_fd; 55 56  SubscribeFd(conn_fd, SERVER);//服務端將新建立的TCP連接建立新的epoll監聽事件,並維護在全局變量中 57 58 printf("Receive a tcp conn request, conn_fd is %d\n", fd); 59  } 60 else //support multi tcp client 61  { 62  RecvTcpMsg(fd);//接收TCP消息(先進行粘包處理,然后根據消息類型進入不同的處理分支) 63  } 64  } 65  } 66  } 67 } 

 

(4)通用的TCP消息發送函數

函數實現如下:

輸入:fd,發送socket;type,業務定義的tcp消息類型;msg指針:指向待發送的消息地址;length,待發送的msg的字節數;

輸出:成功,返回發送的字節數;失敗,返回-1;

#define MAX_LEN_BUFF 65535
int
SendTcpMsg(int fd, MSGTYPE type, void* msg, int length) { uint8_t buf[MAX_LEN_BUFF]; memset(buf,0,MAX_LEN_BUFF); uint32_t bsize = 0; TcpMsgHead* head = (TcpMsgHead*)buf; bsize += sizeof(TcpMsgHead);
//將待發送消息內容拷貝到待發送緩存中 memcpy(buf
+bsize, msg, length); bsize += length;
//封裝TCP消息頭,指明消息類型(用作接收端消息的解析)和消息長度(用作TCP粘包處理) head
->type = type; head->msglen = bsize; int ret = send(fd,(const void*)buf,bsize,0); if(ret != bsize) { printf("Failed to send tcp msg,errno=%u,ret=%d, strerror is %s\n", errno, ret, strerror(errno)); return -1; } printf("Success to send tcp msg, msg type is %d\n", type); return ret; }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM