Linux中epoll+線程池實現高並發


服務器並發模型通常可分為單線程和多線程模型,這里的線程通常是指“I/O線程”,即負責I/O操作,協調分配任務的“管理線程”,而實際的請求和任務通常交由所謂“工作者線程”處理。通常多線程模型下,每個線程既是I/O線程又是工作者線程。所以這里討論的是,單I/O線程+多工作者線程的模型,這也是最常用的一種服務器並發模型。我所在的項目中的server代碼中,這種模型隨處可見。它還有個名字,叫“半同步/半異步“模型,同時,這種模型也是生產者/消費者(尤其是多消費者)模型的一種表現。

這種架構主要是基於I/O多路復用的思想(主要是epoll,select/poll已過時),通過單線程I/O多路復用,可以達到高效並發,同時避免了多線程I/O來回切換的各種開銷,思路清晰,易於管理,而基於線程池的多工作者線程,又可以充分發揮和利用多線程的優勢,利用線程池,進一步提高資源復用性和避免產生過多線程。

瓶頸在於IO密集度。
線程池你開10個線程當然可以一上來全部accept阻塞住,這樣客戶端一連上來便會自動激活一個線程去處理,但是設想一下,如果10個線程全部用掉了,第11個客戶端就會發生丟棄。這樣為了實現”高並發“你得不斷加大線程池的數量。這樣會帶來嚴重的內存占用和線程切換的時延問題。
於是前置事件輪詢設施的方案就應運而生了,
主線程輪詢負責IO,作業交給線程池。
在高並發下,10W個客戶端上來,就主線程負責accept,放到隊列中,不至於發生沒有及時握手而丟棄掉連接的情況發生,而作業線程從隊列中認領作業,做完回復主線程,主線程負責write。這樣可以用極少的系統資源處理大數量連接。
在低並發下,比如2個客戶端上來,也不會出現100個線程hold住在那從而發生系統資源浪費的情況。

正確實現基本線程池模型的核心:
主線程負責所有的 I/O 操作,收齊一個請求所有數據之后如果有必要,交給工作線程進行處理 。處理完成之后,把需要寫回的數據還給主線程去做寫回 / 嘗試寫回數據直到阻塞,然后交回主線程繼續。
這里「如果有必要」的意思是:經過測量,確認這個處理過程中所消耗的 CPU 時間(不包括任何 I/O 等待,或者相關的 I/O 等待操作無法用 epoll 接管)相當顯著。如果這個處理過程(不包含可接管的 I/O 操作)不顯著,則可以直接放在主線程里解決。
這個「必要」與否的前提不過三個詞:假設,分析,測量。

所以,一個正確實現的線程池環境鍾,用 epoll + non-blocking I/O 代替 select + blocking I/O 的好處是,處理大量 socket 的時候,前者效率比后者高,因為前者不需要每次被喚醒之后重新檢查所有 fd 判斷哪個 fd 的狀態改變可以進行讀寫了。

實現單I/O線程的epoll模型是本架構的第一個技術要點,主要思想如下: 

單線程創建epoll並等待,有I/O請求(socket)到達時,將其加入epoll並從線程池中取一個空閑工作者線程,將實際的業務交由工作者線程處理。

以上摘自:https://www.cnblogs.com/cthon/p/9139384.html

創建一個epoll實例;
while(server running)
{
    epoll等待事件;
    if(新連接到達且是有效連接)
    {
        accept此連接;
        將此連接設置為non-blocking;
       為此連接設置event(EPOLLIN | EPOLLET ...);
        將此連接加入epoll監聽隊列;
        從線程池取一個空閑工作者線程並處理此連接;
    }
    else if(讀請求)
    {
        從線程池取一個空閑工作者線程並處理讀請求;
    }
    else if(寫請求)
    {
        從線程池取一個空閑工作者線程並處理寫請求;
    }
    else
        其他事件;     
}

剛學線程池,若有誤請大家指出(可聯系我,下有郵箱):

服務器代碼:

lock.h

/*************************************************************************
    > File Name: lock.h
    > Author: gushi
    > Mail: 971859774@qq.com 
    > Created Time: 2018年11月22日 星期四 20時06分55秒
 ************************************************************************/

#ifndef LOCK_H
#define LOCK_H
#include <iostream>
#include <pthread.h>
#include <semaphore.h>

using namespace std;

class Sem
{
    private:
        sem_t sem;

    public:
        Sem();
        ~Sem();
        bool wait();
        bool post();
};

Sem::Sem()
{
    if(sem_init(&sem,0,0)!=0)//信號量的初始值和和基於內存的信號量
        cerr<<"sem init error."<<endl;
}

Sem::~Sem()
{
    sem_destroy(&sem);
}

bool Sem::wait()
{
    return sem_wait(&sem)==0?true:false;
}

bool Sem::post()
{
    return sem_post(&sem)==0?true:false;
}

//互斥類
class Mutex
{
    private:
        pthread_mutex_t mutex;

    public:
        Mutex();
        ~Mutex();
        bool mutex_lock();
        bool mutex_unlock();
};

Mutex::Mutex()
{
    if(pthread_mutex_init(&mutex,NULL)!=0)//可用PTHRAD_MUTEX_INITIALIZER宏初始化
        cerr<<"mutex init error"<<endl;
}

Mutex::~Mutex()
{
    pthread_mutex_destroy(&mutex);
}

bool Mutex::mutex_lock()
{
    return pthread_mutex_lock(&mutex)==0?true:false;
}

bool Mutex::mutex_unlock()
{
    return pthread_mutex_unlock(&mutex)==0?true:false;
}

//條件變量的類
class Cond
{
    private:
        pthread_mutex_t mutex;
        pthread_cond_t cond;

    public:
        Cond();
        ~Cond();
        bool wait();
        bool signal();
        bool broadcast();
};

Cond::Cond()
{
    if(pthread_mutex_init(&mutex,NULL)!=0)
    {
        cerr<<"Cond mutex init error"<<endl;
        exit(0);
    }
    if(pthread_cond_init(&cond,NULL)!=0)
    {
        cerr<<"Cond cond init error"<<endl;
        pthread_mutex_destroy(&mutex);
        exit(0);
    }
}

Cond::~Cond()
{
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&cond);
}

bool Cond::wait()
{
    int rs=0;
    pthread_mutex_lock(&mutex);
    rs=pthread_cond_wait(&cond,&mutex);
    pthread_mutex_unlock(&mutex);
    return rs==0?true:false;
}

bool Cond::signal()
{
    return pthread_cond_signal(&cond)==0?true:false;
}

bool Cond::broadcast()
{
    return pthread_cond_broadcast(&cond);
}

#endif

threadpool.h,還沒有實現動態增加功能,以后待更新...

/*************************************************************************
    > File Name: threadpool.h
    > Author: gushi
    > Mail: 971859774@qq.com 
    > Created Time: 2018年11月22日 星期四 20時32分12秒
 ************************************************************************/

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <queue>
#include <vector>
#include <exception>
#include <errno.h>
#include "lock.h"
#define MAX_THREADS 1024

using namespace std;

template <class T>
class Threadpool
{
    private:
        int idle;//線程池中空閑線程的數量
        int num;//線程池中線程數
        vector<pthread_t> idle_tid;//空閑線程的集合
        vector<pthread_t> busy_tid;//正在執行任務的線程的集合
        queue<T *> task_queue;//任務隊列
        Mutex mutex;//互斥鎖
        Cond cond;//條件變量鎖
        bool is_stop;//是否結束線程

    public:
        static void *worker(void *arg);//線程函數,里面執行run函數
        void run();
        T *get_task();//獲取任務函數
        int mv_to_idle(pthread_t tid);//執行任務完成后,放入空閑
        int mv_to_busy(pthread_t tid);//移入到忙碌線程中

    public:
        Threadpool(int n=20);
        ~Threadpool();
        bool append_task(T *task);//添加任務函數
        void start();//開始創建線程池
        void stop();//線程停止函數
};

template <class T>
Threadpool<T>::Threadpool(int n):num(n),idle(n),is_stop(false)                                       
{
    if(num<=0)
    {
        cerr<<"threadpool can't init because num<=0."<<endl;
        exit(1);
    }
}

template <class T>
Threadpool<T>::~Threadpool()
{
    stop();
}

template <class T>
bool Threadpool<T>::append_task(T *task)
{
    mutex.mutex_lock();//臨界資源上鎖

    bool is_signal=task_queue.empty();
    task_queue.push(task);

    mutex.mutex_unlock();//解鎖

    if(is_signal)//signal to null queue
        cond.signal();

    return true;
}

template <class T>
void Threadpool<T>::start()
{
    for(int i=0;i<num;++i)    
    {
        pthread_t tid=0;
        if(pthread_create(&tid,NULL,worker,this)!=0)//this參數的傳遞對於開啟線程運行函數要用到,
        {
            throw exception();
            exit(1);
        }
        idle_tid.push_back(tid);//加入到空閑線程集合
    }
}

template <class T>
void Threadpool<T>::stop()
{
    is_stop=true;
    cond.broadcast();
}

template <class T>
void *Threadpool<T>::worker(void *arg)
{
    Threadpool<T> *thread=(Threadpool<T> *)arg;//thread為一個線程池的指針,指向整個線程池,
    thread->run();//調用線程運行函數,真正執行工作的函數
    return thread;
}

template <class T>
void Threadpool<T>::run()
{
    pthread_t tid=pthread_self();//mutex.mutex_lock();這會造成與append_task函數構成死鎖,訪問task_queue與idle_tid與busy_tid等臨界資源時,可以再各自的函數實現中加鎖
    while(1)//if
    {
        T *task=get_task();//函數實現中有互斥鎖臨界訪問
        if(task==NULL)
        {
            cerr<<"task_queue is null.wait()"<<endl;
            cond.wait();
        }
        else
        {
            mv_to_busy(tid);//函數實現中有互斥鎖保護訪問
            task->doit();//工作函數
            mv_to_idle(tid);
        }
    }
    //mutex.mutex_unlock();
}

template <class T>
T *Threadpool<T>::get_task()
{
    T *task=NULL;
    
    mutex.mutex_lock();
    if(!task_queue.empty())
    {
        task=task_queue.front();
        task_queue.pop();
    }
    mutex.mutex_unlock();

    return task;
}

template <class T>
int Threadpool<T>::mv_to_idle(pthread_t tid)
{
    vector<pthread_t>::iterator busy_iter=busy_tid.begin();
    while(busy_iter!=busy_tid.end())
    {
        if(tid==*busy_iter)
            break;
        ++busy_iter;
    }    
    
    mutex.mutex_lock();
    busy_tid.erase(busy_iter);//此線程空閑,從繁忙任務隊列中移除
    idle_tid.push_back(tid);//添加到空閑線程集合中

    //mutex.mutex_lock();
    ++idle;
    mutex.mutex_unlock();
    return 0;
}

template <class T>
int Threadpool<T>::mv_to_busy(pthread_t tid)
{
    vector<pthread_t>::iterator idle_iter=idle_tid.begin();
    while(idle_iter!=idle_tid.end())
    {
        if(tid==*idle_iter)
            break;
        ++idle_iter;
    }
    mutex.mutex_lock();
    idle_tid.erase(idle_iter);
    busy_tid.push_back(tid);

    //mutex.mutex_lock();
    --idle;
    mutex.mutex_unlock();
}

#endif

epollserver.h

/*************************************************************************
    > File Name: epollserver.h
    > Author: gushi
    > Mail: 971859774@qq.com 
    > Created Time: 2018年11月23日 星期五 17時07分25秒
 ************************************************************************/

#ifndef EPOLL_SERVER_H
#define EPOLL_SERVER_H

#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>//write頭文件
#include <errno.h>
#include "threadpool.h"

#define MAX_EVENT 1024
#define MAX_BUFFER 2048
using namespace std;

class Basetask
{
    public:
        virtual void doit()=0;
};

class Task:public Basetask
{
    private:
        int sockfd;
        char msg[MAX_BUFFER];
    
    public:
        Task(int ,char *);
        void doit();
};

Task::Task(int fd,char *str):sockfd(fd)
{
    memset(msg,0,sizeof(msg));
    strcpy(msg,str);
}

void Task::doit()
{
    cout<<"server reveive message is: "<<msg<<endl;
    write(sockfd,msg,strlen(msg));
    //Threadpool<Task>::mv_to_idle(tid);
}

class Epollserver
{
    private:
        bool is_stop;//是否停止epoll_wait
        int num;//線程數目
        int sockfd;
        int port;
        int epollfd;
        Threadpool<Task> *pool;//線程池的指針
        epoll_event events[MAX_EVENT];
        struct sockaddr_in servaddr;
    
    public:
        Epollserver(int p,int n);
        Epollserver(){}
        ~Epollserver();
        void init();
        void epoll();
        static int setnonblocking(int fd);
        static void addfd(int epollfd,int sockfd,bool onshot);
};
Epollserver::Epollserver(int p,int n):port(p),num(n),is_stop(false),
                        pool(NULL)
{
}

Epollserver::~Epollserver()
{
    delete pool;
}

void Epollserver::init()
{
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family=AF_INET;
    servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
    servaddr.sin_port=htons(port);

    //監聽套接字
    sockfd=socket(AF_INET,SOCK_STREAM,0);
    if(sockfd<0)
    {
        cerr<<"Epollserver socket init error"<<endl;
        exit(1);
    }

    int tmp=bind(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr));
    if(tmp<0)
    {
        cerr<<"Epollserver bind init error"<<endl;
        exit(1);
    }

    tmp=listen(sockfd,10);
    if(tmp<0)
    {
        //cout<<errno<<endl;
        cerr<<"Epollserver listen init error:"<<strerror(errno)<<endl;
        exit(1);
    }

    epollfd=epoll_create(1024);
    if(epollfd<0)
    {
        cerr<<"Epollserver epoll_create init error"<<endl;
        exit(1);
    }

    //創建線程池,num是線程池中線程的個數,調用構造函數
    pool=new Threadpool<Task>(num); 
}

void Epollserver::epoll()
{
    pool->start();//啟動線程池

    addfd(epollfd,sockfd,false);
    while(!is_stop)
    {
        int ret=epoll_wait(epollfd,events,MAX_EVENT,-1);
        if(ret<0)
        {
            cerr<<"epoll_wait error"<<endl;
            exit(1);
        }

        for(int i=0;i<ret;++i)
        {
            //int fd=events[i].data.fd;
            if(events[i].data.fd==sockfd)
            {
                struct sockaddr_in cliaddr;
                socklen_t len=sizeof(cliaddr);
                //accpet 返回已連接套接字
                int confd=accept(sockfd,(struct sockaddr *)&cliaddr,&len);
                Epollserver::addfd(epollfd,confd,false);
            }
            else if(events[i].events&EPOLLIN)//有數據可讀
            {
                char buffer[MAX_BUFFER];
                int fd=events[i].data.fd;//接受已連接套接字,對客戶端進行內容回送
    readagain:    memset(buffer,0,sizeof(buffer));
                ret=read(fd,buffer,MAX_BUFFER-1);
                if(ret==0)//某個fd關閉連接
                {
                    struct epoll_event ev;
                    ev.events=EPOLLIN;
                    ev.data.fd=fd;
                    epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev);
                    shutdown(fd,SHUT_RDWR);
                    cout<<fd<<" exit"<<endl;
                    continue;
                }
                else if(ret<0)//讀取失敗
                {
                    if(errno==EAGAIN)
                    {
                        cout<<"read error. read again"<<endl;
                        goto readagain;
                        break;
                    }
                }
                else//讀取成功
                {
                    Task *task=new Task(fd,buffer);
                    pool->append_task(task);
                }
            }//else if
            else
                cerr<<"something else had happend"<<endl;
        }//for
    }//while
    close(sockfd);
    pool->stop();
}

int Epollserver::setnonblocking(int fd)
{
    int old_opt=fcntl(fd,F_GETFL);
    int new_opt=old_opt|O_NONBLOCK;
    fcntl(fd,F_SETFL,new_opt);
    return old_opt;
}

void Epollserver::addfd(int epollfd,int sockfd,bool oneshot)
{
    epoll_event event;
    event.data.fd=sockfd;
    event.events=EPOLLIN|EPOLLET;
    if(oneshot)
        event.events|=EPOLLONESHOT;
    epoll_ctl(epollfd,EPOLL_CTL_ADD,sockfd,&event);
    Epollserver::setnonblocking(sockfd);
}

#endif

server.cpp服務器的主函數

/*************************************************************************
    > File Name: server.cpp
    > Author: gushi
    > Mail: 971859774@qq.com 
    > Created Time: 2018年11月23日 星期五 19時34分29秒
 ************************************************************************/

#include "epollserver.h"
#define INDARRY_PORT 9877

using namespace std;

int main(int argc,char **argv)
{
    Epollserver *epoll=new Epollserver(INDARRY_PORT,20);
    epoll->init();//對初始化服務器(socket,bind,listen,epoll_create...等函數,病完成線程池的初始化)
    epoll->epoll();//開啟線程池,完成相應的任務添加之后,自動調用線程池中空閑的函數來完成doit工作
    return 0;
}

客戶端程序:

/*************************************************************************
    > File Name: client.cpp
    > Author:gushi
    > Mail: 971859774@qq.com 
    > Created Time: 2018年11月24日 星期六 15時36分23秒
 ************************************************************************/

#include <iostream>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <netinet/in.h>
#include <strings.h>
#define SERV_PORT 9877
#define MAXLINE 1204

using namespace std;

void str_cli(FILE *fp,int sockfd)
{
    fd_set set;
    FD_ZERO(&set);

    char buff[1024];
    int stdineof=0,n;
    while(1)
    {
        if(stdineof==0)
            FD_SET(fileno(fp),&set);
        FD_SET(sockfd,&set);

        int maxfd=max(fileno(fp),sockfd)+1;
        
        select(maxfd,&set,NULL,NULL,NULL);

        if(FD_ISSET(sockfd,&set))
        {
            if((n=read(sockfd,buff,MAXLINE))==0)
                if(stdineof==1)
                    return;
                else
                    cerr<<"str_cli: server terinated peraturely"<<endl;

            write(fileno(stdout),buff,n);
        }
        else if(FD_ISSET(fileno(fp),&set))
        {
            if((n=read(fileno(fp),buff,MAXLINE))==0)//客戶完成輸入
                stdineof=1;
            write(sockfd,buff,n);

            //shutdown(sockfd,SHUT_WR);

            FD_CLR(fileno(fp),&set);
            continue;
        }
        //write(sockfd,buff,n);
    }
    return;
}

int main(int argc,char **argv)
{
    if(argc!=2)
    {
        cerr<<"please input server address."<<endl;
        exit(1);
    }

    int sockfd=socket(AF_INET,SOCK_STREAM,0);

    struct sockaddr_in servaddr;
    bzero(&servaddr,sizeof(servaddr));
    servaddr.sin_family=AF_INET;
    inet_pton(AF_INET,argv[1],&servaddr.sin_addr);
    servaddr.sin_port=htons(SERV_PORT);
    
    connect(sockfd,(struct sockaddr *)&servaddr,sizeof(servaddr));

    str_cli(stdin,sockfd);
    return 0;
}

GitHub:https://github.com/tianzengBlog/websServer


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM