pika的線程模型有官方的wiki介紹https://github.com/Qihoo360/pika/wiki/pika-%E7%BA%BF%E7%A8%8B%E6%A8%A1%E5%9E%8B,這里主要介紹了pika都有哪些線程,這些線程用來干嘛。本篇文章主要涉及監聽線程DispatchThread、IO工作線程WorkerThread和工作線程池ThreadPool,結合代碼介紹里面實現的一些細節。
- 1.監聽線程DispatchThread
在創建PikaServer的時候,會構造一個PikaDispatchThread,這個PikaDispatchThread,實際上是用了pink網絡庫的DispatchThread::DispatchThread
DispatchThread構造函數里面會初始化好若干個WorkerThread
DispatchThread繼承自ServerThread,ServerThread繼承了Thread,線程啟動時實際上運行的是子類的ThreadMain方法,繼承了Thread類的子類需要有自己的ThreadMain,監聽線程start的時候,入口是ServerThread::ThreadMain()。線程啟動會先ServerThread::InitHandle(),綁定和監聽端口,下面看看ServerThread::ThreadMain()里面做了啥。
ServerThread::ThreadMain()主要邏輯是一個epoll,當有新的連接事件來的時候,accept,然后調用DispatchThread::HandleNewConn來處理這個新的連接
DispatchThread::HandleNewConn如何處理連接呢?實際上監聽線程會把連接分發給IO工作線程WorkerThread來處理。每個WorkerThread都有一個PinkEpoll,PinkEpoll有一個notify_queue_,新的連接會以PinkItem的形式push到這個隊列里面,然后通知WorkerThread來處理。分發的方式類似輪訓,會按順序分發給notify_queue_沒有滿的WorkerThread。
那么監聽線程如何通知WorkerThread來處理新的連接呢?使用的是管道的方式,PinkEpoll會創建一個管道用來通知,並且把這個管道加到Epoll里面。在確定好要分發的WorkerThread后,往這個WorkerThread的管道寫進去一個1字節的內容,來觸發這個管道的讀事件。
- 2.IO工作線程WorkerThread
DispatchThread::StartThread的時候會起WorkerThread線程,WorkerThread也是繼承了Thread,因此工作線程的入口是WorkerThread::ThreadMain。上文說到監聽線程把新的連接放到WorkerThread的隊列里面后,通知了WorkerThread進行處理。下面我們看看WorkerThread怎么處理的。
WorkerThread同樣是一個Epoll,這里會處理新連接請求事件和已連接請求的事件,如果Epoll返回的fd是notify_receive_fd,即管道的接收fd,說明是內部的通知事件,一次性讀取多個字節的內容,因為前面已知每個通知是1個字節,因此這里讀到了多少個字節就說明有多少個通知,然后在一個循環里面處理這些請求。類型為kNotiConnect則是新的連接,這里會把監聽線程push的PinkItem取出來,然后創建一個NewPinkConn,加到conns_里面,並且把這個fd加到WorkerThread的epoll,后續的消息事件就可以在這個epoll被處理。這里conn_factory用的是ClientConnFactory,返回的是PikaClientConn,繼承了pink::RedisConn。
連接綁定到WorkerThread后,已建立連接的客戶端發送請求過來,則是走的下面的分支,根據fd在conns_里面找到PinkConn,我們先只看讀請求部分,回響應部分后面再看。
在conns_里面找到的是對應fd的PikaClientConn,使用RedisConn::GetRequest來讀取客戶端的的請求,此處有一個細節,如果read_status為kReadAll,則一次完整的請求被讀取,會先把這個請求fd的讀寫事件給刪除。這是為啥呢?刪除了不是后續就處理不了這個請求的讀寫嗎,這個我們后面講到了再說明。
RedisConn::GetRequest里面,使用RedisParser::ProcessRequestBuffer來解析讀取到的內容,然后有2種處理方式,DealMessage和Complete
先看下這兩個函數的初始化,DealMessage對應着ParserDealMessageCb,Complete對應着ParserCompleteCb
我們看這兩個方法,原來一個是同步處理,一個是異步,同步的話就是一個個命令調用DealMessage來處理,異步的話是解析完合成一組命令統一調Complete處理。異步的處理方式是將請求的命令提交給線程池來處理PikaClientConn::AsynProcessRedisCmds,怎么提交的我們在工作線程池里面介紹。
- 3.工作線程池ThreadPool
PikaServer構造的時候會創建一個PikaClientProcessor,PikaClientProcessor里面有一個ThreadPool,ThreadPool啟動時會創建Worker線程,Worker線程實際的處理函數是ThreadPool::runInThread()
前面講到WorkerThread解析完redis命令后會把命令提交給ThreadPool來處理,實際上是調用了線程池的ThreadPool::Schedule方法,Schedule需要一個TaskFunc來真正處理命令,這里使用的是DoBackgroundTask
ThreadPool::Schedule里面,把參數封裝成Task,然后push到線程池的任務隊列,接着通知線程池處理,這里WorkerThread是生產者,線程池是消費者。
而線程池的工作線程,則是不斷地在隊列里面取出Task進行處理。
- 4.命令處理和響應流程
線程池里面實際處理命令的是DoBackgroundTask,我們先來看看命令是怎么被處理的。DoBackgroundTask里面調用的是PikaClientConn::BatchExecRedisCmd
BatchExecRedisCmd里面是命令一個一個取出來ExecRedisCmd,然后PikaClientConn::DoCmd,響應消息先塞到resp_array,在TryWriteResp里面又把響應一個個取出來塞到response_里,並且把is_reply_置為true,然后做了一個NotifyEpoll的操作。
可以看到,這里把處理結果又封裝成一個PinkItem,然后和前面介紹的監聽線程把連接請求分發給WorkerThread一樣,把PinkItem放到PinkEpoll的隊列里面,然后通過在管道里面寫了一個字節的字符觸發epoll的讀事件。所以我們回過頭來看看WorkerThread的處理WorkerThread::ThreadMain
這里的流程和前面介紹的差不多,可以看到這里把這個連接的fd的讀寫事件重新加到epoll里面,前面我們留了一個疑問,在一次命令讀取結束后,把連接fd的讀寫事件從epoll里面刪除了,這是為啥呢?這里我們看到命令處理結束后又把讀寫事件加回來了。應該是因為pika用的是異步處理,一個連接的命令是異步地交給線程池處理,如果同個連接發了2個命令,因為是異步處理,沒有辦法保證2個命令滿足FIFO,即先來的命令需要先回復,后來的命令后回復,redis是單線程模型,因此天然滿足,pika是多線程異步處理,所以這里在讀取了第一個命令后,把連接的讀寫事件刪除了,等前一個命令處理完了才加回來,讀取第二個命令來處理。
連接的fd加進epoll后,fd可寫了,那么epoll會返回可寫事件,用RedisConn::SendReply來發送響應給客戶端,如果寫完了會把fd的寫事件給刪掉,如果沒寫完,則等fd可寫了會繼續觸發寫事件來寫回復。
- 5.總結
通過上面的分析可以知道,監聽線程是用來監聽新的連接,連接來了會交由WorkerThread處理,已建立連接的請求會由WorkerThread封裝成Task交給線程池ThreadPool處理,ThreadPool處理完了后,還是由WorkerThread來回復。WorkerThread就是做接收消息,回復消息的,而ThreadPool只是處理消息,不涉及接收和回復的IO操作。這3者的關系大概如下圖所示: