Redis文件事件處理器


一、Redis 文件事件處理器由四個部分組成:套接字、I/O多路復用程序、文件時間分派器(dispatcher)、事件處理器。

文件事件是對套接字操作的抽象,每當一個套接字准備好執行連接應答(accept)、寫入(write)、讀取(read)、關閉(close)等操作時,就會相應產生一個文件事件。

I/O多路復用器負責通過loop循環監聽多個套接字,同時將一系列套接字按循序存儲到一個隊列中,由隊列向文件事件分派器傳送隊列中套接字。這個隊列中套接字是有序的,它會當一個套接字事件被處理完畢后,會立馬向文件事件分配器傳送下一個套接字。

文件事件分配器接受隊列中的套接字並根據套接字產生的事件類型,相應調用不同的事件處理器。

 

圖1  Redis 文件事件處理器過程 

 

 圖2  I/O多路復用程序通過隊列向文件事件分派器傳送套接字 

 

 

圖3 Redis I/O 多路復用調用的多路復用庫

 

 

二、在 Redis 的事件處理器中,服務器中最常用有:

(1)連接應答處理器      (2)命令請求處理器     (3)命令恢復處理器

 

(1)在 networking.c 文件中 acceptTcpHandler 函數實現了 Redis 的連接應答處理器,用於對連接服務器進行監聽,對套接字的客戶端進行應答相應。

#define MAX_ACCEPTS_PER_CALL 1000
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(el); UNUSED(mask); UNUSED(privdata);
  //循環處理連接應答
while(max--) { cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr); return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip); } }

 

在Redis 服務器進行初始化的時候,程序會將這個連接應答處理器和服務器監聽套接字的 AE_READABLE 事件關聯起來。當有客戶端用 sys/socket.h/connect 函數連接服務器監聽套接字的時候,套接字就會產生 AE_READABLE 事件,觸發連接應答處理器執行相應的套接字應答操作。

// 在套接字fd 上打開一個連接,並連接到 const的sockaddr,socklen_t_len 代表着字節長度。
// 對於無連接套接字類型的,只需設置默認地址發送以及接受地址,成功會返回0,錯誤會返回-1。
extern int connect (int __fd, __CONST_SOCKADDR_ARG __addr, socklen_t __len);

 圖4 客戶端和服務器之間進行連接請求應答

 

(2)通過 networking.c/readQueryFromClient 這個函數,命令請求處理器負責將套接字讀入客戶端並發送指令內容,相關實現代碼在 unistd.d/read 函數中。

void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;
    if (postponeClientRead(c)) return;

    readlen = PROTO_IOBUF_LEN;
  
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    
nread
= connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { c->pending_querybuf = sdscatlen(c->pending_querybuf, c->querybuf+qblen,nread); }
sdsIncrLen(c
->querybuf,nread); c->lastinteraction = server.unixtime; if (c->flags & CLIENT_MASTER) c->read_reploff += nread; server.stat_net_input_bytes += nread; if (sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClientAsync(c); return; } processInputBuffer(c); }

 

extern ssize_t read (int __fd, void *__buf, size_t __nbytes) __wur;

 

當一個客戶端通過連接應答處理器成功連接服務器之后,服務器會將客戶端套接字 AE_READABLE 事件和命令請求處理器關聯起來,當客戶端向服務器發送命令請求的時候,套接字就會產生 AE_READABLE 事件,引發命令請求處理器進行處理。

 圖5 服務器接收來自客戶端的命令請求

 

 

(3)networking.c/sendReplyToClient 函數是 Redis 的命令恢復處理器,這個處理器負責將服務器執行命令后得到的命令回復通過套接字返回給客戶端,具體時限為 unistd.h/write 函數的包裝。

void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}

 

extern ssize_t write (int __fd, const void *__buf, size_t __n) __wur;

當服務器又命令回復需要傳送給客戶端的時候,服務器會將客戶端套接字的 AE_WRITABLE 事件和命令回復處理器關聯起來,當客戶端准備好接受服務器傳回的命令回復時,就會產生 AE_WRITABLE 事件,引發命令回復處理器執行,並執行相應套接字的寫入操作。

 圖6 服務器向客戶端發送命令回復

 

 圖7 服務器與客戶端之間的通信過程

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM