這篇文章給大家分享的內容是關於Swoft 源碼剖析之Swoole和Swoft的一些介紹(Task投遞/定時任務篇),有一定的參考價值,有需要的朋友可以參考一下。
前言
Swoft
的任務功能基於Swoole
的Task機制
,或者說Swoft
的Task
機制本質就是對Swoole
的Task機制
的封裝和加強。
任務投遞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
|
任務投遞Task::deliver()
將調用參數打包后根據$type
參數通過Swoole
的$server->taskCo()
或$server->task()
接口投遞到Task進程
。Task
本身始終是同步執行的,$type
僅僅影響投遞這一操作的行為,Task::TYPE_ASYNC
對應的$server->task()
是異步投遞,Task::deliver()
調用后馬上返回;Task::TYPE_CO
對應的$server->taskCo()
是協程投遞,投遞后讓出協程控制,任務完成或執行超時后Task::deliver()
才從協程返回。
任務執行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
|
此處是swoole.onTask
的事件回調,其職責僅僅是將將Worker
進程投遞來的打包后的數據轉發給TaskExecutor
。
Swoole
的Task
機制的本質是Worker進程
將耗時任務投遞給同步的Task進程
(又名TaskWorker
)處理,所以swoole.onTask
的事件回調是在Task進程
中執行的。上文說過,Worker進程
是你大部分HTTP
服務代碼執行的環境,但是從TaskEventListener.onTask()
方法開始,代碼的執行環境都是Task進程
,也就是說,TaskExecutor
和具體的TaskBean
都是執行在Task進程
中的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
|
任務執行思路很簡單,將Worker進程
發過來的數據解包還原成原來的調用參數,根據$name
參數找到對應的TaskBean
並調用其對應的task()
方法。其中TaskBean
使用類級別注解@Task(name="TaskName")
或者@Task("TaskName")
聲明。
值得一提的一點是,@Task
注解除了name
屬性,還有一個coroutine
屬性,上述代碼會根據該參數選擇使用協程的runCoTask()
或者同步的runSyncTask()
執行Task
。但是由於而且由於Swoole
的Task進程
的執行是完全同步的,不支持協程,所以目前版本請該參數不要配置為true
。同樣的在TaskBean
中編寫的任務代碼必須的同步阻塞的或者是要能根據環境自動將異步非阻塞和協程降級為同步阻塞的
從Process中投遞任務
前面我們提到:
Swoole
的Task
機制的本質是Worker進程
將耗時任務投遞給同步的Task進程
(又名TaskWorker
)處理。
換句話說,Swoole
的$server->taskCo()
或$server->task()
都只能在Worker進程
中使用。
這個限制大大的限制了使用場景。 如何能夠為了能夠在Process
中投遞任務呢?Swoft
為了繞過這個限制提供了Task::deliverByProcess()
方法。其實現原理也很簡單,通過Swoole
的$server->sendMessage()
方法將調用信息從Process
中投遞到Worker進程
中,然后由Worker進程替其投遞到Task進程
當中,相關代碼如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
|
數據打包后使用$server->sendMessage()
投遞給Worker
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
$server->sendMessage
后,Worker進程
收到數據時會觸發一個swoole.pipeMessage
事件的回調,Swoft
會將其轉換成自己的swoft.pipeMessage
事件並觸發.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
swoft.pipeMessage
事件最終由PipeMessageListener
處理。在相關的監聽其中,如果發現swoft.pipeMessage
事件由Task::deliverByProcess()
產生的,Worker進程
會替其執行一次Task::deliver()
,最終將任務數據投遞到TaskWorker進程
中。
一道簡單的回顧練習:從Task::deliverByProcess()
到某TaskBean
最終執行任務,經歷了哪些進程,而調用鏈的哪些部分又分別是在哪些進程中執行?
從Command進程或其子進程中投遞任務
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
|
對於Command
進程的任務投遞,情況會更復雜一點。
上文提到的Process
,其往往衍生於Http/Rpc
服務,作為同一個Manager
的子孫進程,他們能夠拿到Swoole\Server
的句柄變量,從而通過$server->sendMessage()
,$server->task()
等方法進行任務投遞。
但在Swoft
的體系中,還有一個十分路人的角色: Command
。Command
的進程從shell
或cronb
獨立啟動,和Http/Rpc
服務相關的進程沒有親緣關系。因此Command
進程以及從Command
中啟動的Process
進程是沒有辦法拿到Swoole\Server
的調用句柄直接通過UnixSocket
進行任務投遞的。
為了為這種進程提供任務投遞支持,Swoft
利用了Swoole
的Task進程
的一個特殊功能----消息隊列。
同一個項目中Command
和Http\RpcServer
通過約定一個message_queue_key
獲取到系統內核中的同一條消息隊列,然后Comand
進程就可以通過該消息隊列向Task進程
投遞任務了。
該機制沒有提供對外的公開方法,僅僅被包含在Task::deliver()
方法中,Swoft
會根據當前環境隱式切換投遞方式。但該消息隊列的實現依賴Semaphore
拓展,如果你想使用,需要在編譯PHP
時加上--enable-sysvmsg
參數。
定時任務
除了手動執行的普通任務,Swoft
還提供了精度為秒的定時任務功能用來在項目中替代Linux的Crontab
功能.
Swoft
用兩個前置Process
---任務計划進程:CronTimerProcess
和任務執行進程CronExecProcess
,和兩張內存數據表-----RunTimeTable
(任務(配置)表)OriginTable
((任務)執行表)用於定時任務的管理調度。
兩張表的每行記錄的結構如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
|
此處為何要使用Swoole的內存Table?
Swoft
的的定時任務管理是分別由 任務計划進程 和 任務執行進程 進程負責的。兩個進程的運行共同管理定時任務,如果使用進程間獨立的array()
等結構,兩個進程必然需要頻繁的進程間通信。而使用跨進程的Table
(本文的Table
,除非特別說明,都指Swoole
的Swoole\Table
結構)直接進行進程間數據共享,不僅性能高,操作簡單 還解耦了兩個進程。
為了Table
能夠在兩個進程間共同使用,Table
必須在Swoole Server
啟動前創建並分配內存。具體代碼在Swoft\Task\Bootstrap\Listeners->onBeforeStart()
中,比較簡單,有興趣的可以自行閱讀。
背景介紹完了,我們來看看這兩個定時任務進程的行為
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
|
CronTimerProcess
是Swoft
的定時任務調度進程,其核心方法是Crontab->initRunTimeTableData()
。
該進程使用了Swoole
的定時器功能,通過Swoole\Timer
在每分鍾首秒時執行的回調,CronTimerProcess
每次被喚醒后都會遍歷任務表計算出當前這一分鍾內的60秒分別需要執行的任務清單,寫入執行表並標記為 未執行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
|
CronExecProcess
作為定時任務的執行者,通過Swoole\Timer
每0.5s
喚醒自身一次,然后把 執行表
遍歷一次,挑選當下需要執行的任務,通過sendMessage()
投遞出去並更新該 任務執行表中的狀態。
該執行進程只負責任務的投遞,任務的實際實際執行仍然在Task進程
中由TaskExecutor
處理。
定時任務的宏觀執行情況如下:
明確的學習思路能更高效的學習