laravel框架的rabbitmq使用示例[多隊列封裝]


    RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。

    消息隊列工作示意圖

    

     生產者P--[發布消息]--> 交換機X--[根據路由綁定分發]-->隊列<--[訂閱消息]--消費者C

 

     關系

    1 生產者和交換機

         創建消息,並且發送到對應的交換機,發送的時候可以帶上特定的routeKey

    2 交換機和隊列

       隊列 綁定到 交換機  (可以設置路由 routeKey  direct 和 topic 模式下 有效,廣播模式只要綁定就分發到隊列)

       假如交換機上的消息分發不到隊列,則此消息就自動刪除了

       a 直連交換機(Direct exchange)

           交換機 --[所有綁定在自己上的隊列中找出設置和消息routeKey一樣的]--> 隊列 ,根據綁定的routeKey 來找隊列 分發消息
       b 廣播交換機 (Fanout Exchange)
           交換機 --[所有綁定在自己上的]--> 隊列,  只要隊列綁定到交換機 隊列分發消息
       c 主題交換機 (Topic Exchange)
           交換機 ---[所有綁定在自己上的隊列中找出 消息routeKey 滿足隊列匹配的]--> 隊列, routeKey 滿足匹配要求的隊列就會分發消息.

     3 隊列和消費者

        消費者綁定到對應的隊列就能得到隊列中的消息,  假如多位消費者同事消費一個隊列 可以通過 prefetchCount 來設置 最多同時消費個數, 握手后再發送新的消息過來

 

    示例代碼說明

    消息隊列雖然是持久化,可以通過握手機制來實現是否正真消費。示例代碼中采用了默認握手,通過數據庫記錄中存放對應執行記錄來實現隊列的執行情況監控。

    1 rabbitmq操作

        新建demo 賬號
        rabbitmqctl add_user demo 181219

        新建demo 虛擬主機
        rabbitmqctl add_vhost demo

        設置 demo 賬號在 demo 虛擬主機 權限
        rabbitmqctl set_permissions -p demo demo ".*" ".*" ".*"


        web界面插件開啟
        rabbitmq-plugins enable rabbitmq_management

        設置demo 賬號 角色
        rabbitmqctl set_user_tags demo administrator

     2 數據庫表

    表一共2張 一張記錄消息內容以及執行情況, 另一張記錄執行失敗的錯誤信息

CREATE TABLE `mq_process_error_log` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `msg_id` bigint(20) NOT NULL,
  `process_msg` varchar(255) NOT NULL DEFAULT '' COMMENT '執行返回信息',
  `create_time` int(10) NOT NULL DEFAULT '0' COMMENT '創建時間',
  PRIMARY KEY (`id`),
  KEY `idx_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消息處理失敗日志表';

-- ----------------------------
-- Table structure for mq_process_log
-- ----------------------------
CREATE TABLE `mq_process_log` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `msg_str` varchar(200) NOT NULL DEFAULT '' COMMENT '消息請求內容 json字符串',
  `msg_type` tinyint(2) NOT NULL DEFAULT '0' COMMENT '消息類型',
  `find_keyword` varchar(32) NOT NULL DEFAULT '' COMMENT '查找消息內容的關鍵字',
  `create_time` int(10) NOT NULL DEFAULT '0' COMMENT '創建時間',
  `process_num` tinyint(2) NOT NULL DEFAULT '0' COMMENT '執行次數',
  `process_start_time` int(10) NOT NULL DEFAULT '0' COMMENT '執行開始時間',
  `process_end_time` int(10) NOT NULL DEFAULT '0' COMMENT '執行結束時間',
  `process_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '執行狀態 0 未執行 1 成功 2 失敗',
  `process_msg` varchar(255) NOT NULL DEFAULT '' COMMENT '執行返回信息',
  PRIMARY KEY (`id`),
  KEY `idx_find_keyword` (`find_keyword`) USING BTREE,
  KEY `idx_msg_type` (`msg_type`) USING BTREE,
  KEY `idx_process_status` (`process_status`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消息處理日志表';

     3 代碼說明

       創建3個隊列來處理不同的消息, 來滿足不同優先級消息處理

       發送消息,以及處理消息監聽,重發消息都有封裝

      代碼中已經創建了抽象父類,不同的vhost配置只需要繼承父類即可, 如下代碼

     

/**
 * demo-rabbitMq類
 * Class DemoRabbitMq
 * @package App\Service\Amqp
 * @author zxqc2018
 */
class DemoRabbitMq extends AbstractRabbitMq
{
    use Singleton;
    //mq配置
    protected $configName = 'demo';
    //默認交換機
    protected $defaultExchangeName = 'demo-exchange';
    //快中慢-隊列配置
    protected $queuePriorityConfig = [
        'fast' => ['demo-fast-queue', 'demo.fast.#'],
        'middle' => ['demo-middle-queue', 'demo.middle.#'],
        'slow' => ['demo-slow-queue', 'demo.slow.#'],
    ];

    /**
     * 設置routeKey對應處理方法
     * @author zxqc2018
     */
    function settingRouteKeyProcessFunc()
    {
        $this->routeKeyProcessFunc[self::DEMO_FAST_TEST] = function ($msgData) {
            print_r($msgData);
            $res = \resultData();
            $res->setMessage('fast');
            return $res;
        };
        $this->routeKeyProcessFunc[self::DEMO_SLOW_TEST] = function ($msgData) {
            print_r($msgData);
            if (rand(1,10) > 5) {
                return resultData([], ErrorCode::ERROR_RABBIT_MQ, '測試處理失敗');
            }
            $res = \resultData();
            $res->setMessage('slow');
            return $res;
        };
    }
}

 

     代碼地址:

     https://gitee.com/zxqc2014/laravel_rabbitmq_demo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM