這篇文章給大家分享的內容是關於Swoft 源碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。
前言
Swoft的任務功能基於Swoole的Task機制,或者說Swoft的Task機制本質就是對Swoole的Task機制的封裝和加強。
任務投遞
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
|
任務投遞Task::deliver()將調用參數打包后根據$type參數通過Swoole的$server->taskCo()或$server->task()接口投遞到Task進程。Task本身始終是同步執行的,$type僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC對應的$server->task()是異步投遞,Task::deliver()調用后馬上返回;Task::TYPE_CO對應的$server->taskCo()是協程投遞,投遞后讓出協程控制,任務完成或執行超時后Task::deliver()才從協程返回。
任務執行
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
|
此處是swoole.onTask的事件回調,其職責僅僅是將將Worker進程投遞來的打包后的數據轉發給TaskExecutor。
Swoole的Task機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名TaskWorker)處理,所以swoole.onTask的事件回調是在Task進程中執行的。上文說過,Worker進程是你大部分HTTP服務代碼執行的環境,但是從TaskEventListener.onTask()方法開始,代碼的執行環境都是Task進程,也就是說,TaskExecutor和具體的TaskBean都是執行在Task進程中的。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
|
任務執行思路很簡單,將Worker進程發過來的數據解包還原成原來的調用參數,根據$name參數找到對應的TaskBean並調用其對應的task()方法。其中TaskBean使用類級別注解@Task(name="TaskName")或者@Task("TaskName")聲明。
值得一提的一點是,@Task注解除了name屬性,還有一個coroutine屬性,上述代碼會根據該參數選擇使用協程的runCoTask()或者同步的runSyncTask()執行Task。但是由於而且由於Swoole的Task進程的執行是完全同步的,不支持協程,所以目前版本請該參數不要配置為true。同樣的在TaskBean中編寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將異步非阻塞和協程降級為同步阻塞的
從Process中投遞任務
前面我們提到:
Swoole的Task機制的本質是Worker進程將耗時任務投遞給同步的Task進程(又名TaskWorker)處理。
換句話說,Swoole的$server->taskCo()或$server->task()都只能在Worker進程中使用。
這個限制大大的限制了使用場景。 如何能夠為了能夠在Process中投遞任務呢?Swoft為了繞過這個限制提供了Task::deliverByProcess()方法。其實現原理也很簡單,通過Swoole的$server->sendMessage()方法將調用信息從Process中投遞到Worker進程中,然后由Worker進程替其投遞到Task進程當中,相關代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
|
數據打包后使用$server->sendMessage()投遞給Worker:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
$server->sendMessage后,Worker進程收到數據時會觸發一個swoole.pipeMessage事件的回調,Swoft會將其轉換成自己的swoft.pipeMessage事件並觸發.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
swoft.pipeMessage事件最終由PipeMessageListener處理。在相關的監聽其中,如果發現swoft.pipeMessage事件由Task::deliverByProcess()產生的,Worker進程會替其執行一次Task::deliver(),最終將任務數據投遞到TaskWorker進程中。
一道簡單的回顧練習:從Task::deliverByProcess()到某TaskBean 最終執行任務,經歷了哪些進程,而調用鏈的哪些部分又分別是在哪些進程中執行?
從Command進程或其子進程中投遞任務
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
對於Command進程的任務投遞,情況會更復雜一點。
上文提到的Process,其往往衍生於Http/Rpc服務,作為同一個Manager的子孫進程,他們能夠拿到Swoole\Server的句柄變量,從而通過$server->sendMessage(),$server->task()等方法進行任務投遞。
但在Swoft的體系中,還有一個十分路人的角色: Command。Command的進程從shell或cronb獨立啟動,和Http/Rpc服務相關的進程沒有親緣關系。因此Command進程以及從Command中啟動的Process進程是沒有辦法拿到Swoole\Server的調用句柄直接通過UnixSocket進行任務投遞的。
為了為這種進程提供任務投遞支持,Swoft利用了Swoole的Task進程的一個特殊功能----消息隊列。

同一個項目中Command和Http\RpcServer 通過約定一個message_queue_key獲取到系統內核中的同一條消息隊列,然后Comand進程就可以通過該消息隊列向Task進程投遞任務了。
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()方法中,Swoft會根據當前環境隱式切換投遞方式。但該消息隊列的實現依賴Semaphore拓展,如果你想使用,需要在編譯PHP時加上--enable-sysvmsg參數。
定時任務
除了手動執行的普通任務,Swoft還提供了精度為秒的定時任務功能用來在項目中替代Linux的Crontab功能.
Swoft用兩個前置Process---任務計划進程:CronTimerProcess和任務執行進程CronExecProcess
,和兩張內存數據表-----RunTimeTable(任務(配置)表)OriginTable((任務)執行表)用於定時任務的管理調度。
兩張表的每行記錄的結構如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
|
此處為何要使用Swoole的內存Table?
Swoft的的定時任務管理是分別由 任務計划進程 和 任務執行進程 進程負責的。兩個進程的運行共同管理定時任務,如果使用進程間獨立的array()等結構,兩個進程必然需要頻繁的進程間通信。而使用跨進程的Table(本文的Table,除非特別說明,都指Swoole的Swoole\Table結構)直接進行進程間數據共享,不僅性能高,操作簡單 還解耦了兩個進程。
為了Table能夠在兩個進程間共同使用,Table必須在Swoole Server啟動前創建並分配內存。具體代碼在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中,比較簡單,有興趣的可以自行閱讀。
背景介紹完了,我們來看看這兩個定時任務進程的行為
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
|
CronTimerProcess是Swoft的定時任務調度進程,其核心方法是Crontab->initRunTimeTableData()。
該進程使用了Swoole的定時器功能,通過Swoole\Timer在每分鍾首秒時執行的回調,CronTimerProcess每次被喚醒后都會遍歷任務表計算出當前這一分鍾內的60秒分別需要執行的任務清單,寫入執行表並標記為 未執行。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
CronExecProcess作為定時任務的執行者,通過Swoole\Timer每0.5s喚醒自身一次,然后把 執行表 遍歷一次,挑選當下需要執行的任務,通過sendMessage()投遞出去並更新該 任務執行表中的狀態。
該執行進程只負責任務的投遞,任務的實際實際執行仍然在Task進程中由TaskExecutor處理。
定時任務的宏觀執行情況如下:

明確的學習思路能更高效的學習

