redis源碼筆記-ae.c


ae.c是redis事件框架的具體實現,這篇blog對這份源碼進行簡單說明。其中談到了作者已經標記的一些未來可能做的改進。

ae.c

  1 #include <stdio.h>
  2 #include <sys/time.h>
  3 #include <sys/types.h>
  4 #include <unistd.h>
  5 #include <stdlib.h>
  6 
  7 #include "ae.h"
  8 #include "zmalloc.h"
  9 #include "config.h"
 10 
 11 /* Include the best multiplexing layer supported by this system.
 12  * The following should be ordered by performances, descending. */
//為了支持不同的平台,redis用相同的接口封裝了系統提供的多路復用層代碼。接口共提供如下函數:aeApiCreate\aeApiFree\aeApiAddEvent\aeApiDelEvent\aeApiPoll\aeApiName函數,以及一個struct aeApiState
//通過包含不同的頭文件,選擇不同的底層實現
//按如下順序,效率遞減: epoll > kqueue > select 13 #ifdef HAVE_EPOLL 14 #include "ae_epoll.c" 15 #else 16 #ifdef HAVE_KQUEUE 17 #include "ae_kqueue.c" 18 #else 19 #include "ae_select.c" 20 #endif 21 #endif 22
//初始化函數,創建事件循環,函數內部alloc一個結構,用於表示事件狀態,供后續其他函數作為參數使用 23 aeEventLoop *aeCreateEventLoop(void) { 24 aeEventLoop *eventLoop; 25 int i; 26 27 eventLoop = zmalloc(sizeof(*eventLoop)); 28 if (!eventLoop) return NULL;
//時間event用鏈表存儲
29 eventLoop->timeEventHead = NULL; 30 eventLoop->timeEventNextId = 0;
//表示是否停止事件循環
31 eventLoop->stop = 0;
//maxfd只由ae_select.c使用,后續有些相關的處理,如果使用epoll的話,其實可以進行簡化
32 eventLoop->maxfd = -1;
//每次調用epoll\select前調用的函數,由框架使用者注冊
33 eventLoop->beforesleep = NULL; 34 if (aeApiCreate(eventLoop) == -1) { 35 zfree(eventLoop); 36 return NULL; 37 } 38 /* Events with mask == AE_NONE are not set. So let's initialize the 39 * vector with it. */

//將所有的文件描述符的mask設置為無效值,作為初始化 40 for (i = 0; i < AE_SETSIZE; i++) 41 eventLoop->events[i].mask = AE_NONE; 42 return eventLoop; 43 } 44
//底層實現執行釋放操作后,釋放state的內存 45 void aeDeleteEventLoop(aeEventLoop *eventLoop) { 46 aeApiFree(eventLoop); 47 zfree(eventLoop); 48 } 49
//停止事件循環,redis作為一個無限循環的server,在redis.c中並沒有任何一處調用此函數 50 void aeStop(aeEventLoop *eventLoop) { 51 eventLoop->stop = 1; 52 } 53
//創建文件fd事件,加入事件循環監控列表,使得后續epoll\select時將會測試這個文件描述符的可讀性(可寫性) 54 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, 55 aeFileProc *proc, void *clientData) 56 { 57 if (fd >= AE_SETSIZE) return AE_ERR; 58 aeFileEvent *fe = &eventLoop->events[fd]; 59 60 if (aeApiAddEvent(eventLoop, fd, mask) == -1) 61 return AE_ERR; 62 fe->mask |= mask;
//注冊函數
63 if (mask & AE_READABLE) fe->rfileProc = proc; 64 if (mask & AE_WRITABLE) fe->wfileProc = proc; 65 fe->clientData = clientData;
//更新maxfd
66 if (fd > eventLoop->maxfd) 67 eventLoop->maxfd = fd; 68 return AE_OK; 69 } 70 71 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) 72 { 73 if (fd >= AE_SETSIZE) return; 74 aeFileEvent *fe = &eventLoop->events[fd]; 75 76 if (fe->mask == AE_NONE) return; 77 fe->mask = fe->mask & (~mask); 78 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { 79 /* Update the max fd */ 80 int j; 81 82 for (j = eventLoop->maxfd-1; j >= 0; j--) 83 if (eventLoop->events[j].mask != AE_NONE) break; 84 eventLoop->maxfd = j; 85 } 86 aeApiDelEvent(eventLoop, fd, mask); 87 } 88
//返回值為該文件描述符關注的事件類型(可讀、可寫) 89 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { 90 if (fd >= AE_SETSIZE) return 0; 91 aeFileEvent *fe = &eventLoop->events[fd]; 92 93 return fe->mask; 94 } 95 96 static void aeGetTime(long *seconds, long *milliseconds) 97 { 98 struct timeval tv; 99 100 gettimeofday(&tv, NULL); 101 *seconds = tv.tv_sec; 102 *milliseconds = tv.tv_usec/1000; 103 } 104 105 static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) { 106 long cur_sec, cur_ms, when_sec, when_ms; 107 108 aeGetTime(&cur_sec, &cur_ms); 109 when_sec = cur_sec + milliseconds/1000; 110 when_ms = cur_ms + milliseconds%1000; 111 if (when_ms >= 1000) { 112 when_sec ++; 113 when_ms -= 1000; 114 } 115 *sec = when_sec; 116 *ms = when_ms; 117 } 118 119 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, 120 aeTimeProc *proc, void *clientData, 121 aeEventFinalizerProc *finalizerProc) 122 { 123 long long id = eventLoop->timeEventNextId++; 124 aeTimeEvent *te; 125 126 te = zmalloc(sizeof(*te)); 127 if (te == NULL) return AE_ERR; 128 te->id = id;
//time event執行時間為絕對時間點
129 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms); 130 te->timeProc = proc; 131 te->finalizerProc = finalizerProc; 132 te->clientData = clientData;
//time event鏈表為無序表,直接插入到鏈表頭
133 te->next = eventLoop->timeEventHead; 134 eventLoop->timeEventHead = te; 135 return id; 136 } 137
//從鏈表中刪除,是以id作為key查找的(順序遍歷) 138 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) 139 { 140 aeTimeEvent *te, *prev = NULL; 141 142 te = eventLoop->timeEventHead; 143 while(te) { 144 if (te->id == id) { 145 if (prev == NULL) 146 eventLoop->timeEventHead = te->next; 147 else 148 prev->next = te->next; 149 if (te->finalizerProc) 150 te->finalizerProc(eventLoop, te->clientData); 151 zfree(te); 152 return AE_OK; 153 } 154 prev = te; 155 te = te->next; 156 } 157 return AE_ERR; /* NO event with the specified ID found */ 158 } 159 160 /* Search the first timer to fire. 161 * This operation is useful to know how many time the select can be 162 * put in sleep without to delay any event. (沒大看懂),也許是給一個阻塞時間的上限值 163 * If there are no timers NULL is returned. 164 * 165 * Note that's O(N) since time events are unsorted. 166 * Possible optimizations (not needed by Redis so far, but...): 167 * 1) Insert the event in order, so that the nearest is just the head. 168 * Much better but still insertion or deletion of timers is O(N). 169 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). 170 */ 171 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) 172 { 173 aeTimeEvent *te = eventLoop->timeEventHead; 174 aeTimeEvent *nearest = NULL; 175 176 while(te) { 177 if (!nearest || te->when_sec < nearest->when_sec || 178 (te->when_sec == nearest->when_sec && 179 te->when_ms < nearest->when_ms)) 180 nearest = te; 181 te = te->next; 182 } 183 return nearest; 184 } 185
//對time event進行處理 186 /* Process time events */ 187 static int processTimeEvents(aeEventLoop *eventLoop) {
//處理的事件數
188 int processed = 0; 189 aeTimeEvent *te; 190 long long maxId; 191 192 te = eventLoop->timeEventHead; 193 maxId = eventLoop->timeEventNextId-1; 194 while(te) { 195 long now_sec, now_ms; 196 long long id; 197 198 if (te->id > maxId) { 199 te = te->next; 200 continue; 201 } 202 aeGetTime(&now_sec, &now_ms); 203 if (now_sec > te->when_sec || 204 (now_sec == te->when_sec && now_ms >= te->when_ms)) 205 { 206 int retval; 207 208 id = te->id;
//如果執行時間到或已超出,則執行對應的時間處理函數
209 retval = te->timeProc(eventLoop, id, te->clientData); 210 processed++; 211 /* After an event is processed our time event list may 212 * no longer be the same, so we restart from head. //因為時間處理函數timeProc可能改變此鏈表 213 * Still we make sure to don't process events registered 214 * by event handlers itself in order to don't loop forever.? 215 * To do so we saved the max ID we want to handle. 216 * 217 * FUTURE OPTIMIZATIONS: 218 * Note that this is NOT great algorithmically. Redis uses 219 * a single time event so it's not a problem but the right 220 * way to do this is to add the new elements on head, and 221 * to flag deleted elements in a special way for later 222 * deletion (putting references to the nodes to delete into 223 * another linked list). */

//如果這個時間處理函數不再繼續執行,則從time event的鏈表中刪除事件;否則,retval為繼續執行的時間間隔(單位為ms),在當前的timeevent struct的時間值上進行增加 224 if (retval != AE_NOMORE) { 225 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); 226 } else { 227 aeDeleteTimeEvent(eventLoop, id); 228 } 229 te = eventLoop->timeEventHead; 230 } else { 231 te = te->next; 232 } 233 } 234 return processed; 235 } 236 237 /* Process every pending(懸而未決) time event, then every pending file event 238 * (that may be registered by time event callbacks just processed). 239 * Without special flags the function sleeps until some file event 240 * fires, or when the next time event occurrs (if any). 241 * 242 * If flags is 0, the function does nothing and returns. 243 * if flags has AE_ALL_EVENTS set, all the kind of events are processed. 244 * if flags has AE_FILE_EVENTS set, file events are processed. 245 * if flags has AE_TIME_EVENTS set, time events are processed. 246 * if flags has AE_DONT_WAIT set the function returns ASAP until all 247 * the events that's possible to process without to wait are processed. 248 * 249 * The function returns the number of events processed. */ 250 int aeProcessEvents(aeEventLoop *eventLoop, int flags) 251 { 252 int processed = 0, numevents; 253 254 /* Nothing to do? return ASAP */ 255 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; 256 257 /* Note that we want call select() even if there are no 258 * file events to process as long as we want to process time 259 * events, in order to sleep until the next time event is ready 260 * to fire. */ 261 if (eventLoop->maxfd != -1 || 262 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 263 int j; 264 aeTimeEvent *shortest = NULL; 265 struct timeval tv, *tvp; 266 267 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) 268 shortest = aeSearchNearestTimer(eventLoop); 269 if (shortest) { 270 long now_sec, now_ms; 271 272 /* Calculate the time missing for the nearest 273 * timer to fire. */ 274 aeGetTime(&now_sec, &now_ms); 275 tvp = &tv; 276 tvp->tv_sec = shortest->when_sec - now_sec; 277 if (shortest->when_ms < now_ms) { 278 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000; 279 tvp->tv_sec --; 280 } else { 281 tvp->tv_usec = (shortest->when_ms - now_ms)*1000; 282 } 283 if (tvp->tv_sec < 0) tvp->tv_sec = 0; 284 if (tvp->tv_usec < 0) tvp->tv_usec = 0; 疑似有bug,比如當前時間為 1s +2ms ,shortest時間為0s+1ms的case 285 } else { 286 /* If we have to check for events but need to return 287 * ASAP because of AE_DONT_WAIT we need to se the timeout 288 * to zero */ 289 if (flags & AE_DONT_WAIT) { 290 tv.tv_sec = tv.tv_usec = 0; 291 tvp = &tv; 292 } else { 293 /* Otherwise we can block */ 294 tvp = NULL; /* wait forever */ 295 } 296 } 297
//指定這個tvp是說這個tvp到期時,至少由time event可以執行 298 numevents = aeApiPoll(eventLoop, tvp); 299 for (j = 0; j < numevents; j++) { 300 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; 301 int mask = eventLoop->fired[j].mask; 302 int fd = eventLoop->fired[j].fd; 303 int rfired = 0; 304 305 /* note the fe->mask & mask & ... code: maybe an already processed 306 * event removed an element that fired and we still didn't 307 * processed, so we check if the event is still valid. */ 308 if (fe->mask & mask & AE_READABLE) { 309 rfired = 1; 310 fe->rfileProc(eventLoop,fd,fe->clientData,mask); 311 }
//避免同一個注冊函數被調用兩次
312 if (fe->mask & mask & AE_WRITABLE) { 313 if (!rfired || fe->wfileProc != fe->rfileProc) 314 fe->wfileProc(eventLoop,fd,fe->clientData,mask); 315 } 316 processed++; 317 } 318 } 319 /* Check time events */ 320 if (flags & AE_TIME_EVENTS) 321 processed += processTimeEvents(eventLoop); 322 323 return processed; /* return the number of processed file/time events */ 324 } 325
//對select的一個簡單封裝,沒有太大意義 326 /* Wait for millseconds until the given file descriptor becomes 327 * writable/readable/exception */ 328 int aeWait(int fd, int mask, long long milliseconds) { 329 struct timeval tv; 330 fd_set rfds, wfds, efds; 331 int retmask = 0, retval; 332 333 tv.tv_sec = milliseconds/1000; 334 tv.tv_usec = (milliseconds%1000)*1000; 335 FD_ZERO(&rfds); 336 FD_ZERO(&wfds); 337 FD_ZERO(&efds); 338 339 if (mask & AE_READABLE) FD_SET(fd,&rfds); 340 if (mask & AE_WRITABLE) FD_SET(fd,&wfds); 341 if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) { 342 if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE; 343 if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE; 344 return retmask; 345 } else { 346 return retval; 347 } 348 } 349 350 void aeMain(aeEventLoop *eventLoop) { 351 eventLoop->stop = 0; 352 while (!eventLoop->stop) { 353 if (eventLoop->beforesleep != NULL) 354 eventLoop->beforesleep(eventLoop); 355 aeProcessEvents(eventLoop, AE_ALL_EVENTS); 356 } 357 } 358 359 char *aeGetApiName(void) { 360 return aeApiName(); 361 } 362 363 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { 364 eventLoop->beforesleep = beforesleep; 365 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM