Redis服務器是一個事件驅動程序,服務器需要處理以下兩類事件:
- 文件事件(file event):Redis服務器通過套接字與客戶端(或者其他Redis服務器)進行連接,而文件事件就是服務器對套接字操作的抽象。服務器與客戶端(或者其他服務器)的通信會產生相應的文件事件,而服務器則通過監聽並處理這些事件來完成一系列網絡通信操作;
- 時間事件(time event):Redis服務器中的一些操作(比如serverCron函數)需要在給定的時間點執行,而時間事件就是服務器對這類定時操作的抽象。
文件事件
文件事件處理器
- 文件事件處理器使用I/O多路復用(multiplexing)程序來同時監聽多個套接字,並根據套接字目前執行的任務來為套接字關聯不同的事件處理器;
- 當被監聽的套接字准備好執行連接應答(accept)、讀取(read)、寫入(write)、關閉(close)等操作時,與操作相對應的文件事件就會產生,這時文件事件處理器就會調用套接字之前關聯好的事件處理器來處理這些事件。
組成
文件事件處理器的四個組成部分,它們分別是套接字、I/O多路復用程序、文件事件分派器(dispatcher),以及事件處理器。
文件事件處理器的四個組成部分
文件事件是對套接字操作的抽象,每當一個套接字准備好執行連接應答(accept)、寫入、讀取、關閉等操作時,就會產生一個文件事件。因為一個服務器通常會連接多個套接字,所以多個文件事件有可能會並發地出現。
I/O多路復用程序負責監聽多個套接字,並向文件事件分派器傳送那些產生了事件的套接字。
盡管多個文件事件可能會並發地出現,但I/O多路復用程序總是會將所有產生事件的套接字都放到一個隊列里面,然后通過這個隊列,以有序(sequentially)、同步(synchronously)、每次一個套接字的方式向文件事件分派器傳送套接字。
文件事件分派器接收I/O多路復用程序傳來的套接字,並根據套接字產生的事件的類型,調用相應的事件處理器;
事件處理器是一個個函數,它們定義了某個事件發生時,服務器應該執行的動作。
I/O多路復用程序的實現
Redis的I/O多路復用程序的所有功能都是通過包裝常見的select、epoll、evport和kqueue這些I/O多路復用函數庫來實現的,每個I/O多路復用函數庫在Redis源碼中都對應一個單獨的文件,比如ae_select.c、ae_epoll.c、ae_kqueue.c,諸如此類。
因為Redis為每個I/O多路復用函數庫都實現了相同的API,所以I/O多路復用程序的底層實現是可以互換,如圖:
Redis在I/O多路復用程序的實現源碼中用#include宏定義了相應的規則,程序會在編譯時自動選擇系統中性能最高的I/O多路復用函數庫來作為Redis的I/O多路復用程序的底層實現:
# ifdef HAVE_EVPORT # include "ae_evport.c" # else # ifdef HAVE_EPOLL # include "ae_epoll.c" # else # ifdef HAVE_KQUEUE # include "ae_kqueue.c" # else # include "ae_select.c" # endif # endif # endif
事件的類型
I/O多路復用程序可以監聽多個套接字的ae.h/AE_READABLE事件和ae.h/AE_WRITABLE事件,這兩類事件和套接字操作之間的對應關系如下:
- 當套接字變得可讀時(客戶端對套接字執行write操作,或者執行close操作),或者有新的可應答(acceptable)套接字出現時(客戶端對服務器的監聽套接字執行connect操作),套接字產生AE_READABLE事件;
- 當套接字變得可寫時(客戶端對套接字執行read操作),套接字產生AE_WRITABLE事件。
I/O多路復用程序允許服務器同時監聽套接字的AE_READABLE事件和AE_WRITABLE事件,如果一個套接字同時產生了這兩種事件,那么文件事件分派器會優先處理AE_READABLE事件,等到AE_READABLE事件處理完之后,才處理AE_WRITABLE事件。
API
- ae.c/aeCreateFileEvent函數接受一個套接字描述符、一個事件類型,以及一個事件處理器作為參數,將給定套接字的給定事件加入到I/O多路復用程序的監聽范圍之內,並對事件和事件處理器進行關聯;
- ae.c/aeDeleteFileEvent函數接受一個套接字描述符和一個監聽事件類型作為參數,讓I/O多路復用程序取消對給定套接字的給定事件的監聽,並取消事件和事件處理器之間的關聯;
- ae.c/aeGetFileEvents函數接受一個套接字描述符,返回該套接字正在被監聽的事件類型:
- 如果套接字沒有任何事件被監聽,那么函數返回AE_NONE;
- 如果套接字的讀事件正在被監聽,那么函數返回AE_READABLE;
- 如果套接字的寫事件正在被監聽,那么函數返回AE_WRITABLE;
- 如果套接字的讀事件和寫事件正在被監聽,那么函數返回AE_READABLE|AE_WRITABLE。
- ae.c/aeWait函數接受一個套接字描述符、一個事件類型和一個毫秒數為參數,在給定的時間內阻塞並等待套接字的給定類型事件產生,當事件成功產生,或者等待超時之后,函數返回。
- ae.c/aeApiPoll函數接受一個sys/time.h/struct timeval結構為參數,並在指定的時間內,阻塞並等待所有被aeCreateFileEvent函數設置為監聽狀態的套接字產生文件事件,當有至少一個事件產生,或者等待超時后,函數返回;
- ae.c/aeProcessEvents函數是文件事件分派器,它先調用aeApiPoll函數來等待事件產生,然后遍歷所有已產生的事件,並調用相應的事件處理器來處理這些事件;
- ae.c/aeGetApiName函數返回I/O多路復用程序底層所使用的I/O多路復用函數庫的名稱:返回"select"表示底層為select函數庫,諸如此類。
文件事件的處理器
連接應答處理器
networking.c/acceptTcpHandler函數是Redis的連接應答處理器,這個處理器用於對連接服務器監聽套接字的客戶端進行應答,其主要調用anet.c中anetTcpAccept函數實現,具體實現為sys/socket.h/accept函數的包裝。
當Redis服務器進行初始化的時候,程序會將這個連接應答處理器和服務器監聽套接字的AE_READABLE事件關聯起來,當有客戶端用sys/socket.h/connect函數連接服務器監聽套接字的時候,套接字就會產生AE_READABLE事件,引發連接應答處理器執行,並執行相應的套接字應答操作。
命令請求處理器
networking.c/readQueryFromClient函數是Redis的命令請求處理器,這個處理器負責從套接字中讀入客戶端發送的命令請求內容,具體實現為unistd.h/read函數的包裝。
當一個客戶端通過連接應答處理器成功連接到服務器之后,服務器會將客戶端套接字的AE_READABLE事件和命令請求處理器關聯起來,當客戶端向服務器發送命令請求的時候,套接字就會產生AE_READABLE事件,引發命令請求處理器執行,並執行相應的套接字讀入操作;
在客戶端連接服務器的整個過程中,服務器都會一直為客戶端套接字的AE_READABLE事件關聯命令請求處理器。
命令回復處理器
networking.c/sendReplyToClient函數是Redis的命令回復處理器,這個處理器負責將服務器執行命令后得到的命令回復通過套接字返回給客戶端,具體實現為unistd.h/write函數的包裝。
當服務器有命令回復需要傳送給客戶端的時候,服務器會將客戶端套接字的AE_WRITABLE事件和命令回復處理器關聯起來,當客戶端准備好接收服務器傳回的命令回復時,就會產生AE_WRITABLE事件,引發命令回復處理器執行,並執行相應的套接字寫入操作。
時間事件
時間事件分為以下兩類:
- 定時事件:讓一段程序在指定的時間之后執行一次。比如說,讓程序X在當前時間的30毫秒之后執行一次;
- 周期性事件:讓一段程序每隔指定時間就執行一次。比如說,讓程序Y每隔30毫秒就執行一次。
時間事件主要由以下三個屬性組成:
- id:服務器為時間事件創建的全局唯一ID(標識號)。ID號按從小到大的順序遞增,新事件的ID號比舊事件的ID號要大;
- when:毫秒精度的UNIX時間戳,記錄了時間事件的到達(arrive)時間;
- timeProc:時間事件處理器,一個函數。當時間事件到達時,服務器就會調用相應的處理器來處理事件。
一個時間事件是定時事件還是周期性事件取決於時間事件處理器的返回值:
- 如果事件處理器返回ae.h/AE_NOMORE,那么這個事件為定時事件:該事件在達到一次之后就會被刪除,之后不再到達;
- 如果事件處理器返回一個非AE_NOMORE的整數值,那么這個事件為周期性時間:當一個時間事件到達之后,服務器會根據事件處理器返回的值,對時間事件的when屬性進行更新,讓這個事件在一段時間之后再次到達,並以這種方式一直更新並運行下去。比如說,如果一個時間事件的處理器返回整數值30,那么服務器應該對這個時間事件進行更新,讓這個事件在30毫秒之后再次到達。(現在的Redis主要使用這個)
API
ae.c/aeCreateTimeEvent函數接受一個毫秒數milliseconds和一個時間事件處理器proc作為參數,將一個新的時間事件添加到服務器,這個新的時間事件將在當前時間的milliseconds毫秒之后到達,而事件的處理器為proc。
ae.c/aeDeleteFileEvent函數接受一個時間事件ID作為參數,然后從服務器中刪除該ID所對應的時間事件;
ae.c/aeSearchNearestTimer函數返回到達時間距離當前時間最接近的那個時間事件;
ae.c/processTimeEvents函數是時間事件的執行器,這個函數會遍歷所有已到達的時間事件,並調用這些事件的處理器。已到達指的是,時間事件的when屬性記錄的UNIX時間戳等於或小於當前時間的UNIX時間戳。
processTimeEvents函數的定義可以用以下偽代碼來描述:
def processTimeEvents(): # 遍歷服務器中的所有時間事件 for time_event in all_time_event(): # 檢查事件是否已經到達 if time_event.when <= unix_ts_now(): # 事件已到達 # 執行事件處理器,並獲取返回值 retval = time_event.timeProc() # 如果這是一個定時事件 if retval == AE_NOMORE: # 那么將該事件從服務器中刪除 delete_time_event_from_server(time_event) # 如果這是一個周期性事件 else: # 那么按照事件處理器的返回值更新時間事件的 when 屬性 # 讓這個事件在指定的時間之后再次到達 update_when(time_event, retval)
時間事件應用實例:serverCron函數
持續運行的Redis服務器需要定期對自身的資源和狀態進行檢查和調整,從而確保服務器可以長期、穩定地運行,這些定期操作由redis.c/serverCron函數負責執行,它的主要工作包括:
- 更新服務器的各類統計信息,比如時間、內存占用、數據庫占用情況等;
- 清理數據庫中的過期鍵值對;
- 關閉和清理連接失效的客戶端;
- 嘗試進行AOF或RDB持久化操作;
- 如果服務器是主服務器,那么對從服務器進行定期同步;
- 如果處於集群模式,對集群進行定期同步和連接測試;
事件的調度與執行
事件的調度和執行由ae.c/aeProcessEvents函數負責。可以用一下源代碼完成:
def aeProcessEvents(): # 獲取到達時間離當前時間最接近的時間事件 time_event = aeSearchNearestTimer() # 計算最接近的時間事件距離到達還有多少毫秒 remaind_ms = time_event.when - unix_ts_now() # 如果事件已到達,那么remaind_ms 的值可能為負數,將它設定為0 if remaind_ms < 0: remaind_ms = 0 # 根據remaind_ms 的值,創建timeval 結構 timeval = create_timeval_with_ms(remaind_ms) # 阻塞並等待文件事件產生,最大阻塞時間由傳入的timeval 結構決定 # 如果remaind_ms 的值為0 ,那么aeApiPoll 調用之后馬上返回,不阻塞 aeApiPoll(timeval) # 處理所有已產生的文件事件(其實並沒有這個函數) processFileEvents() # 處理所有已到達的時間事件 processTimeEvents()