基於RabbitMQ和Swoole實現的一個完整的異步任務系統


從最開始的使用redis實現的單進程消費的異步任務系統到加入swoole的多進程消費模式,現在,我們的異步任務系統終於又能邁進一步。

 

因為有了前面兩個簡單系統的經驗,這回基於RabbitMQ的異步任務系統設計的的更加完善,包括多進程消費,異常重試等。

系統介紹

消費端架構圖

從圖中可以看到,我們這個系統是一個基於事件的異步任務系統。就是說當一個事件產生時,生產者將事件拋給調度器,調度器負責查詢事件下有哪些任務,然后將這些任務丟到相應的隊列中,最后由消費者消費任務隊列中的任務。

在整個系統中主要分為三大部分

1.事件生產者,即產生消息事件的一方。
2.任務調度器(Scheduler),負責注冊事件並調度任務。
3.消費者(Worker),負責消費任務隊列中的任務。

事件生產者

事件生產者很簡單,在業務系統中直接調用即可,代碼如下。

<?php

require_once DIR.'/../autoload.php';

use Asynclib\Ebats\Event;

try{

    $event = new Event('order_paied');  //定義事件

    $event->setOptions(['order_id' => 'FB138020392193312']); //事件產生的參數

    $event->publish();

}catch (Exception $exc){

    echo $exc->getMessage();

}

  

任務調度器

調度器主要做兩件事,一是注冊事件,另一個是調度任務。

注冊事件代碼如下:

//注冊事件

EventManager::register('order_create', 'closeOrder', 'demo', 10);//關閉未付款訂單(延遲任務)

EventManager::register('order_paied', 'virtualShipping', 'demo'); //虛擬商品自動發貨

  

這樣就注冊了兩個事件,事件下各有一個任務。

具體調度部分代碼很簡單,就不多贅述,有興趣的可以去看代碼。

消費者

重頭戲來了,一個異步任務系統最重要的就是消費端了,現在讓我們來看下Worker的流程圖。

一個完整的消費進程

可以看到,在這里我們采用了兩個交換器和兩個隊列,一個負責處理正常的任務即ntask,另一個負責處理需要延遲執行的任務即dtask。簡單描述下一個任務的生命周期。

正常任務

1、task產生,進入正常任務的交換器Exchange[ebats_core_ntask]
2、交換器根據topic將任務分發到對應的隊列中
3、子進程ntask阻塞等待成功獲取到task,並執行該任務
4、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
5、子進程ntask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]
6、將任務執行信息回調給上層開發者以便保存查看

延遲任務

1、子進程dtask阻塞等待成功獲取到task,並執行該任務
2、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
3、子進程dtask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]
4、將任務執行信息回調給上層開發者以便保存查看

消費者代碼如下:

require_once DIR.'/../autoload.php';

require_once DIR.'/task/TaskDemoModel.php';

use Asynclib\Ebats\Worker;

 

//執行結果回調函數

$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){

 

};

$worker = new Worker($callback);  //支持多進程消費默認為1

$worker->setQueue('demo');  //隊列名和事件的topic一一對應

$worker->run();

  

自定義調度器

一般來說這是一個基於事件的任務系統,那么能不能直接產生任務呢。答案是肯定的。

只需要創建一個自定義調度器,由您自行實現調度邏輯,最終生成一個任務即可。代碼如下:

<?php

require_once DIR.'/../autoload.php';

use Asynclib\Ebats\Task;

use Asynclib\Core\Consumer;

use Asynclib\Amq\ExchangeTypes;

use Asynclib\Exception\ExceptionInterface;

 

/** 

 * 本示例演示了如何創建一個自定義調度器,開發者可以根據自身需求開發自己的任務調度器

 */

try{

    $worker = new Consumer();

    $worker->setExchange('order_fanout', ExchangeTypes::TOPIC);

    $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);

    $worker->run(function($key, $msg){

        $order_data = json_encode($msg);

        echo " [$key] $order_data \n";

        Task::create('demo', 'orderAsync', $msg);//創建任務,之后消息將作為參數由任務接管處理

    });

}catch (ExceptionInterface $exc){

    echo $exc->getMessage();

}

  

這樣,當接收到消息時就會產生一個orderAsync的任務,您只需要啟動一個用來消費這個Topic的Worker即可。

也許你會覺得這里直接寫業務邏輯的代碼就可以了,實際上也確實可以。當你可以忍受一個進程慢慢消費的時候是可以這樣做的。但大多數情況下我們還是希望它能夠盡快的消費掉,所以建議這里只負責創建任務,具體任務的業務邏輯由worker去執行。

以上就是基於RabbitMQ和Swoole實現的一個完整的異步任務系統的詳細內容

視頻教學:九年架構師講解php+swoole+rabbitMQ實現異步任務多進程消費

 

更多內容請訪問

怎么從一名碼農成為架構師的必看知識點:目錄大全(持續更新)50W年薪挑戰!

 我的官方群點擊此處加入群聊【PHP高級學習交流群】,一起學習,相互討論。

群內已經有管理將知識體系整理好(源碼,學習視頻等資料),歡迎加群免費領取。

這套精品PHP教程絕不是市場上的那些妖艷賤貨可比,作為web開發的佼佼者PHP並不遜色其他語言,加上Swoole后更加是如虎添翼!進軍通信 、物聯網行業開發百度地圖、百度訂單中心、虎牙、戰旗TV等!寒冬裁員期過后正是各大企業擴大招人的時期,現在市場初級程序員泛濫,進階中高級程序員絕對是各大企業急需的人才,這套學習教程適合那些1-5年以內的PHP開發者正處於瓶頸期,想要突破自己進階中高級、架構師!名額有限,先到先得!

騰訊T3-T4標准精品PHP架構師教程目錄大全,只要你看完保證薪資上升一個台階(持續更新)​

部分資料截圖:

 

 

 

 

還有限時精品福利:

★騰訊高級PHP工程師筆試題目

★億級PV高並發場景訂單的處理

★laravel開發天貓商城組件服務

★戰旗TV視頻直播的架構項目實戰

掃描下面二維碼領取

 

對PHP后端技術,對PHP架構技術感興趣的朋友,我的官方群點擊此處,一起學習,相互討論。

群內已經有管理將知識體系整理好(源碼,學習視頻等資料),歡迎加群免費領取。

本課程深度對標騰訊T3-T4標准,貼身打造學習計划為web開發人員進階中高級、架構師提升技術,為自己增值漲薪!加入BAT特訓營還可以獲得內推大廠名額以及GO語言學習權限!!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM