鏈接:https://pan.baidu.com/s/16UcOJplrcQ3EfLurycmSmA
提取碼:7roj
主進程添加監聽套接字的事件並進行事件循環,將連接描述符放入定義的數據結構中,並在主進程中進行寫管道,觸發子線程的讀管道事件,然后從連接結構中獲取連接描述符進行和客戶端進行通信。其中主進程和子線程都有不同的基事件base.
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <sys/types.h> #include <sys/socket.h> #include <event.h> #include <queue> #include <iostream> #include <arpa/inet.h> #include <memory.h> const int thread_num = 10; #define BUF_SIZE 1024 using namespace std; typedef struct { pthread_t tid; struct event_base *base; struct event event; int read_fd; int write_fd; //queue<int> q; int f_connect; char * buffer; }LIBEVENT_THREAD; //需要保存的信息結構,用於管道通信和基事件的管理 typedef struct { pthread_t tid; struct event_base *base; }DISPATCHER_THREAD; LIBEVENT_THREAD *threads = (LIBEVENT_THREAD *) calloc(thread_num, sizeof(LIBEVENT_THREAD)); void on_write(int sock, short event, void* arg); void on_read(int sock, short event, void* arg) { cout<<"on_read() called, sock="<<sock<<endl; if(NULL == arg){ return; } LIBEVENT_THREAD* event_thread = (LIBEVENT_THREAD*) arg;//獲取傳進來的參數 char* buffer = new char[BUF_SIZE]; memset(buffer, 0, sizeof(char)*BUF_SIZE); //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了 int size = read(sock, buffer, BUF_SIZE); if(0 == size){//說明socket關閉 cout<<"read size is 0 for socket:"<<sock<<endl; // destroy_sock_ev(event_struct); //event_thread->q.pop(); close(sock); return; } cout<<"i have receive: "<<buffer<<endl; event_thread->buffer=buffer; struct event* write_ev = (struct event*)malloc(sizeof(struct event));//發生寫事件(也就是只要socket緩沖區可寫)時,就將反饋數據通過socket寫回客戶端 event_set(write_ev, sock, EV_WRITE, on_write, event_thread); event_base_set(event_thread->base, write_ev); event_add(write_ev, NULL); cout<<"on_read() finished, sock="<<sock<<endl; } void on_write(int sock, short event, void* arg) { if(NULL == arg){ return; } LIBEVENT_THREAD* event_write_thread = (LIBEVENT_THREAD*) arg;//獲取傳進來的參數 //char* buffer = new char[BUF_SIZE]; //memset(buffer, 0, sizeof(char)*BUF_SIZE); //strcpy(buffer,event_write_thread->buffer,sizeof(event_write_thread->buffer)); //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了 write(sock, event_write_thread->buffer, BUF_SIZE); free(event_write_thread->buffer); event_write_thread->buffer=NULL; } static void thread_libevent_process(int fd, short which, void *arg) { int ret; char buf[128]; LIBEVENT_THREAD *me = (LIBEVENT_THREAD *) arg; int fdconnect; if (fd != me->read_fd) { printf("thread_libevent_process error : fd != me->read_fd\n"); exit(1); } ret = read(fd, buf, 128); if (ret > 0) { buf[ret] = '\0'; printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf); } cout<<"thread_libevent_process\n"<<endl; /*if(me->q.size()>0) { fdconnect=me->q.front(); me->q.pop(); ret = read(fd, buf, 128); if (ret > 0) { buf[ret] = '\0'; printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf); } }*/ /*if(me->q.size()>0) { fdconnect=me->q.front(); cout<<"thread_libevent_process succeed "<<endl; //me->q.pop(); } else return ;*/ fdconnect=me->f_connect; struct event* read_ev = (struct event*)malloc(sizeof(struct event));//發生讀事件后,從socket中取出數據 event_set(read_ev, fdconnect, EV_READ|EV_PERSIST, on_read, me); event_base_set(me->base, read_ev); event_add(read_ev, NULL); return; } void thread_init() { int ret; int fd[2]; for (int i = 0; i < thread_num; i++) { ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd); if (ret == -1) { perror("socketpair()"); return ; } threads[i].read_fd = fd[0]; threads[i].write_fd = fd[1]; threads[i].base = event_init(); if (threads[i].base == NULL) { perror("event_init()"); return ; } event_set(&threads[i].event,threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]); event_base_set(threads[i].base, &threads[i].event); if (event_add(&threads[i].event, 0) == -1) { perror("event_add()"); return ; } cout<<"thread_init succeed"<<endl; } } void * worker_thread(void *arg) { LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg; me->tid = pthread_self(); //event_base_loop(me->base, 0); event_base_dispatch(me->base); return NULL; } void CreatPhreadPool() { for (int i = 0; i < thread_num; i++) { pthread_create(&threads[i].tid, NULL, worker_thread, &threads[i]); } cout<<"CreatPhreadPool"<<endl; } int getSocket(){ int fd =socket( AF_INET, SOCK_STREAM, 0 ); if(-1 == fd){ cout<<"Error, fd is -1"<<endl; } return fd; } int last_thread=0; void event_handler(int sock, short event, void* arg) //添加其他信息 { struct sockaddr_in remote_addr; int sin_size=sizeof(struct sockaddr_in); int new_fd = accept(sock, (struct sockaddr*) &remote_addr, (socklen_t*)&sin_size); //如果線程池已用完,怎么辦呢? if(new_fd < 0){ cout<<"Accept error in on_accept()"<<endl; return; } cout<<"new_fd accepted is "<<new_fd<<endl; int tid = (last_thread + 1) % thread_num; //memcached中線程負載均衡算法 LIBEVENT_THREAD *thread = threads + tid; last_thread = tid; thread->f_connect=new_fd; write(thread->write_fd, " ", 1); cout<<"on_accept() finished for fd="<<new_fd<<endl; } DISPATCHER_THREAD dispatcher_thread; //用於設置主線程的結構變量 int main(int argc, char** argv) { thread_init(); CreatPhreadPool(); int fd_listen = getSocket(); if(fd_listen <0){ cout<<"Error in main(), fd<0"<<endl; } //cout<<"main() fd="<<fd<<endl; //----為服務器主線程綁定ip和port------------------------------ struct sockaddr_in local_addr; //服務器端網絡地址結構體 memset(&local_addr,0,sizeof(local_addr)); //數據初始化--清零 local_addr.sin_family=AF_INET; //設置為IP通信 local_addr.sin_addr.s_addr=inet_addr(argv[1]);//服務器IP地址 local_addr.sin_port=htons(atoi(argv[2])); //服務器端口號 int bind_result = bind(fd_listen, (struct sockaddr*) &local_addr, sizeof(struct sockaddr)); if(bind_result < 0){ cout<<"Bind Error in main()"<<endl; return -1; } cout<<"bind_result="<<bind_result<<endl; listen(fd_listen, 10); evutil_make_socket_nonblocking(fd); //-----設置libevent事件,每當socket出現可讀事件,就調用on_accept()------------ struct event_base* base = event_base_new(); dispatcher_thread.base=base; dispatcher_thread.tid = pthread_self(); struct event listen_ev; event_set(&listen_ev, fd_listen, EV_READ|EV_PERSIST, event_handler, NULL); event_base_set(dispatcher_thread.base, &listen_ev); event_add(&listen_ev, NULL); event_base_dispatch(dispatcher_thread.base); //------以下語句理論上是不會走到的--------------------------- cout<<"event_base_dispatch() in main() finished"<<endl; //----銷毀資源------------- event_del(&listen_ev); event_base_free(dispatcher_thread.base); cout<<"main() finished"<<endl; }
#include <stdio.h>#include <stdlib.h>#include <unistd.h>#include <pthread.h>
#include <sys/types.h>#include <sys/socket.h>
#include <event.h>#include <queue>#include <iostream>
#include <arpa/inet.h>
#include <memory.h>
const int thread_num = 10;#define BUF_SIZE 1024
using namespace std;
typedef struct { pthread_t tid; struct event_base *base; struct event event; int read_fd; int write_fd; //queue<int> q; int f_connect; char * buffer;}LIBEVENT_THREAD; //需要保存的信息結構,用於管道通信和基事件的管理
typedef struct { pthread_t tid; struct event_base *base;}DISPATCHER_THREAD;
LIBEVENT_THREAD *threads = (LIBEVENT_THREAD *) calloc(thread_num, sizeof(LIBEVENT_THREAD));
void on_write(int sock, short event, void* arg);
void on_read(int sock, short event, void* arg){
cout<<"on_read() called, sock="<<sock<<endl; if(NULL == arg){ return; } LIBEVENT_THREAD* event_thread = (LIBEVENT_THREAD*) arg;//獲取傳進來的參數 char* buffer = new char[BUF_SIZE]; memset(buffer, 0, sizeof(char)*BUF_SIZE); //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了 int size = read(sock, buffer, BUF_SIZE); if(0 == size){//說明socket關閉 cout<<"read size is 0 for socket:"<<sock<<endl; // destroy_sock_ev(event_struct);
//event_thread->q.pop(); close(sock); return; } cout<<"i have receive: "<<buffer<<endl;
event_thread->buffer=buffer;
struct event* write_ev = (struct event*)malloc(sizeof(struct event));//發生寫事件(也就是只要socket緩沖區可寫)時,就將反饋數據通過socket寫回客戶端 event_set(write_ev, sock, EV_WRITE, on_write, event_thread);
event_base_set(event_thread->base, write_ev); event_add(write_ev, NULL); cout<<"on_read() finished, sock="<<sock<<endl;
}
void on_write(int sock, short event, void* arg){
if(NULL == arg){ return; } LIBEVENT_THREAD* event_write_thread = (LIBEVENT_THREAD*) arg;//獲取傳進來的參數 //char* buffer = new char[BUF_SIZE]; //memset(buffer, 0, sizeof(char)*BUF_SIZE);
//strcpy(buffer,event_write_thread->buffer,sizeof(event_write_thread->buffer)); //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了 write(sock, event_write_thread->buffer, BUF_SIZE);
free(event_write_thread->buffer);
event_write_thread->buffer=NULL;
}
static void thread_libevent_process(int fd, short which, void *arg){ int ret; char buf[128]; LIBEVENT_THREAD *me = (LIBEVENT_THREAD *) arg;
int fdconnect;
if (fd != me->read_fd) { printf("thread_libevent_process error : fd != me->read_fd\n"); exit(1); }
ret = read(fd, buf, 128); if (ret > 0) {
buf[ret] = '\0';
printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);
}
cout<<"thread_libevent_process\n"<<endl;
/*if(me->q.size()>0) { fdconnect=me->q.front(); me->q.pop();
ret = read(fd, buf, 128); if (ret > 0) {
buf[ret] = '\0';
printf("thread %llu receive message : %s\n", (unsigned long long)me->tid, buf);
} }*/
/*if(me->q.size()>0) { fdconnect=me->q.front();
cout<<"thread_libevent_process succeed "<<endl; //me->q.pop(); }
else return ;*/
fdconnect=me->f_connect;
struct event* read_ev = (struct event*)malloc(sizeof(struct event));//發生讀事件后,從socket中取出數據
event_set(read_ev, fdconnect, EV_READ|EV_PERSIST, on_read, me);
event_base_set(me->base, read_ev);
event_add(read_ev, NULL);
return;}
void thread_init(){ int ret; int fd[2]; for (int i = 0; i < thread_num; i++) {
ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
if (ret == -1) { perror("socketpair()"); return ; }
threads[i].read_fd = fd[0]; threads[i].write_fd = fd[1];
threads[i].base = event_init();
if (threads[i].base == NULL) { perror("event_init()"); return ; }
event_set(&threads[i].event,threads[i].read_fd, EV_READ | EV_PERSIST, thread_libevent_process, &threads[i]);
event_base_set(threads[i].base, &threads[i].event); if (event_add(&threads[i].event, 0) == -1) { perror("event_add()"); return ; }
cout<<"thread_init succeed"<<endl;
}}
void * worker_thread(void *arg){ LIBEVENT_THREAD *me = (LIBEVENT_THREAD *)arg; me->tid = pthread_self();
//event_base_loop(me->base, 0);
event_base_dispatch(me->base);
return NULL;}
void CreatPhreadPool(){
for (int i = 0; i < thread_num; i++) { pthread_create(&threads[i].tid, NULL, worker_thread, &threads[i]); }
cout<<"CreatPhreadPool"<<endl;}
int getSocket(){ int fd =socket( AF_INET, SOCK_STREAM, 0 ); if(-1 == fd){ cout<<"Error, fd is -1"<<endl; } return fd;}
int last_thread=0;
void event_handler(int sock, short event, void* arg) //添加其他信息
{ struct sockaddr_in remote_addr; int sin_size=sizeof(struct sockaddr_in); int new_fd = accept(sock, (struct sockaddr*) &remote_addr, (socklen_t*)&sin_size); //如果線程池已用完,怎么辦呢? if(new_fd < 0){ cout<<"Accept error in on_accept()"<<endl; return; } cout<<"new_fd accepted is "<<new_fd<<endl;
int tid = (last_thread + 1) % thread_num; //memcached中線程負載均衡算法
LIBEVENT_THREAD *thread = threads + tid;
last_thread = tid;
thread->f_connect=new_fd;
write(thread->write_fd, " ", 1);
cout<<"on_accept() finished for fd="<<new_fd<<endl;}
DISPATCHER_THREAD dispatcher_thread; //用於設置主線程的結構變量
int main(int argc, char** argv) {
thread_init();
CreatPhreadPool();
int fd_listen = getSocket(); if(fd_listen <0){ cout<<"Error in main(), fd<0"<<endl; } //cout<<"main() fd="<<fd<<endl; //----為服務器主線程綁定ip和port------------------------------ struct sockaddr_in local_addr; //服務器端網絡地址結構體 memset(&local_addr,0,sizeof(local_addr)); //數據初始化--清零 local_addr.sin_family=AF_INET; //設置為IP通信 local_addr.sin_addr.s_addr=inet_addr(argv[1]);//服務器IP地址 local_addr.sin_port=htons(atoi(argv[2])); //服務器端口號 int bind_result = bind(fd_listen, (struct sockaddr*) &local_addr, sizeof(struct sockaddr)); if(bind_result < 0){ cout<<"Bind Error in main()"<<endl; return -1; } cout<<"bind_result="<<bind_result<<endl; listen(fd_listen, 10);
evutil_make_socket_nonblocking(fd); //-----設置libevent事件,每當socket出現可讀事件,就調用on_accept()------------ struct event_base* base = event_base_new(); dispatcher_thread.base=base; dispatcher_thread.tid = pthread_self();
struct event listen_ev; event_set(&listen_ev, fd_listen, EV_READ|EV_PERSIST, event_handler, NULL); event_base_set(dispatcher_thread.base, &listen_ev); event_add(&listen_ev, NULL); event_base_dispatch(dispatcher_thread.base); //------以下語句理論上是不會走到的--------------------------- cout<<"event_base_dispatch() in main() finished"<<endl; //----銷毀資源------------- event_del(&listen_ev); event_base_free(dispatcher_thread.base); cout<<"main() finished"<<endl;
}