Linux socket編程(四) 簡單聊天室之epoll版


  這一篇我們用epoll改寫之前寫的簡單聊天室,Epoll是Linux內核為處理大批量句柄而作了改進的poll。

我們要用到epoll的三個函數,分別是:int epoll_create(int size);  

                                                            int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

                                                  int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

下面對要用到epoll的操作進行封裝

Epoll.h

 

#ifndef EPOLL_H
#define EPOLL_H

#include "Socket.h"
#include <sys/epoll.h>
#include <sys/resource.h>

const int MAXEPOLLSIZE=MAXCONNECTION+5;

class Epoll
{
    public:
        Epoll();
        bool Add(int fd,int eventsOption);
        //Returns the number of triggered events
        int Wait();
        bool Delete(const int eventIndex);
        int GetEventOccurfd(const int eventIndex) const;
        int GetEvents(const int eventIndex) const;

    private:
        int epollfd;
        int fdNumber;
        struct epoll_event event;
        struct epoll_event events[MAXEPOLLSIZE];
        struct rlimit rt;
};

#endif

 

Socket類的實現見我的這篇博文Linux socket編程(一) 對套接字操作的封裝

更好一點的做法是把Socket類做成一個共享函數庫

Epoll.cpp

 

#include "Epoll.h"
#include <stdio.h>
#include <stdlib.h>

Epoll::Epoll()
:fdNumber(0)
{
    //set resource limits respectively
    rt.rlim_max=rt.rlim_cur=MAXEPOLLSIZE;
    if(::setrlimit(RLIMIT_NOFILE, &rt) == -1)
    {
        perror("setrlimit");
        exit(1);
    }
    //create epoll
    epollfd=epoll_create(MAXEPOLLSIZE);
}

bool Epoll::Add(int fd,int eventsOption)
{
    //handle readable event,set Edge Triggered
    event.events=eventsOption;//EPOLLIN | EPOLLET;
    event.data.fd=fd;

    if(epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event)<0)
        return false;

    fdNumber++;
    return true;
}

bool Epoll::Delete(const int eventIndex)
{
    if(epoll_ctl(epollfd,EPOLL_CTL_DEL,
                 events[eventIndex].data.fd,&event)<0)
        return false;
    fdNumber--;
    return true;
}

int Epoll::Wait()
{
    int eventNumber;
    eventNumber=epoll_wait(epollfd,events,fdNumber,-1);
    if(eventNumber<0)
    {
        perror("epoll_wait");
        exit(1);
    }
    return eventNumber;
}

int Epoll::GetEventOccurfd(const int eventIndex) const
{
    return events[eventIndex].data.fd;
}

int Epoll::GetEvents(const int eventIndex) const
{
    return events[eventIndex].events;
}

 

 

現在考慮如何把epol用到socket的通信中

參考了這篇博文 http://www.cnblogs.com/OnlyXP/archive/2007/08/10/851222.html

epoll有兩種觸發模式:

LT(level triggered)是缺省的工作方式,並且同時支持block和no-block socket.在這種做法中,內核告訴你一個文件描述符是否就緒了,然后你可以對這個就緒的fd進行IO操作。如果你不作任何操作,內核還是會繼續通知你 的,所以,這種模式編程出錯誤可能性要小一點。傳統的select/poll都是這種模型的代表.

ET(edge-triggered) 是高速工作方式,只支持no-block socket。在這種模式下,當描述符從未就緒變為就緒時,內核通過epoll告訴你。然后它會假設你知道文件描述符已經就緒,並且不會再為那個文件描述 符發送更多的就緒通知,直到你做了某些操作導致那個文件描述符不再為就緒狀態了(比如,你在發送,接收或者接收請求,或者發送接收的數據少於一定量時導致 了一個EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個fd作IO操作(從而導致它再次變成未就緒),內核不會發送更多的通知(only once)。

接下來我們使用邊沿觸發這種方式(ET),先看一下手冊是怎么說的(man epoll):

 

     Q9  Do I need to continuously read/write a file descriptor until EAGAIN when
using the EPOLLET flag (edge-triggered behavior) ?

A9 Receiving an event from epoll_wait(2) should suggest to you that such file
descriptor is ready for the requested I/O operation. You must consider it
ready until the next (nonblocking) read/write yields EAGAIN. When and how
you will use the file descriptor is entirely up to you.

For packet/token-oriented files (e.g., datagram socket, terminal in
canonical mode), the only way to detect the end of the read/write I/O
space is to continue to read/write until EAGAIN.

For stream-oriented files (e.g., pipe, FIFO, stream socket), the condition
that the read/write I/O space is exhausted can also be detected by
checking the amount of data read from / written to the target file
descriptor. For example, if you call read(2) by asking to read a certain
amount of data and read(2) returns a lower number of bytes, you can be
sure of having exhausted the read I/O space for the file descriptor. The
same is true when writing using write(2). (Avoid this latter technique if
you cannot guarantee that the monitored file descriptor always refers to a
stream-oriented file.)

意思大概是說當使用ET這種方式時,要不斷地對文件描訴符進行讀/寫,直至遇到EAGAIN為止。

為什么要這樣呢:

假如發送端流量大於接收端的流量 (意思是epoll所在的程序讀比轉發的socket要快),由於是非阻塞的socket,那么send()函數雖然返回,但實際緩沖區的數據並未真正發 給接收端,這樣不斷的讀和發,當緩沖區滿后會產生EAGAIN錯誤(參考man send),同時,不理會這次請求發送的數據.所以,需要封裝socket_send()的函數用來處理這種情況,該函數會盡量將數據寫完再返回,同樣對於recv函數也要進行相應的封裝。

以下是我的封裝:(英文注釋寫的不是很好,大家湊合着看吧)

 

void EpollServerSocket::SendMessage(Socket& clientSocket,const std::string& message) const
{
    while(true)
    {
        if(Socket::Send(clientSocket,message)==false)
        {
            //
            if(errno == EINTR)
                return;

            //this means the cache queue is full,
            //sleep 1 second and send again
            if(errno==EAGAIN)
            {
                sleep(1);
                continue;
            }
        }

        return;
    }
}

void EpollServerSocket::ReceiveMessage(Socket& clientSocket,std::string& message)
{
    bool done=true;

    while(done)
    {
        int receiveNumber=Socket::Receive(clientSocket,message);
        if(receiveNumber==-1)
        {
            //if errno == EAGAIN, that means we have read all data.
            //so return
if (errno != EAGAIN) { perror ("ReceiveMessage error"); DeleteClient(clientSocket.GetSocketfd()); } return; } else if(receiveNumber==0) { // End of file. The remote has closed the connection. DeleteClient(clientSocket.GetSocketfd()); } //if receiveNumber is equal to MAXRECEIVE, //maybe there is data still in cache,so it has to read again if(receiveNumber==MAXRECEIVE) done=true; else done=false; } }

 

好了接下來是Socket類的派生類,EpollServerSocket類

EpollServerSocket.h

#ifndef EPOLLSERVERSOCKET_H
#define EPOLLSERVERSOCKET_H

#include "Socket.h"
#include "Epoll.h"
#include <map>

class EpollServerSocket:public Socket
{
    public:
        EpollServerSocket(const int port);
        virtual ~EpollServerSocket();

        void Run();

    private:
        //when using the EPOLLET flag,
        //need to continuously read/write a file descriptor until EAGAIN,
        //so we write these two functions for read/write
        void SendMessage(Socket& clientSocket,const std::string& message) const;
        void ReceiveMessage(Socket& clientSocket,std::string& message);

        void ProcessMessage(Socket& clientSocket);
        void SendToAllUsers(const std::string& message) const;
        //add event to epoll
        bool AddNewClient(Socket& clientSocket);
        //delete client from map clientSockets
        void DeleteClient(int sockfd);
        std::map<int,Socket*> clientSockets;
        Epoll epoll;
};

#endif

以下是EpollServerSocket類的實現

View Code
#include "EpollServerSocket.h"
#include "SocketException.h"
#include <iostream>
#include <errno.h>
#include <stdio.h>

#define DEBUG

EpollServerSocket::EpollServerSocket(const int port)
{
      if ( ! Socket::Create() )
        {
          throw SocketException ( "Could not create server socket." );
        }

      if ( ! Socket::Bind ( port ) )
        {
          throw SocketException ( "Could not bind to port." );
        }

      if ( ! Socket::Listen() )
        {
          throw SocketException ( "Could not listen to socket." );
        }

      //set listener socket non-blocking!!
      Socket::SetNonBlocking(true);

}

EpollServerSocket::~EpollServerSocket()
{
    std::map<int,Socket*>::iterator it;
    for(it=clientSockets.begin();it!=clientSockets.end();it++)
        delete it->second;
}

void EpollServerSocket::Run()
{
    //add listener socketfd to epoll
    if(epoll.Add(Socket::GetSocketfd(),EPOLLIN)==false)
        return;

    int i;
    int eventNumber;
    Socket* clientSocket;

    while(true)
    {
        eventNumber=epoll.Wait();

        #ifdef DEBUG
            std::cout<<"eventNumbers: "<<eventNumber<<" ";
        #endif

        for(i=0;i<eventNumber;i++ )
        {
             if ((epoll.GetEvents(i) & EPOLLERR) ||
                 (epoll.GetEvents(i) & EPOLLHUP) ||
                 (!(epoll.GetEvents(i) & EPOLLIN)))
            {
                      /* An error has occured on this fd, or the socket is not
                         ready for reading (why were we notified then?) */
                      perror ("epoll error\n");
                      DeleteClient(epoll.GetEventOccurfd(i));
                      continue;
            }

            //if event is triggered by listener socket
            else if(epoll.GetEventOccurfd(i)==Socket::GetSocketfd())
            {
                clientSocket=new Socket();
                if(AddNewClient(*clientSocket)==false)
                    return;
                clientSockets[clientSocket->GetSocketfd()]=clientSocket;

            }
            //else event is triggered by client sockets
            else
            {
                clientSocket=clientSockets[epoll.GetEventOccurfd(i)];
                ProcessMessage(*clientSocket);
            }
        }
    }
}

void EpollServerSocket::ProcessMessage(Socket& clientSocket)
{
    std::string message;
    ReceiveMessage(clientSocket,message);

    if(message=="exit")
    {
        SendMessage(clientSocket,"user_exit");

        DeleteClient(clientSocket.GetSocketfd());
    }
    else
        SendToAllUsers(message);
}

bool EpollServerSocket::AddNewClient(Socket& clientSocket)
{
    if(Socket::Accept(clientSocket)==false)
        return false;

    //set socket non-blocking!!
    clientSocket.SetNonBlocking(true);

    if(epoll.Add(clientSocket.GetSocketfd(),EPOLLIN | EPOLLET)==false)
        return false;

    #ifdef DEBUG
        std::cout<<"New user...\n";
    #endif

    return true;
}

void EpollServerSocket::DeleteClient(int sockfd)
{
    //epoll doesn't need to handle events from sockfd anymore
    epoll.Delete(sockfd);

    delete clientSockets[sockfd];
    clientSockets.erase(sockfd);
}

void EpollServerSocket::SendToAllUsers(const std::string& message) const
{
    std::map<int,Socket*>::const_iterator it;
    for(it=clientSockets.begin();it!=clientSockets.end();it++)
        SendMessage(*(it->second),message);
}

void EpollServerSocket::SendMessage(Socket& clientSocket,const std::string& message) const
{
    while(true)
    {
        if(Socket::Send(clientSocket,message)==false)
        {
            //this means the socket can be wrote
            if(errno == EINTR)
                return;

            //this means the cache queue is full,
            //sleep 1 second and send again
            if(errno==EAGAIN)
            {
                sleep(1);
                continue;
            }
        }

        return;
    }
}

void EpollServerSocket::ReceiveMessage(Socket& clientSocket,std::string& message)
{
    bool done=true;

    while(done)
    {
        int receiveNumber=Socket::Receive(clientSocket,message);
        if(receiveNumber==-1)
        {
            //if errno == EAGAIN, that means we have read all data.
            if (errno != EAGAIN)
            {
                perror ("ReceiveMessage error");
                DeleteClient(clientSocket.GetSocketfd());
            }
            return;
        }
        else if(receiveNumber==0)
        {
            // End of file. The remote has closed the connection.
            DeleteClient(clientSocket.GetSocketfd());
        }

        //if receiveNumber is equal to MAXRECEIVE,
        //maybe there is data still in cache,so it has to read again
        if(receiveNumber==MAXRECEIVE)
            done=true;
        else
            done=false;
    }
}

(以前寫的客戶端不用更改,直接可以與這個服務器通信)

對於大數據量的傳輸,很明顯要不斷地進行讀/寫,這樣就會出現長時間的阻塞,甚至成為系統的性能瓶頸

但是對於只有較少活躍的socket,同時數據量較小的情況,epoll的效率應該是比select和poll高的(呃,不過沒有很好的測試過)

不過好像有一種做法可以避免阻塞,就是利用EPOLLOUT事件

“EPOLLOUT事件的意思就是 當前這個socket的發送狀態是空閑的,此時處理能力很強,告知用戶可以發送數據。
所以在正常情況下,基本上socket在epoll_wait后,都會得到一個socket的EPOLLOUT事件。

【如果你不是一直在寫數據或者你不是在傳送一個幾百M的數據文件,send一半都處於空閑狀態】
而這個特性剛好可以處理 阻塞問題。
當數據發送不出去的時候,說明網絡阻塞或者延遲太厲害了。
那么將要發送的數據放在一個buffer中,當下次你發現了EPOLLOUT事件時,說明現在網絡處於空閑狀態,OK,此時你可以用另外一個線程來發送上次堆積在buffer中的數據了。這樣就不會阻塞了“

本文為原創博文,轉載請注明原作者博客地址:http://www.cnblogs.com/-Lei/archive/2012/09/12/2681475.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM