利用libevent 和線程池實現高並發服務器


鏈接:https://pan.baidu.com/s/16UcOJplrcQ3EfLurycmSmA 

提取碼:7roj 

主線程為監聽套接字添加事件並進入事件循環;接受新連接後,將連接描述符放入預先定義的數據結構中,並向管道寫入一個字節,以觸發子線程的讀管道事件;子線程隨後從該結構中取出連接描述符,與客戶端進行通信。主線程和每個子線程各自擁有獨立的 event_base。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

#include <sys/types.h>
#include <sys/socket.h>

#include <event.h>
#include <queue>
#include <iostream>

#include <arpa/inet.h>


#include <memory.h>


// Number of worker threads (and LIBEVENT_THREAD descriptors) in the pool.
const int thread_num = 10;
// Size in bytes of the buffer allocated for each read and of each read/write request.
#define BUF_SIZE 1024

using namespace std;



// Per-worker-thread state: the worker's own event base plus the socketpair
// through which the accepting thread wakes it up.
typedef struct {
    pthread_t tid;              // thread id (written by pthread_create and by worker_thread)
    struct event_base *base;    // event base this worker dispatches on
    struct event event;         // persistent read event registered on read_fd
    int read_fd;                // socketpair end the worker reads notifications from
    int write_fd;               // socketpair end event_handler writes notifications to
    //queue<int> q;
    int f_connect;              // most recently accepted connection fd assigned to this worker
    char * buffer;              // data stored by on_read, sent and released by on_write
}LIBEVENT_THREAD;                 // bookkeeping needed for pipe notification and event-base management





// State of the dispatcher (accepting) thread: its thread id and event base.
typedef struct {
    pthread_t tid;              // thread id of the dispatcher
    struct event_base *base;    // event base the listening socket's event is attached to
}DISPATCHER_THREAD;



// Worker pool: thread_num zero-initialised descriptors, allocated during
// static initialisation. NOTE(review): the calloc() result is never checked.
LIBEVENT_THREAD *threads = (LIBEVENT_THREAD *) calloc(thread_num, sizeof(LIBEVENT_THREAD));


void on_write(int sock, short event, void* arg);

void on_read(int sock, short event, void* arg)
{

    cout<<"on_read() called, sock="<<sock<<endl;
    if(NULL == arg){
        return;
    }
    LIBEVENT_THREAD* event_thread = (LIBEVENT_THREAD*) arg;//獲取傳進來的參數
    char* buffer = new char[BUF_SIZE];
    memset(buffer, 0, sizeof(char)*BUF_SIZE);
    //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了
    int size = read(sock, buffer, BUF_SIZE);
    if(0 == size){//說明socket關閉
        cout<<"read size is 0 for socket:"<<sock<<endl;
      //  destroy_sock_ev(event_struct);

      //event_thread->q.pop();
       close(sock);
        return;
    }
    cout<<"i have receive: "<<buffer<<endl;



    event_thread->buffer=buffer;

    struct event* write_ev = (struct event*)malloc(sizeof(struct event));//發生寫事件(也就是只要socket緩沖區可寫)時,就將反饋數據通過socket寫回客戶端
    event_set(write_ev, sock, EV_WRITE, on_write, event_thread);

    event_base_set(event_thread->base, write_ev);
    event_add(write_ev, NULL);
    cout<<"on_read() finished, sock="<<sock<<endl;

}




/**
 * libevent write callback: echoes the worker's pending buffer to `sock`
 * and releases it.
 *
 * Only the bytes up to the first NUL (at most BUF_SIZE) are sent, so the
 * zero padding of the receive buffer is not transmitted.
 *
 * @param sock   client socket that became writable
 * @param event  libevent event flags (unused)
 * @param arg    LIBEVENT_THREAD* whose `buffer` holds the data to send
 */
void on_write(int sock, short event, void* arg)
{
    (void)event;

    if(NULL == arg){
        return;
    }
    LIBEVENT_THREAD* event_write_thread = (LIBEVENT_THREAD*) arg;

    char* pending = event_write_thread->buffer;
    if(NULL == pending){
        // Nothing is queued (an earlier callback already sent and freed it).
        return;
    }

    // The socket is writable when libevent calls us, so one write is issued.
    // NOTE(review): a short write is not retried.
    size_t length = strnlen(pending, BUF_SIZE);
    if(length > 0 && write(sock, pending, length) < 0){
        perror("write()");
    }

    // The buffer must have been obtained from the malloc family.
    free(pending);

    event_write_thread->buffer=NULL;

}




static void thread_libevent_process(int fd, short which, void *arg)
{
    int ret;
    char buf[128];
    LIBEVENT_THREAD *me = (LIBEVENT_THREAD *) arg;

    int fdconnect;

    if (fd != me->read_fd) {
        printf("thread_libevent_process error : fd != me->read_fd\n");
        exit(1);
    }


            ret = read(fd, buf, 128);
           if (ret > 0) 
            {

              buf[ret] = '\0';

              printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);

            }


           cout<<"thread_libevent_process\n"<<endl;

    /*if(me->q.size()>0)
        {
            fdconnect=me->q.front();
            me->q.pop();

           ret = read(fd, buf, 128);
           if (ret > 0) 
            {

              buf[ret] = '\0';

              printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);

            }
        }*/

        /*if(me->q.size()>0)
        {
            fdconnect=me->q.front();


            cout<<"thread_libevent_process succeed "<<endl;
            //me->q.pop();
        }

        else
            return ;*/


        fdconnect=me->f_connect;

        struct event* read_ev = (struct event*)malloc(sizeof(struct event));//發生讀事件后,從socket中取出數據

        event_set(read_ev, fdconnect, EV_READ|EV_PERSIST, on_read, me);

        event_base_set(me->base, read_ev);

        event_add(read_ev, NULL);


    return;
}






void thread_init()
{
    int ret;
    int fd[2];
    for (int i = 0; i < thread_num; i++) {

            ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);




            if (ret == -1) {
                perror("socketpair()");
                return  ;
            }

            threads[i].read_fd = fd[0];
            threads[i].write_fd = fd[1];

            threads[i].base = event_init();


            if (threads[i].base == NULL) {
                perror("event_init()");
                return ;
            }

            event_set(&threads[i].event,threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);



            event_base_set(threads[i].base, &threads[i].event);
            if (event_add(&threads[i].event, 0) == -1) {
                perror("event_add()");
                return ;
            }

            cout<<"thread_init succeed"<<endl;

        }
}


 void * worker_thread(void *arg)
{
    LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg;
    me->tid = pthread_self();

    //event_base_loop(me->base, 0);

     event_base_dispatch(me->base);


    return NULL;
}




void CreatPhreadPool()
{

    for (int i = 0; i < thread_num; i++) {
        pthread_create(&threads[i].tid, NULL, worker_thread, &threads[i]);
    }


    cout<<"CreatPhreadPool"<<endl;
}







/**
 * Creates an IPv4 TCP socket.
 *
 * @return the new descriptor, or -1 (after logging) when socket() fails
 */
int getSocket(){
    const int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        cout<<"Error, fd is -1"<<endl;
    }
    return sock;
}


// Index of the worker that received the previous connection.
int last_thread=0;

/**
 * Accept callback for the listening socket (runs in the dispatcher thread).
 *
 * Accepts one connection, picks the next worker round-robin, stores the
 * new descriptor in that worker's f_connect slot and wakes the worker by
 * writing one byte to its socketpair.
 *
 * NOTE(review): f_connect is a single slot written here and read by the
 * worker without synchronisation; two connections assigned to the same
 * worker in quick succession can overwrite each other.
 *
 * @param sock   listening socket
 * @param event  libevent event flags (unused)
 * @param arg    unused
 */
void event_handler(int sock, short event, void* arg)

{
    (void)event;
    (void)arg;

    struct sockaddr_in remote_addr;
    socklen_t sin_size = sizeof(remote_addr);
    int new_fd = accept(sock, (struct sockaddr*) &remote_addr, &sin_size);
    if(new_fd < 0){
        cout<<"Accept error in on_accept()"<<endl;
        return;
    }
    cout<<"new_fd accepted is "<<new_fd<<endl;

    // Round-robin worker selection.
    int tid = (last_thread + 1) % thread_num;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    thread->f_connect=new_fd;

    // Wake the worker; if it cannot be notified nobody will ever serve the
    // connection, so close it instead of leaking the descriptor.
    if(write(thread->write_fd, " ", 1) != 1){
        perror("write()");
        close(new_fd);
        return;
    }

    cout<<"on_accept() finished for fd="<<new_fd<<endl;
}






DISPATCHER_THREAD dispatcher_thread;        // state of the main thread (its event base and thread id are set in main)




/**
 * Entry point: `<program> <ipv4-address> <port>`.
 *
 * Initialises and starts the worker pool, binds and listens on the given
 * address, then runs the dispatcher event loop, which hands every accepted
 * connection to a worker through event_handler.
 *
 * @return 0 when the event loop ends normally, -1 on a usage or setup error
 */
int main(int argc, char** argv)
{
    if(argc < 3){
        cout<<"Usage: "<<(argc > 0 ? argv[0] : "server")<<" <ip> <port>"<<endl;
        return -1;
    }

    // Validate the listen address before any resource is created.
    struct sockaddr_in local_addr;
    memset(&local_addr,0,sizeof(local_addr));
    local_addr.sin_family=AF_INET;
    local_addr.sin_addr.s_addr=inet_addr(argv[1]);
    if(local_addr.sin_addr.s_addr == INADDR_NONE){
        cout<<"Invalid IPv4 address: "<<argv[1]<<endl;
        return -1;
    }
    char* port_end = NULL;
    long port = strtol(argv[2], &port_end, 10);
    if(port_end == argv[2] || *port_end != '\0' || port < 0 || port > 65535){
        cout<<"Invalid port: "<<argv[2]<<endl;
        return -1;
    }
    local_addr.sin_port=htons((unsigned short)port);

    thread_init();

    CreatPhreadPool();

    int fd_listen = getSocket();
    if(fd_listen <0){
        cout<<"Error in main(), fd<0"<<endl;
        return -1;
    }

    int bind_result = bind(fd_listen, (struct sockaddr*) &local_addr, sizeof(local_addr));
    if(bind_result < 0){
        cout<<"Bind Error in main()"<<endl;
        close(fd_listen);
        return -1;
    }
    cout<<"bind_result="<<bind_result<<endl;
    if(listen(fd_listen, 10) < 0){
        perror("listen()");
        close(fd_listen);
        return -1;
    }

    // Non-blocking so that accept() in event_handler can never stall the loop.
    evutil_make_socket_nonblocking(fd_listen);

    // Call event_handler whenever the listening socket becomes readable.
    struct event_base* base = event_base_new();
    if(NULL == base){
        cout<<"event_base_new() failed in main()"<<endl;
        close(fd_listen);
        return -1;
    }
    dispatcher_thread.base=base;
    dispatcher_thread.tid = pthread_self();

    struct event listen_ev;
    event_set(&listen_ev, fd_listen, EV_READ|EV_PERSIST, event_handler, NULL);
    event_base_set(dispatcher_thread.base, &listen_ev);
    event_add(&listen_ev, NULL);
    event_base_dispatch(dispatcher_thread.base);

    // Normally not reached: the dispatch loop runs while events are registered.
    cout<<"event_base_dispatch() in main() finished"<<endl;

    event_del(&listen_ev);
    event_base_free(dispatcher_thread.base);
    close(fd_listen);
    cout<<"main() finished"<<endl;
    return 0;

}

 

#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <pthread.h>
#include <sys/types.h>#include <sys/socket.h>
#include <event.h>#include <queue>#include <iostream>
#include <arpa/inet.h>

#include <memory.h>

// Number of worker threads (and LIBEVENT_THREAD descriptors) in the pool.
const int thread_num = 10;
// Size in bytes of the buffer allocated for each read and of each read/write request.
// (A preprocessor directive must start its own line.)
#define BUF_SIZE 1024
using namespace std;


typedef struct {    pthread_t tid;    struct event_base *base;    struct event event;    int read_fd;    int write_fd;    //queue<int> q;    int f_connect;    char * buffer;}LIBEVENT_THREAD;                 //需要保存的信息結構,用於管道通信和基事件的管理




// State of the dispatcher (accepting) thread: its thread id and event base.
typedef struct {
    pthread_t tid;              // thread id of the dispatcher
    struct event_base *base;    // event base the listening socket's event is attached to
}DISPATCHER_THREAD;


// Worker pool: thread_num zero-initialised descriptors, allocated during
// static initialisation. NOTE(review): the calloc() result is never checked.
LIBEVENT_THREAD *threads = (LIBEVENT_THREAD *) calloc(thread_num, sizeof(LIBEVENT_THREAD));

void on_write(int sock, short event, void* arg);
void on_read(int sock, short event, void* arg){
    cout<<"on_read() called, sock="<<sock<<endl;    if(NULL == arg){        return;    }    LIBEVENT_THREAD* event_thread = (LIBEVENT_THREAD*) arg;//獲取傳進來的參數    char* buffer = new char[BUF_SIZE];    memset(buffer, 0, sizeof(char)*BUF_SIZE);    //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了    int size = read(sock, buffer, BUF_SIZE);    if(0 == size){//說明socket關閉        cout<<"read size is 0 for socket:"<<sock<<endl;      //  destroy_sock_ev(event_struct);
      //event_thread->q.pop();       close(sock);        return;    }    cout<<"i have receive: "<<buffer<<endl;


    event_thread->buffer=buffer;
    struct event* write_ev = (struct event*)malloc(sizeof(struct event));//發生寫事件(也就是只要socket緩沖區可寫)時,就將反饋數據通過socket寫回客戶端    event_set(write_ev, sock, EV_WRITE, on_write, event_thread);
    event_base_set(event_thread->base, write_ev);    event_add(write_ev, NULL);    cout<<"on_read() finished, sock="<<sock<<endl;
}



void on_write(int sock, short event, void* arg){
    if(NULL == arg){        return;    }    LIBEVENT_THREAD* event_write_thread = (LIBEVENT_THREAD*) arg;//獲取傳進來的參數    //char* buffer = new char[BUF_SIZE];    //memset(buffer, 0, sizeof(char)*BUF_SIZE);
    //strcpy(buffer,event_write_thread->buffer,sizeof(event_write_thread->buffer));    //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了    write(sock, event_write_thread->buffer, BUF_SIZE);
    free(event_write_thread->buffer);
    event_write_thread->buffer=NULL;
}



static void thread_libevent_process(int fd, short which, void *arg){    int ret;    char buf[128];    LIBEVENT_THREAD *me = (LIBEVENT_THREAD *) arg;
    int fdconnect;
    if (fd != me->read_fd) {        printf("thread_libevent_process error : fd != me->read_fd\n");        exit(1);    }

            ret = read(fd, buf, 128);           if (ret > 0)             {
              buf[ret] = '\0';
              printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);
            }

           cout<<"thread_libevent_process\n"<<endl;
    /*if(me->q.size()>0)        {            fdconnect=me->q.front();            me->q.pop();
           ret = read(fd, buf, 128);           if (ret > 0)             {
              buf[ret] = '\0';
              printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);
            }        }*/
        /*if(me->q.size()>0)        {            fdconnect=me->q.front();

            cout<<"thread_libevent_process succeed "<<endl;            //me->q.pop();        }
        else            return ;*/

        fdconnect=me->f_connect;
        struct event* read_ev = (struct event*)malloc(sizeof(struct event));//發生讀事件后,從socket中取出數據
        event_set(read_ev, fdconnect, EV_READ|EV_PERSIST, on_read, me);
        event_base_set(me->base, read_ev);
        event_add(read_ev, NULL);

    return;}





void thread_init(){    int ret;    int fd[2];    for (int i = 0; i < thread_num; i++) {
            ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);



            if (ret == -1) {                perror("socketpair()");                return  ;            }
            threads[i].read_fd = fd[0];            threads[i].write_fd = fd[1];
            threads[i].base = event_init();

            if (threads[i].base == NULL) {                perror("event_init()");                return ;            }
            event_set(&threads[i].event,threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);


            event_base_set(threads[i].base, &threads[i].event);            if (event_add(&threads[i].event, 0) == -1) {                perror("event_add()");                return ;            }
            cout<<"thread_init succeed"<<endl;
        }}

 void * worker_thread(void *arg){    LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg;    me->tid = pthread_self();
    //event_base_loop(me->base, 0);
     event_base_dispatch(me->base);

    return NULL;}



void CreatPhreadPool(){
    for (int i = 0; i < thread_num; i++) {        pthread_create(&threads[i].tid, NULL, worker_thread, &threads[i]);    }

    cout<<"CreatPhreadPool"<<endl;}






int getSocket(){    int fd =socket( AF_INET, SOCK_STREAM, 0 );    if(-1 == fd){        cout<<"Error, fd is -1"<<endl;    }    return fd;}

int last_thread=0;
void event_handler(int sock, short event, void* arg)  //添加其他信息
{    struct sockaddr_in remote_addr;    int sin_size=sizeof(struct sockaddr_in);    int new_fd = accept(sock,  (struct sockaddr*) &remote_addr, (socklen_t*)&sin_size);    //如果線程池已用完,怎么辦呢?    if(new_fd < 0){        cout<<"Accept error in on_accept()"<<endl;        return;    }    cout<<"new_fd accepted is "<<new_fd<<endl;
    int tid = (last_thread + 1) % thread_num;        //memcached中線程負載均衡算法
    LIBEVENT_THREAD *thread = threads + tid;
    last_thread = tid;  
    thread->f_connect=new_fd;
    write(thread->write_fd, " ", 1);
    cout<<"on_accept() finished for fd="<<new_fd<<endl;}





DISPATCHER_THREAD dispatcher_thread;        // state of the main thread (its event base and thread id are set in main)



int main(int argc, char** argv)  {


     thread_init();

     CreatPhreadPool();





    int fd_listen = getSocket();     if(fd_listen <0){         cout<<"Error in main(), fd<0"<<endl;     }     //cout<<"main() fd="<<fd<<endl;     //----為服務器主線程綁定ip和port------------------------------     struct sockaddr_in local_addr; //服務器端網絡地址結構體     memset(&local_addr,0,sizeof(local_addr)); //數據初始化--清零     local_addr.sin_family=AF_INET; //設置為IP通信     local_addr.sin_addr.s_addr=inet_addr(argv[1]);//服務器IP地址     local_addr.sin_port=htons(atoi(argv[2])); //服務器端口號     int bind_result = bind(fd_listen, (struct sockaddr*) &local_addr, sizeof(struct sockaddr));     if(bind_result < 0){         cout<<"Bind Error in main()"<<endl;         return -1;     }     cout<<"bind_result="<<bind_result<<endl;     listen(fd_listen, 10);

      evutil_make_socket_nonblocking(fd);     //-----設置libevent事件,每當socket出現可讀事件,就調用on_accept()------------     struct event_base* base = event_base_new();     dispatcher_thread.base=base;     dispatcher_thread.tid = pthread_self();
     struct event listen_ev;     event_set(&listen_ev, fd_listen, EV_READ|EV_PERSIST, event_handler, NULL);     event_base_set(dispatcher_thread.base, &listen_ev);     event_add(&listen_ev, NULL);     event_base_dispatch(dispatcher_thread.base);     //------以下語句理論上是不會走到的---------------------------     cout<<"event_base_dispatch() in main() finished"<<endl;     //----銷毀資源-------------     event_del(&listen_ev);     event_base_free(dispatcher_thread.base);     cout<<"main() finished"<<endl;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM