以前的文章看過緩沖區buffer了,libevent用bufferevent來負責管理緩沖區與buffer讀寫事件。
今天就帶大家看下evbuffer.c,使用bufferevent處理事件的數據,是buffer和event的綜合。在最后用一個稍微綜合的例子看下使用bufferevent的整個流程。
首先依舊看下bufferevent的結構。結構很清晰。源碼版本1.4.14。
1 struct bufferevent { 2 struct event_base *ev_base; 3 4 //讀事件 5 struct event ev_read; 6 //寫事件 7 struct event ev_write; 8 //讀緩沖區,輸入緩沖 9 struct evbuffer *input; 10 //寫緩沖區,輸出緩沖 11 struct evbuffer *output; 12 13 //讀水位 14 struct event_watermark wm_read; 15 //寫水位 16 struct event_watermark wm_write; 17 18 //發生讀觸發用戶設置的回調 19 evbuffercb readcb; 20 //發生寫觸發用戶設置的回調 21 evbuffercb writecb; 22 //發生錯誤觸發用戶設置的回調 23 everrorcb errorcb; 24 //當前設置的回調函數傳遞的參數,和上面3個回調配合使用 25 void *cbarg; 26 27 //設置讀超時時間,默認為0 28 int timeout_read; /* in seconds */ 29 //設置寫超時時間,默認為0 30 int timeout_write; /* in seconds */ 31 32 //當前事件是否可用 33 short enabled; /* events that are currently enabled */ 34 }; 35 //水位 36 struct event_watermark { 37 //低水位 38 size_t low; 39 //高水位 40 size_t high; 41 };
evbuffer中有2個緩沖區,一個是讀緩沖區,一個寫緩沖區。分別用來處理讀寫事件的數據。
evbuffer中有讀水位和寫水位,分別對應了讀緩沖區和寫緩沖區。
里面有個水位的概念。其實很好理解。水位有一個高水位,一個低水位。
如果水位達到高水位時,不能再往里面灌水了。如果水位達到低水位,不能再從中取水了。
讀操作發生時:如果高於高水位,那就不能再讀入數據了,等待數據被讀掉然后再開始讀入數據。低水位只做判斷。低水位不為0,如果緩沖區低於低水位,可以繼續直接讀數據到緩沖區。
寫操作發生時:如果寫緩沖區數據長度小於等於低水位,觸發用戶寫事件,通知用戶。寫數據高水位沒用。因為寫數據是把緩沖區的數據讀出寫到對應的文件描述符中,所以水位肯定是下降的。
我的理解:水位控制了信息的顆粒度,多少數據觸發次用戶事件。數據緩沖區降低了頻繁申請內存帶來的開銷。
接着我們來看evbuffer.c中最重要的幾個函數
1.bufferevent_new
進行一些初始化。最重要的是指定了eventbuffer內部讀寫事件的回調,bufferevent_readcb與bufferevent_writecb。當前也可以通過后面的bufferevent_setcb實現。
1 struct bufferevent * 2 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 3 everrorcb errorcb, void *cbarg) 4 { 5 struct bufferevent *bufev; 6 7 //申請內存空間並且初始化,使用calloc 8 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 9 return (NULL); 10 11 if ((bufev->input = evbuffer_new()) == NULL) { 12 free(bufev); 13 return (NULL); 14 } 15 16 if ((bufev->output = evbuffer_new()) == NULL) { 17 evbuffer_free(bufev->input); 18 free(bufev); 19 return (NULL); 20 } 21 //讀事件關聯回調,傳遞參數 22 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 23 24 //寫事件關聯回調,傳遞參數 25 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 26 27 //設置bufferevent的讀、寫和出錯事件回調,並且傳遞cbarg參數。 28 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg); 29 30 /* 31 * Set to EV_WRITE so that using bufferevent_write is going to 32 * trigger a callback. Reading needs to be explicitly enabled 33 * because otherwise no data will be available. 34 */ 35 //開啟可寫,否則無法執行寫入回調 36 bufev->enabled = EV_WRITE; 37 38 return (bufev); 39 }
2.bufferevent_readcb
讀事件,最先接觸到數據,讀出數據然后寫入緩沖區
首先看下bufferevent_readcb的流程圖
1 //讀事件,最先接觸到數據,讀出數據然后寫入緩沖區 2 static void 3 bufferevent_readcb(int fd, short event, void *arg) 4 { 5 struct bufferevent *bufev = arg; 6 int res = 0; 7 short what = EVBUFFER_READ; 8 size_t len; 9 int howmuch = -1; 10 //超時事件,報錯 11 if (event == EV_TIMEOUT) { 12 what |= EVBUFFER_TIMEOUT; 13 goto error; 14 } 15 16 /* 17 * If we have a high watermark configured then we don't want to 18 * read more data than would make us reach the watermark. 19 */ 20 //查看高水位,如果緩沖區數據已經高於高水位,不應該再寫入。 21 if (bufev->wm_read.high != 0) { 22 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input); 23 /* we might have lowered the watermark, stop reading */ 24 if (howmuch <= 0) { 25 struct evbuffer *buf = bufev->input; 26 //達到高水位,刪除讀入事件,不再讀入數據到緩沖區 27 event_del(&bufev->ev_read); 28 //設置bufev->input變化需要調用的回調函數和回調參數 29 evbuffer_setcb(buf, 30 bufferevent_read_pressure_cb, bufev); 31 return; 32 } 33 } 34 //沒達到高水位,讀取數據到input緩沖區中 35 res = evbuffer_read(bufev->input, fd, howmuch); 36 if (res == -1) { 37 //信號中斷等一些原因,goto reschedule,可以繼續。 38 if (errno == EAGAIN || errno == EINTR) 39 goto reschedule; 40 /* error case */ 41 what |= EVBUFFER_ERROR; 42 } else if (res == 0) { 43 /* eof case */ 44 what |= EVBUFFER_EOF; 45 } 46 47 if (res <= 0) 48 goto error; 49 //讀事件加入事件隊列 50 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 51 52 /* See if this callbacks meets the water marks */ 53 len = EVBUFFER_LENGTH(bufev->input); 54 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 55 return; 56 //如果高水位不為0,並且緩沖區數據長度已經不小於高水位了,觸發事件。 57 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) { 58 //緩沖區數據已經不小於高水位,不能再進數據了,刪除讀緩沖區的讀外部數據事件 59 struct evbuffer *buf = bufev->input; 60 event_del(&bufev->ev_read); 61 62 /* Now schedule a callback for us when the buffer changes */ 63 //緩沖區大小發生變化,觸發回調 64 //設置回調函數和回調參數 65 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 66 } 67 68 /* Invoke the user callback - must always be called last */ 69 //觸發用戶回調事件 70 if (bufev->readcb != NULL) 71 (*bufev->readcb)(bufev, bufev->cbarg); 72 return; 73 74 reschedule: 75 //讀事件加入事件隊列,繼續進行讀取 76 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 77 return; 78 79 error: 80 (*bufev->errorcb)(bufev, what, bufev->cbarg); 81 }
3.bufferevent_writecb
寫事件
1 static void 2 bufferevent_writecb(int fd, short event, void *arg) 3 { 4 //事件緩沖區管理 5 struct bufferevent *bufev = arg; 6 int res = 0; 7 short what = EVBUFFER_WRITE; 8 9 //超時事件,報錯 10 if (event == EV_TIMEOUT) { 11 what |= EVBUFFER_TIMEOUT; 12 goto error; 13 } 14 15 if (EVBUFFER_LENGTH(bufev->output)) { 16 //將緩沖區數據讀出,寫入到fd文件描述符對應的文件中 17 res = evbuffer_write(bufev->output, fd); 18 if (res == -1) { 19 #ifndef WIN32 20 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not 21 *set errno. thus this error checking is not portable*/ 22 if (errno == EAGAIN || 23 errno == EINTR || 24 errno == EINPROGRESS) 25 goto reschedule; 26 /* error case */ 27 what |= EVBUFFER_ERROR; 28 29 #else 30 goto reschedule; 31 #endif 32 33 } else if (res == 0) { 34 /* eof case */ 35 what |= EVBUFFER_EOF; 36 } 37 if (res <= 0) 38 goto error; 39 } 40 //緩沖區不為0,寫事件加入執行隊列 41 if (EVBUFFER_LENGTH(bufev->output) != 0) 42 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 43 44 /* 45 * Invoke the user callback if our buffer is drained or below the 46 * low watermark. 47 */ 48 //緩沖區數據長度低於低水位,用戶寫事件觸發。 49 if (bufev->writecb != NULL && 50 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 51 (*bufev->writecb)(bufev, bufev->cbarg); 52 return; 53 54 reschedule: 55 if (EVBUFFER_LENGTH(bufev->output) != 0) 56 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 57 return; 58 59 error: 60 (*bufev->errorcb)(bufev, what, bufev->cbarg); 61 }
示例
下面看一個改造過的服務器和客戶端的例子。(當前你可以直接使用test中的regress.c例子,我這邊因為libevent本來就是用來解決網絡問題的,所以自己就用了這個例子)
server.c
我的編譯命令:gcc -g -Wall -I/usr/local/include -o server server.c -L/usr/local/lib -levent
服務端監聽所有socket。端口5555。這里我們為了演示:evbuffer讀緩沖區對應水位設置為高水位10,低水位0。
1 /* 2 * libevent echo server example using buffered events. 3 */ 4 5 #include <sys/types.h> 6 #include <sys/socket.h> 7 #include <netinet/in.h> 8 #include <arpa/inet.h> 9 10 /* Required by event.h. */ 11 #include <sys/time.h> 12 13 #include <stdlib.h> 14 #include <stdio.h> 15 #include <string.h> 16 #include <fcntl.h> 17 #include <unistd.h> 18 #include <errno.h> 19 #include <err.h> 20 21 /* Libevent. */ 22 #include <event.h> 23 24 /* Port to listen on. */ 25 #define SERVER_PORT 5555 26 27 /** 28 * A struct for client specific data, also includes pointer to create 29 * a list of clients. 30 */ 31 struct client { 32 /* The clients socket. */ 33 int fd; 34 35 /* The bufferedevent for this client. */ 36 struct bufferevent *buf_ev; 37 }; 38 39 /** 40 * Set a socket to non-blocking mode. 41 */ 42 //用於設置非阻塞 43 int 44 setnonblock(int fd) 45 { 46 int flags; 47 48 flags = fcntl(fd, F_GETFL); 49 if (flags < 0) 50 return flags; 51 flags |= O_NONBLOCK; 52 if (fcntl(fd, F_SETFL, flags) < 0) 53 return -1; 54 55 return 0; 56 } 57 58 /** 59 * Called by libevent when there is data to read. 60 */ 61 void 62 buffered_on_read(struct bufferevent *bev, void *arg) 63 { 64 /* Write back the read buffer. It is important to note that 65 * bufferevent_write_buffer will drain the incoming data so it 66 * is effectively gone after we call it. */ 67 char msg[4096]; 68 69 size_t len = bufferevent_read(bev, msg, sizeof(msg)); 70 71 msg[len] = '\0'; 72 printf("recv the client msg: %s\n", msg); 73 74 char reply_msg[4096] = "I have recvieced the msg: "; 75 strcat(reply_msg + strlen(reply_msg), msg); 76 bufferevent_write(bev, reply_msg, strlen(reply_msg)); 77 78 } 79 80 /** 81 * Called by libevent when the write buffer reaches 0. We only 82 * provide this because libevent expects it, but we don't use it. 83 */ 84 //當寫緩沖區達到低水位時觸發調用,我們這邊不用 85 void 86 buffered_on_write(struct bufferevent *bev, void *arg) 87 { 88 89 } 90 91 /** 92 * Called by libevent when there is an error on the underlying socket 93 * descriptor. 94 */ 95 void 96 buffered_on_error(struct bufferevent *bev, short what, void *arg) 97 { 98 struct client *client = (struct client *)arg; 99 100 if (what & EVBUFFER_EOF) { 101 /* Client disconnected, remove the read event and the 102 * free the client structure. */ 103 printf("Client disconnected.\n"); 104 } 105 else { 106 warn("Client socket error, disconnecting.\n"); 107 } 108 bufferevent_free(client->buf_ev); 109 close(client->fd); 110 free(client); 111 } 112 113 /** 114 * This function will be called by libevent when there is a connection 115 * ready to be accepted. 116 */ 117 void 118 on_accept(int fd, short ev, void *arg) 119 { 120 int client_fd; 121 struct sockaddr_in client_addr; 122 socklen_t client_len = sizeof(client_addr); 123 struct client *client; 124 125 client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len); 126 if (client_fd < 0) { 127 warn("accept failed"); 128 return; 129 } 130 131 /* Set the client socket to non-blocking mode. */ 132 if (setnonblock(client_fd) < 0) 133 warn("failed to set client socket non-blocking"); 134 135 /* We've accepted a new client, create a client object. */ 136 client = calloc(1, sizeof(*client)); 137 if (client == NULL) 138 err(1, "malloc failed"); 139 client->fd = client_fd; 140 141 /* Create the buffered event. 142 * 143 * The first argument is the file descriptor that will trigger 144 * the events, in this case the clients socket. 145 * 146 * The second argument is the callback that will be called 147 * when data has been read from the socket and is available to 148 * the application. 149 * 150 * The third argument is a callback to a function that will be 151 * called when the write buffer has reached a low watermark. 152 * That usually means that when the write buffer is 0 length, 153 * this callback will be called. It must be defined, but you 154 * don't actually have to do anything in this callback. 155 * 156 * The fourth argument is a callback that will be called when 157 * there is a socket error. This is where you will detect 158 * that the client disconnected or other socket errors. 159 * 160 * The fifth and final argument is to store an argument in 161 * that will be passed to the callbacks. We store the client 162 * object here. 163 */ 164 client->buf_ev = bufferevent_new(client_fd, buffered_on_read, 165 buffered_on_write, buffered_on_error, client); 166 client->buf_ev->wm_read.high = 10; 167 client->buf_ev->wm_read.low = 0; 168 /* We have to enable it before our callbacks will be 169 * called. */ 170 bufferevent_enable(client->buf_ev, EV_READ); 171 172 printf("Accepted connection from %s\n", 173 inet_ntoa(client_addr.sin_addr)); 174 } 175 176 int 177 main(int argc, char **argv) 178 { 179 int listen_fd; 180 struct sockaddr_in listen_addr; 181 struct event ev_accept; 182 int reuseaddr_on; 183 184 /* Initialize libevent. */ 185 event_init(); 186 187 /* Create our listening socket. */ 188 listen_fd = socket(AF_INET, SOCK_STREAM, 0); 189 if (listen_fd < 0) 190 err(1, "listen failed"); 191 memset(&listen_addr, 0, sizeof(listen_addr)); 192 listen_addr.sin_family = AF_INET; 193 listen_addr.sin_addr.s_addr = INADDR_ANY; 194 listen_addr.sin_port = htons(SERVER_PORT); 195 if (bind(listen_fd, (struct sockaddr *)&listen_addr, 196 sizeof(listen_addr)) < 0) 197 err(1, "bind failed"); 198 if (listen(listen_fd, 5) < 0) 199 err(1, "listen failed"); 200 reuseaddr_on = 1; 201 setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr_on, 202 sizeof(reuseaddr_on)); 203 204 /* Set the socket to non-blocking, this is essential in event 205 * based programming with libevent. */ 206 if (setnonblock(listen_fd) < 0) 207 err(1, "failed to set server socket to non-blocking"); 208 209 /* We now have a listening socket, we create a read event to 210 * be notified when a client connects. */ 211 event_set(&ev_accept, listen_fd, EV_READ | EV_PERSIST, on_accept, NULL); 212 event_add(&ev_accept, NULL); 213 214 /* Start the event loop. */ 215 event_dispatch(); 216 217 return 0; 218 }
client.c
讀鍵盤輸入,發送到服務端,服務端再返回,客戶端回顯。
gcc -g -Wall -I/usr/local/include -o client client.c -L/usr/local/lib -levent
1 #include<sys/types.h> 2 #include<sys/socket.h> 3 #include<netinet/in.h> 4 #include<arpa/inet.h> 5 #include<errno.h> 6 #include<unistd.h> 7 8 #include <stdlib.h> 9 #include <stdio.h> 10 #include <string.h> 11 #include <fcntl.h> 12 #include <err.h> 13 14 #include<event.h> 15 16 #define SERVER_PORT 5555 17 18 19 //服務端信息 20 struct server { 21 /* The server socket. */ 22 int fd; 23 24 /* The bufferedevent for this server. */ 25 struct bufferevent *buf_ev; 26 }; 27 28 //全局server數據 29 struct server *serv; 30 31 //設置文件狀態標記 32 int setnonblock(int fd) 33 { 34 int flags; 35 flags = fcntl(fd, F_GETFL); 36 if (flags < 0) 37 return flags; 38 flags |= O_NONBLOCK; 39 if (fcntl(fd, F_SETFL, flags) < 0) 40 return -1; 41 return 0; 42 } 43 44 //鍵盤事件 45 void cmd_msg_cb(int fd, short events, void* arg) 46 { 47 printf("cmd_msg_cb\n"); 48 char msg[1024]; 49 50 int ret = read(fd, msg, sizeof(msg)); 51 if (ret < 0) 52 { 53 perror("read fail "); 54 exit(1); 55 } 56 struct bufferevent* bev = (struct bufferevent*)arg; 57 //把終端的消息發送給服務器端 58 bufferevent_write(bev, msg, ret); 59 } 60 61 //讀服務端發來的數據 62 void read_msg_cb(struct bufferevent* bev, void* arg) 63 { 64 printf("read_msg_cb\n"); 65 char msg[1024]; 66 67 size_t len = bufferevent_read(bev, msg, sizeof(msg)); 68 msg[len] = '\0'; 69 printf("recv %s from server", msg); 70 } 71 72 //連接斷開或者出錯回調 73 void event_error(struct bufferevent *bev, short event, void *arg) 74 { 75 printf("event_error\n"); 76 if (event & EVBUFFER_EOF) 77 printf("connection closed\n"); 78 else if (event & EVBUFFER_ERROR) 79 printf("some other error\n"); 80 struct event *ev = (struct event*)arg; 81 //因為socket已經沒有,所以這個event也沒有存在的必要了 82 free(ev); 83 //當發生錯誤退出事件循環 84 event_loopexit(0); 85 bufferevent_free(bev); 86 } 87 88 //連接到server 89 typedef struct sockaddr SA; 90 int tcp_connect_server(const char* server_ip, int port) 91 { 92 int sockfd, status, save_errno; 93 struct sockaddr_in server_addr; 94 95 memset(&server_addr, 0, sizeof(server_addr)); 96 97 server_addr.sin_family = AF_INET; 98 server_addr.sin_port = htons(port); 99 status = inet_aton(server_ip, &server_addr.sin_addr); 100 101 if (status == 0) //the server_ip is not valid value 102 { 103 errno = EINVAL; 104 return -1; 105 } 106 107 sockfd = socket(AF_INET, SOCK_STREAM, 0); 108 if (sockfd == -1) 109 return sockfd; 110 status = connect(sockfd, (SA*)&server_addr, sizeof(server_addr)); 111 112 if (status == -1) 113 { 114 save_errno = errno; 115 close(sockfd); 116 errno = save_errno; //the close may be error 117 return -1; 118 } 119 120 setnonblock(sockfd); 121 122 return sockfd; 123 } 124 125 126 int main(int argc, char** argv) 127 { 128 129 event_init(); 130 //測試用直接連接本地server 131 int sockfd = tcp_connect_server("127.0.0.1", SERVER_PORT); 132 if (sockfd == -1) 133 { 134 perror("tcp_connect error "); 135 return -1; 136 } 137 138 printf("connect to server successful\n"); 139 serv = calloc(1, sizeof(*serv)); 140 if (serv == NULL) 141 err(1, "malloc failed"); 142 serv->fd = sockfd; 143 serv->buf_ev = bufferevent_new(sockfd, read_msg_cb, 144 NULL, NULL, (void *)serv); 145 146 //監聽終端輸入事件 147 struct event *ev_cmd = calloc(1,sizeof(*ev_cmd)); 148 event_set(ev_cmd, STDIN_FILENO, 149 EV_READ | EV_PERSIST, cmd_msg_cb, 150 (void*)serv->buf_ev); 151 event_add(ev_cmd, NULL); 152 //設置下read和發生錯誤的回調函數。(當socket關閉時會用到回調參數,刪除鍵盤事件) 153 bufferevent_setcb(serv->buf_ev, read_msg_cb, NULL, event_error, (void*)ev_cmd); 154 bufferevent_enable(serv->buf_ev, EV_READ| EV_PERSIST); 155 event_dispatch(); 156 printf("finished \n"); 157 return 0; 158 }
過程
1.運行 ./server
2.運行./client
3.服務端顯示連接成功
4.鍵入abcdefghijklmn回車
5.服務器接收到數據
由於讀緩沖區高水位為10,低水位為0。所以接到abcdefghij后出發用戶事件讀掉緩沖區數據,然后再讀klmn回車。多空一行是鍵盤輸入的回車也讀到了。
6.客戶端回顯
7.在服務端終端中按下ctrl+c
8.客戶端如下
測試了client.c中加入的event_error。event_error執行退出事件循環。