使用libevent進行多線程socket編程demo


最近要對一個用libevent寫的C/C++項目進行修改,要改成多線程的,故做了一些學習和研究。

libevent是一個用C語言寫的開源的一個庫。它對socket編程里的epoll/select等功能進行了封裝,並且使用了一些設計模式(比如反應堆模式),用事件機制來簡化了socket編程。libevent的好處網上有很多,但是初學者往往都看不懂。我打個比方吧,1)假設有N個客戶端同時往服務端通過socket寫數據,用了libevent之后,你的server程序里就不用再使用epoll或是select來判斷都哪些socket的緩沖區里已經收到了客戶端寫來的數據。當某個socket的緩沖區里有可讀數據時,libevent會自動觸發一個“讀事件”,通過這個“讀事件”來調用相應的代碼來讀取socket緩沖區里的數據即可。換句話說,libevent自己調用select()或是epoll的函數來判斷哪個緩沖區可讀了,只要可讀了,就自動調用相應的處理程序。2)對於“寫事件”,libevent會監控某個socket的緩沖區是否可寫(一般情況下,只要緩沖區沒滿就可寫),只要可寫,就會觸發“寫事件”,通過“寫事件”來調用相應的函數,將數據寫到socket里。

以上兩個例子分別從“讀”和“寫”兩方面簡介了一下,可能不十分准確(但十分准確的描述往往會讓人看不懂)。

以下兩個鏈接關於libevent的剖析比較詳細,想學習libevent最好看一下。

  1)sparkliang的專欄        2)魚思故淵的專欄

=========關於libevent使用多線程的討論=========================

網上很多資料說libevent不支持多線程,也有很多人說libevent可以支持多線程。究竟值不支持呢?我的答案是:得看你的多線程是怎么寫的,如何跟libevent結合的。

1)可以肯定的是,libevent的信號事件是不支持多線程的(因為源碼里用了個全局變量)。可以看這篇文章(http://blog.csdn.net/sparkliang/article/details/5306809)。(注:libevent里有“超時事件”,“IO事件”,“信號事件”。)

2)對於不同的線程,使用不同的base,是可以的。

3)如果不同的線程使用相同的base呢?——如果在不同的線程里的事件都注冊到同一個base上,會有問題嗎?

  (http://www.cnblogs.com/zzyoucan/p/3970578.html)這篇博客里提到說,不行!即使加鎖也不行。我最近稍微看了部分源碼,我的答案是:不加鎖會有並發問題,但如果對每個event_add(),event_del()等這些操作event的動作都用同一個臨界變量來加鎖,應該是沒問題的。——貌似也有點問題,如果某個事件沒有用event_set()設置為EV_PERSIST,當事件發生時,會被自動刪除。有可能線程a在刪除事件的時候,線程b卻在添加事件,這樣還是會出現並發問題。最后的結論是——不行!

========本次實驗代碼邏輯的說明==========================

我采取的方案是對於不同的線程,使用不同的base。——即每個線程對應一個base,將線程里的事件注冊到線程的base上,而不是所有線程里的事件都用同一個base。

一 實驗需求描述:

  1)寫一個client和server程序。多個client可以同時連接一個server;

  2)client接收用戶在標准輸入的字符,發往server端;

  3)server端收到后,再把收到的數據處理一下,返回給client;

  4)client收到server返回的數據后,將其打印在終端上。

二 設計方案:

1. client:

  1)  client采用兩個線程,主線程接收用戶在終端上的輸入,並通過socket將用戶的輸入發往server。

  2)  派生一個子線程,接收server返回來的數據,如果收到數據,就打印出來。

2. server:

  在主線程里監聽client有沒有連接連過來,如果有,立馬accept出一個socket,並創建一個子線程,在子線程里接收client傳過來的數據,並對數據進行一些修改,然后將修改后的數據寫回到client端。

三 代碼實現

1. client代碼如下:

  1 #include <iostream>
  2 #include <sys/select.h>
  3 #include <sys/socket.h>
  4 #include <unistd.h>
  5 #include <pthread.h>
  6 #include <stdio.h>
  7 #include <stdlib.h>
  8 #include <sys/types.h>
  9 #include <netinet/in.h>
 10 #include <arpa/inet.h>
 11 #include <string>
 12 #include <string.h>
 13 #include <event.h>
 14 using namespace std;
 15 
 16 #define BUF_SIZE 1024
 17 
 18 /**
 19  * 連接到server端,如果成功,返回fd,如果失敗返回-1
 20  */
 21 int connectServer(char* ip, int port){
 22     int fd = socket( AF_INET, SOCK_STREAM, 0 );
 23     cout<<"fd= "<<fd<<endl;
 24     if(-1 == fd){
 25         cout<<"Error, connectServer() quit"<<endl;
 26         return -1;
 27     }
 28     struct sockaddr_in remote_addr; //服務器端網絡地址結構體
 29     memset(&remote_addr,0,sizeof(remote_addr)); //數據初始化--清零
 30     remote_addr.sin_family=AF_INET; //設置為IP通信
 31     remote_addr.sin_addr.s_addr=inet_addr(ip);//服務器IP地址
 32     remote_addr.sin_port=htons(port); //服務器端口號
 33     int con_result = connect(fd, (struct sockaddr*) &remote_addr, sizeof(struct sockaddr));
 34     if(con_result < 0){
 35         cout<<"Connect Error!"<<endl;
 36         close(fd);
 37         return -1;
 38     }
 39     cout<<"con_result="<<con_result<<endl;
 40     return fd;
 41 }
 42 
 43 void on_read(int sock, short event, void* arg)
 44 {
 45     char* buffer = new char[BUF_SIZE];
 46     memset(buffer, 0, sizeof(char)*BUF_SIZE);
 47     //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了
 48     int size = read(sock, buffer, BUF_SIZE);
 49     if(0 == size){//說明socket關閉
 50         cout<<"read size is 0 for socket:"<<sock<<endl;
 51         struct event* read_ev = (struct event*)arg;
 52         if(NULL != read_ev){
 53             event_del(read_ev);
 54             free(read_ev);
 55         }
 56         close(sock);
 57         return;
 58     }
 59     cout<<"Received from server---"<<buffer<<endl;
 60     delete[]buffer;
 61 }
 62 
 63 void* init_read_event(void* arg){
 64     long long_sock = (long)arg;
 65     int sock = (int)long_sock;
 66     //-----初始化libevent,設置回調函數on_read()------------
 67     struct event_base* base = event_base_new();
 68     struct event* read_ev = (struct event*)malloc(sizeof(struct event));//發生讀事件后,從socket中取出數據
 69     event_set(read_ev, sock, EV_READ|EV_PERSIST, on_read, read_ev);
 70     event_base_set(base, read_ev);
 71     event_add(read_ev, NULL);
 72     event_base_dispatch(base);
 73     //--------------
 74     event_base_free(base);
 75 }
 76 /**
 77  * 創建一個新線程,在新線程里初始化libevent讀事件的相關設置,並開啟event_base_dispatch
 78  */
 79 void init_read_event_thread(int sock){
 80     pthread_t thread;
 81     pthread_create(&thread,NULL,init_read_event,(void*)sock);
 82     pthread_detach(thread);
 83 }
 84 int main() {
 85     cout << "main started" << endl; // prints Hello World!!!
 86     cout << "Please input server IP:"<<endl;
 87     char ip[16];
 88     cin >> ip;
 89     cout << "Please input port:"<<endl;
 90     int port;
 91     cin >> port;
 92     cout << "ServerIP is "<<ip<<" ,port="<<port<<endl;
 93     int socket_fd = connectServer(ip, port);
 94     cout << "socket_fd="<<socket_fd<<endl;
 95     init_read_event_thread(socket_fd);
 96     //--------------------------
 97     char buffer[BUF_SIZE];
 98     bool isBreak = false;
 99     while(!isBreak){
100         cout << "Input your data to server(\'q\' or \"quit\" to exit)"<<endl;
101         cin >> buffer;
102         if(strcmp("q", buffer)==0 || strcmp("quit", buffer)==0){
103             isBreak=true;
104             close(socket_fd);
105             break;
106         }
107         cout << "Your input is "<<buffer<<endl;
108         int write_num = write(socket_fd, buffer, strlen(buffer));
109         cout << write_num <<" characters written"<<endl;
110         sleep(2);
111     }
112     cout<<"main finished"<<endl;
113     return 0;
114 }
client端的代碼

  1)在main()里先調用init_read_event_thread()來生成一個子線程,子線程里調用init_read_event()來將socket的讀事件注冊到libevent的base上,並調用libevent的event_base_dispatch()不斷地進行輪詢。一旦socket可讀,libevent就調用“讀事件”上綁定的on_read()函數來讀取數據。

  2)在main()的主線程里,通過一個while循環來接收用戶從終端的輸入,並通過socket將用戶的輸入寫到server端。

-------------------------------------------------------------

2. server端代碼如下:

  1 #include <iostream>
  2 #include <sys/select.h>
  3 #include <sys/socket.h>
  4 #include <stdio.h>
  5 #include <unistd.h>
  6 #include <pthread.h>
  7 #include <stdio.h>
  8 #include <sys/types.h>
  9 #include <netinet/in.h>
 10 #include <arpa/inet.h>
 11 #include <string>
 12 #include <string.h>
 13 #include <event.h>
 14 #include <stdlib.h>
 15 using namespace std;
 16 
 17 #define SERVER_IP "127.0.0.1"
 18 #define SERVER_PORT 9090
 19 #define BUF_SIZE 1024
 20 
 21 struct sock_ev_write{//用戶寫事件完成后的銷毀,在on_write()中執行
 22     struct event* write_ev;
 23     char* buffer;
 24 };
 25 struct sock_ev {//用於讀事件終止(socket斷開)后的銷毀
 26     struct event_base* base;//因為socket斷掉后,讀事件的loop要終止,所以要有base指針
 27     struct event* read_ev;
 28 };
 29 
 30 /**
 31  * 銷毀寫事件用到的結構體
 32  */
 33 void destroy_sock_ev_write(struct sock_ev_write* sock_ev_write_struct){
 34     if(NULL != sock_ev_write_struct){
 35 //        event_del(sock_ev_write_struct->write_ev);//因為寫事件沒用EV_PERSIST,故不用event_del
 36         if(NULL != sock_ev_write_struct->write_ev){
 37             free(sock_ev_write_struct->write_ev);
 38         }
 39         if(NULL != sock_ev_write_struct->buffer){
 40             delete[]sock_ev_write_struct->buffer;
 41         }
 42         free(sock_ev_write_struct);
 43     }
 44 }
 45 
 46 
 47 /**
 48  * 讀事件結束后,用於銷毀相應的資源
 49  */
 50 void destroy_sock_ev(struct sock_ev* sock_ev_struct){
 51     if(NULL == sock_ev_struct){
 52         return;
 53     }
 54     event_del(sock_ev_struct->read_ev);
 55     event_base_loopexit(sock_ev_struct->base, NULL);//停止loop循環
 56     if(NULL != sock_ev_struct->read_ev){
 57         free(sock_ev_struct->read_ev);
 58     }
 59     event_base_free(sock_ev_struct->base);
 60 //    destroy_sock_ev_write(sock_ev_struct->sock_ev_write_struct);
 61     free(sock_ev_struct);
 62 }
 63 int getSocket(){
 64     int fd =socket( AF_INET, SOCK_STREAM, 0 );
 65     if(-1 == fd){
 66         cout<<"Error, fd is -1"<<endl;
 67     }
 68     return fd;
 69 }
 70 
 71 void on_write(int sock, short event, void* arg)
 72 {
 73     cout<<"on_write() called, sock="<<sock<<endl;
 74     if(NULL == arg){
 75         cout<<"Error! void* arg is NULL in on_write()"<<endl;
 76         return;
 77     }
 78     struct sock_ev_write* sock_ev_write_struct = (struct sock_ev_write*)arg;
 79 
 80     char buffer[BUF_SIZE];
 81     sprintf(buffer, "fd=%d, received[%s]", sock, sock_ev_write_struct->buffer);
 82 //    int write_num0 = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
 83 //    int write_num = write(sock, sock_ev_write_struct->buffer, strlen(sock_ev_write_struct->buffer));
 84     int write_num = write(sock, buffer, strlen(buffer));
 85     destroy_sock_ev_write(sock_ev_write_struct);
 86     cout<<"on_write() finished, sock="<<sock<<endl;
 87 }
 88 
 89 void on_read(int sock, short event, void* arg)
 90 {
 91     cout<<"on_read() called, sock="<<sock<<endl;
 92     if(NULL == arg){
 93         return;
 94     }
 95     struct sock_ev* event_struct = (struct sock_ev*) arg;//獲取傳進來的參數
 96     char* buffer = new char[BUF_SIZE];
 97     memset(buffer, 0, sizeof(char)*BUF_SIZE);
 98     //--本來應該用while一直循環,但由於用了libevent,只在可以讀的時候才觸發on_read(),故不必用while了
 99     int size = read(sock, buffer, BUF_SIZE);
100     if(0 == size){//說明socket關閉
101         cout<<"read size is 0 for socket:"<<sock<<endl;
102         destroy_sock_ev(event_struct);
103         close(sock);
104         return;
105     }
106     struct sock_ev_write* sock_ev_write_struct = (struct sock_ev_write*)malloc(sizeof(struct sock_ev_write));
107     sock_ev_write_struct->buffer = buffer;
108     struct event* write_ev = (struct event*)malloc(sizeof(struct event));//發生寫事件(也就是只要socket緩沖區可寫)時,就將反饋數據通過socket寫回客戶端
109     sock_ev_write_struct->write_ev = write_ev;
110     event_set(write_ev, sock, EV_WRITE, on_write, sock_ev_write_struct);
111     event_base_set(event_struct->base, write_ev);
112     event_add(write_ev, NULL);
113     cout<<"on_read() finished, sock="<<sock<<endl;
114 }
115 
116 
117 /**
118  * main執行accept()得到新socket_fd的時候,執行這個方法
119  * 創建一個新線程,在新線程里反饋給client收到的信息
120  */
121 void* process_in_new_thread_when_accepted(void* arg){
122     long long_fd = (long)arg;
123     int fd = (int)long_fd;
124     if(fd<0){
125         cout<<"process_in_new_thread_when_accepted() quit!"<<endl;
126         return 0;
127     }
128     //-------初始化base,寫事件和讀事件--------
129     struct event_base* base = event_base_new();
130     struct event* read_ev = (struct event*)malloc(sizeof(struct event));//發生讀事件后,從socket中取出數據
131 
132     //-------將base,read_ev,write_ev封裝到一個event_struct對象里,便於銷毀---------
133     struct sock_ev* event_struct = (struct sock_ev*)malloc(sizeof(struct sock_ev));
134     event_struct->base = base;
135     event_struct->read_ev = read_ev;
136     //-----對讀事件進行相應的設置------------
137     event_set(read_ev, fd, EV_READ|EV_PERSIST, on_read, event_struct);
138     event_base_set(base, read_ev);
139     event_add(read_ev, NULL);
140     //--------開始libevent的loop循環-----------
141     event_base_dispatch(base);
142     cout<<"event_base_dispatch() stopped for sock("<<fd<<")"<<" in process_in_new_thread_when_accepted()"<<endl;
143     return 0;
144 }
145 
146 /**
147  * 每當accept出一個新的socket_fd時,調用這個方法。
148  * 創建一個新線程,在新線程里與client做交互
149  */
150 void accept_new_thread(int sock){
151     pthread_t thread;
152     pthread_create(&thread,NULL,process_in_new_thread_when_accepted,(void*)sock);
153     pthread_detach(thread);
154 }
155 
156 /**
157  * 每當有新連接連到server時,就通過libevent調用此函數。
158  *    每個連接對應一個新線程
159  */
160 void on_accept(int sock, short event, void* arg)
161 {
162     struct sockaddr_in remote_addr;
163     int sin_size=sizeof(struct sockaddr_in);
164     int new_fd = accept(sock,  (struct sockaddr*) &remote_addr, (socklen_t*)&sin_size);
165     if(new_fd < 0){
166         cout<<"Accept error in on_accept()"<<endl;
167         return;
168     }
169     cout<<"new_fd accepted is "<<new_fd<<endl;
170     accept_new_thread(new_fd);
171     cout<<"on_accept() finished for fd="<<new_fd<<endl;
172 }
173 
174 int main(){
175     int fd = getSocket();
176     if(fd<0){
177         cout<<"Error in main(), fd<0"<<endl;
178     }
179     cout<<"main() fd="<<fd<<endl;
180     //----為服務器主線程綁定ip和port------------------------------
181     struct sockaddr_in local_addr; //服務器端網絡地址結構體
182     memset(&local_addr,0,sizeof(local_addr)); //數據初始化--清零
183     local_addr.sin_family=AF_INET; //設置為IP通信
184     local_addr.sin_addr.s_addr=inet_addr(SERVER_IP);//服務器IP地址
185     local_addr.sin_port=htons(SERVER_PORT); //服務器端口號
186     int bind_result = bind(fd, (struct sockaddr*) &local_addr, sizeof(struct sockaddr));
187     if(bind_result < 0){
188         cout<<"Bind Error in main()"<<endl;
189         return -1;
190     }
191     cout<<"bind_result="<<bind_result<<endl;
192     listen(fd, 10);
193     //-----設置libevent事件,每當socket出現可讀事件,就調用on_accept()------------
194     struct event_base* base = event_base_new();
195     struct event listen_ev;
196     event_set(&listen_ev, fd, EV_READ|EV_PERSIST, on_accept, NULL);
197     event_base_set(base, &listen_ev);
198     event_add(&listen_ev, NULL);
199     event_base_dispatch(base);
200     //------以下語句理論上是不會走到的---------------------------
201     cout<<"event_base_dispatch() in main() finished"<<endl;
202     //----銷毀資源-------------
203     event_del(&listen_ev);
204     event_base_free(base);
205     cout<<"main() finished"<<endl;
206 }
server端的代碼

  1)在main()里(運行在主線程中),先設置服務端的socket,然后為主線程生成一個libevent的base,並將一個“讀事件”注冊到base上。“讀事件”綁定了一個on_accept(),每當client有新連接連過來時,就會觸發這個“讀事件”,進而調用on_accept()方法。

  2)在on_accept()里(運行在主線程中),每當有新連接連過來時,就會accept出一個新的new_fd,並調用accept_new_thread()來創建一個新的子線程。子線程里會調用process_in_new_thread_when_accepted()方法。

  3)process_in_new_thread_when_accepted()方法里(運行在子線程中),創建一個子線程的base,並創建一個“讀事件”,注冊到“子線程的base”上。並調用event_base_dispatch(base)進入libevent的loop中。當發現new_fd的socket緩沖區中有數據可讀時,就觸發了這個“讀事件”,繼而調用on_read()方法。

  4)on_read()方法里(運行在子線程中),從socket緩沖區里讀取數據。讀完數據之后,將一個“寫事件”注冊到“子線程的base”上。一旦socket可寫,就調用on_write()函數。

  5)on_write()方法(運行在子線程中),對數據進行修改,然后通過socket寫回到client端。

  注:其實可以不用注冊“寫事件”——在on_read()方法中直接修改數據,然后寫回到client端也是可以的——但這有個問題。就是如果socket的寫緩沖區是滿的,那么這時候 write(sock, buffer, strlen(buffer))會阻塞的。這會導致整個on_read()方法阻塞掉,而無法讀到接下來client傳過來的數據了。而用了libevent的”寫事件“之后,雖然 write(sock, buffer, strlen(buffer))仍然會阻塞,但是只要socket緩沖區不可以寫就不會觸發這個“寫事件”,所以程序就不會阻塞,也就不會影響on_read()函數里的流程了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM