前言
我們都知道redis 6.x引入了多線程技術,之前都是單線程。那么為什么要引入多線程呢,這個多線程在哪里應用的,單線程和多線程應用的場景又是什么呢?
提問
- redis單線程的實現流程是怎樣的?
- redis哪些地方用到了多線程,哪些地方是單線程?
- redis多線程是怎么實現的?
- redis多線程是怎么做到無鎖的?
redis單線程的實現流程是怎樣的?
Redis一開始是單線程模型,在一個線程中要同時處理兩種事件:文件事件和時間事件。
文件事件:主要是網絡I/O的讀寫,請求的接收和回復
時間事件:單次/多次執行的定時器,如主從復制、定時刪除過期數據、字典rehash等
redis所有核心功能都是跑在主線程中的,像aof文件落盤操作是在子線程中執行的,那么在高並發情況下它是怎么做到高性能的呢?
由於這兩種事件在同一個線程中執行,就會出現互相影響的問題,如時間事件到了還在等待/執行文件事件,或者文件事件已經就緒卻在執行時間事件,這就是單線程的缺點,所以在實現上要將這些影響降到最低。那么redis是怎么實現的呢?
定時執行的時間事件保存在一個鏈表中,由於鏈表中任務沒有按照執行時間排序,所以每次需要掃描單鏈表,找到最近需要執行的任務,時間復雜度是O(N),redis敢這么實現就是因為這個鏈表很短,大部分定時任務都是在serverCron方法中被調用。從現在開始到最近需要執行的任務的開始時間,時長定位T,這段時間就是屬於文件事件的處理時間,以epoll為例,執行epoll_wait最多等待的時長為T,如果有就緒任務epoll會返回所有就緒的網絡任務,存在一個數組中,這時我們知道了所有就緒的socket和對應的事件(讀、寫、錯誤、掛斷),然后就可以接收數據,解析,執行對應的命令函數。
如果最近要執行的定時任務時間已經過了,那么epoll就不會阻塞,直接返回已經就緒的網絡事件,即不等待。
總之單線程,定時事件和網絡事件還是會互相影響的,正在處理定時事件網絡任務來了,正在處理網絡事件定時任務的時間到了。所以redis必須保證每個任務的處理時間不能太長。
redis處理流程如下:
- 服務啟動,開始網絡端口監聽,等待客戶端請求
- 客戶端想服務端發起連接請求,創建客戶端連接對象,完成連接
- 將socket信息注冊到epoll,設置超時時間為時間事件的周期時長,等待客戶端發起請求
- 客戶端發起操作數據庫請求(如GET)
- epoll收到客戶端的請求,可能多個,按照順序處理請求
- 接收請求參數,接收完成后解析請求協議,得到請求命令
- 執行請求命令,即操作redis數據庫
- 將結果返回給客戶端
redis哪些地方用到了多線程,哪些地方是單線程?
從上圖中可以看出只有以下3個地方用的是多線程,其他地方都是單線程:
- 接收請求參數
- 解析請求參數
- 請求響應,即將結果返回給client
很明顯以上3點各個請求都是互相獨立互不影響的,很適合用多線程,特別是請求體/響應體很大的時候,更能體現多線程的威力。而操作數據庫是請求之間共享的,如果使用多線程的話適合讀寫鎖。而操作數據庫本身是很快的(就是對map的增刪改查),單線程不一定就比多線程慢,當然也有可能是作者偷懶,懶得實現罷了,但這次的多線程模型還是值得我們學習一下的。
redis多線程是怎么實現的?
先大致說一下多線程的流程:
- 服務器啟動時啟動一定數量線程,服務啟動的時候可以指定線程數,每個線程對應一個隊列(list *io_threads_list[128]),最多128個線程。
- 服務器收到的每個請求都會放入全局讀隊列clients_pending_read,同時將隊列中的元素分發到每個線程對應的隊列io_threads_list中,這些工作都是在主線程中執行的。
- 每個線程(包括主線程和子線程)接收請求參數並做解析,完事后在client中設置一個標記CLIENT_PENDING_READ,標識參數解析完成,可以操作數據庫了。(主線程和子線程都會執行這個步驟)
- 主線程遍歷隊列clients_pending_read,發現設有CLIENT_PENDING_READ標記的,就操作數據庫
- 操作完數據庫就是響應client了,響應是一組函數addReplyXXX,在client中設置標記CLIENT_PENDING_WRITE,同時將client加入全局寫隊列clients_pending_write
- 主線程將全局隊列clients_pending_write以輪訓的方式將任務分發到每個線程對應的隊列io_threads_list
- 所有線程將遍歷自己的隊列io_threads_list,將結果發送給client
redis多線程是怎么做到無鎖的?
上面說了多線程的地方都是互相獨立互不影響的。但是每個線程的隊列就存在兩個兩個線程訪問的情況:主線程向隊列中寫數據,子線程消費,redis的實現有點反直覺。按正常思路來說,主線程在往隊列中寫數據的時候加鎖;子線程復制隊列&並將隊列清空,這個兩個動作是加鎖的,子線程消費復制后的隊列,這個過程是不需要加鎖的,按理來說主線程和子線程的加鎖動作都是非常快的。但是redis並沒有這么實現,那么他是怎么實現的呢?
redis多線程的模型是主線程負責搜集任務,放入全局讀隊列clients_pending_read和全局寫隊列clients_pending_write,主線程在將隊列中的任務以輪訓的方式分發到每個線程對應的隊列(list *io_threads_list[128])
- 一開始子線程的隊列都是空,主線程將全對隊列中的任務分發到每個線程的隊列,並設置一個隊列有數據的標記(_Atomic unsigned long io_threads_pending[128]),io_threads_pending[1]=5表示第一個線程的隊列中有5個元素
- 子線程死循環輪訓檢查io_threads_pending[index] > 0,有數據就開始處理,處理完成之后將io_threads_pending[index] = 0,沒數據繼續檢查
- 主線程將任務分發到子線程的隊列中,自己處理自己隊列中的任務,處理完成后,等待所有子線程處理完所有任務,繼續收集任務到全局隊列,在將任務分發給子線程,這樣就避免了主線程和子線程同時訪問隊列的情況,主線程向隊列寫的時候子線程還沒開始消費,子線程在消費的時候主線程在等待子線程消費完,子線程消費完后主線程才會往隊列中繼續寫,就必須加鎖了。因為任務是平均分配到每個隊列的,所以每個隊列的處理時間是接近的,等待的時間會很短。
源碼執行流程
為了方便你看源碼,這里加上一些代碼的執行流程啟動socket監聽,注冊連接處理函數,連接成功后創建連接對象connection,創建client對象,通過aeCreateFileEvent注冊client的讀事件
main -> initServer -> acceptTcpHandler -> anetTcpAccept -> anetGenericAccept -> accept(獲取到socket連接句柄) connCreateAcceptedSocket -> connCreateSocket -> 創建一個connection對象 acceptCommonHandler -> createClient創建client連接對象 -> connSetReadHandler -> aeCreateFileEvent -> readQueryFromClient main -> aeMain -> aeProcessEvents -> aeApiPoll(獲取可讀寫的socket) -> readQueryFromClient(如果可讀) -> processInputBuffer -> processCommandAndResetClient(多線程下這個方法在當前流程下不會執行,而由主線程執行)
在多線程模式下,readQueryFromClient會將client信息加入server.clients_pending_read隊列,listAddNodeHead(server.clients_pending_read,c);
主線程會將server.clients_pending_read中的數據分發到子線程的隊列(io_threads_list)中,子線程會調用readQueryFromClient就行參數解析,主線程分發完任務后,會執行具體的操作數據庫的命令,這塊是單線程
如果參數解析完成會在client->flags中加一個標記CLIENT_PENDING_COMMAND,在主線程中先判斷client->flags & CLIENT_PENDING_COMMAND > 0,說明參數解析完成,才會調用processCommandAndResetClient,之前還擔心如果子線程還在做參數解析,主線程就開始執行命令難道不會有問題嗎?現在一切都清楚了
main -> aeMain -> aeProcessEvents -> beforeSleep -> handleClientsWithPendingReadsUsingThreads -> processCommandAndResetClient -> processCommand -> call
讀是多次讀:socket讀緩沖區有數據,epoll就會一直觸發讀事件,所以讀可能是多次的
寫是一次寫:往socket寫數據是在子線程中執行的,直接循環直到數據寫完位置,就算某個線程阻塞了,也不會像單線程那樣導致所有任務都阻塞
執行完相關命令后,就是將結果返回給client,回復client是一組函數,我們以addReply為例,說一下執行流程,執行addReply還是單線程的,將client信息插入全局隊列server.clients_pending_write。 addReply -> prepareClientToWrite -> clientInstallWriteHandler -> listAddNodeHead(server.clients_pending_write,c) 在主線程中將server.clients_pending_write中的數據以輪訓的方式分發到多個子線程中 beforeSleep -> handleClientsWithPendingWritesUsingThreads -> 將server.clients_pending_write中的數據以輪訓的方式分發到多個線程的隊列中io_threads_list list *io_threads_list[IO_THREADS_MAX_NUM];是數組雙向鏈表,一個線程對應其中一個隊列 子線程將client中的數據發給客戶端,所以是多線程 server.c -> main -> initThreadedIO(啟動一定數量的線程) -> IOThreadMain(線程執行的方法) -> writeToClient -> connWrite -> connSocketWrite
網絡操作對應的一些方法,所有connection對象的type字段都是指向CT_Socket
ConnectionType CT_Socket = { .ae_handler = connSocketEventHandler, .close = connSocketClose, .write = connSocketWrite, .read = connSocketRead, .accept = connSocketAccept, .connect = connSocketConnect, .set_write_handler = connSocketSetWriteHandler, .set_read_handler = connSocketSetReadHandler, .get_last_error = connSocketGetLastError, .blocking_connect = connSocketBlockingConnect, .sync_write = connSocketSyncWrite, .sync_read = connSocketSyncRead, .sync_readline = connSocketSyncReadLine };