RabbitMQ是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務器是用Erlang語言編寫的,而集群和故障轉移是構建在開放電信平台框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
消息隊列工作示意圖
生產者P--[發布消息]--> 交換機X--[根據路由綁定分發]-->隊列<--[訂閱消息]--消費者C
關系
1 生產者和交換機
創建消息,並且發送到對應的交換機,發送的時候可以帶上特定的routeKey
2 交換機和隊列
隊列 綁定到 交換機 (可以設置路由 routeKey direct 和 topic 模式下 有效,廣播模式只要綁定就分發到隊列)
假如交換機上的消息分發不到隊列,則此消息就自動刪除了
a 直連交換機(Direct exchange)
交換機 --[所有綁定在自己上的隊列中找出設置和消息routeKey一樣的]--> 隊列 ,根據綁定的routeKey 來找隊列 分發消息
b 廣播交換機 (Fanout Exchange)
交換機 --[所有綁定在自己上的]--> 隊列, 只要隊列綁定到交換機 隊列分發消息
c 主題交換機 (Topic Exchange)
交換機 ---[所有綁定在自己上的隊列中找出 消息routeKey 滿足隊列匹配的]--> 隊列, routeKey 滿足匹配要求的隊列就會分發消息.
3 隊列和消費者
消費者綁定到對應的隊列就能得到隊列中的消息, 假如多位消費者同事消費一個隊列 可以通過 prefetchCount 來設置 最多同時消費個數, 握手后再發送新的消息過來
示例代碼說明
消息隊列雖然是持久化,可以通過握手機制來實現是否正真消費。示例代碼中采用了默認握手,通過數據庫記錄中存放對應執行記錄來實現隊列的執行情況監控。
1 rabbitmq操作
新建demo 賬號
rabbitmqctl add_user demo 181219
新建demo 虛擬主機
rabbitmqctl add_vhost demo
設置 demo 賬號在 demo 虛擬主機 權限
rabbitmqctl set_permissions -p demo demo ".*" ".*" ".*"
web界面插件開啟
rabbitmq-plugins enable rabbitmq_management
設置demo 賬號 角色
rabbitmqctl set_user_tags demo administrator
2 數據庫表
表一共2張 一張記錄消息內容以及執行情況, 另一張記錄執行失敗的錯誤信息
CREATE TABLE `mq_process_error_log` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `msg_id` bigint(20) NOT NULL, `process_msg` varchar(255) NOT NULL DEFAULT '' COMMENT '執行返回信息', `create_time` int(10) NOT NULL DEFAULT '0' COMMENT '創建時間', PRIMARY KEY (`id`), KEY `idx_msg_id` (`msg_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消息處理失敗日志表'; -- ---------------------------- -- Table structure for mq_process_log -- ---------------------------- CREATE TABLE `mq_process_log` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `msg_str` varchar(200) NOT NULL DEFAULT '' COMMENT '消息請求內容 json字符串', `msg_type` tinyint(2) NOT NULL DEFAULT '0' COMMENT '消息類型', `find_keyword` varchar(32) NOT NULL DEFAULT '' COMMENT '查找消息內容的關鍵字', `create_time` int(10) NOT NULL DEFAULT '0' COMMENT '創建時間', `process_num` tinyint(2) NOT NULL DEFAULT '0' COMMENT '執行次數', `process_start_time` int(10) NOT NULL DEFAULT '0' COMMENT '執行開始時間', `process_end_time` int(10) NOT NULL DEFAULT '0' COMMENT '執行結束時間', `process_status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '執行狀態 0 未執行 1 成功 2 失敗', `process_msg` varchar(255) NOT NULL DEFAULT '' COMMENT '執行返回信息', PRIMARY KEY (`id`), KEY `idx_find_keyword` (`find_keyword`) USING BTREE, KEY `idx_msg_type` (`msg_type`) USING BTREE, KEY `idx_process_status` (`process_status`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='消息處理日志表';
3 代碼說明
創建3個隊列來處理不同的消息, 來滿足不同優先級消息處理
發送消息,以及處理消息監聽,重發消息都有封裝
代碼中已經創建了抽象父類,不同的vhost配置只需要繼承父類即可, 如下代碼
/** * demo-rabbitMq類 * Class DemoRabbitMq * @package App\Service\Amqp * @author zxqc2018 */ class DemoRabbitMq extends AbstractRabbitMq { use Singleton; //mq配置 protected $configName = 'demo'; //默認交換機 protected $defaultExchangeName = 'demo-exchange'; //快中慢-隊列配置 protected $queuePriorityConfig = [ 'fast' => ['demo-fast-queue', 'demo.fast.#'], 'middle' => ['demo-middle-queue', 'demo.middle.#'], 'slow' => ['demo-slow-queue', 'demo.slow.#'], ]; /** * 設置routeKey對應處理方法 * @author zxqc2018 */ function settingRouteKeyProcessFunc() { $this->routeKeyProcessFunc[self::DEMO_FAST_TEST] = function ($msgData) { print_r($msgData); $res = \resultData(); $res->setMessage('fast'); return $res; }; $this->routeKeyProcessFunc[self::DEMO_SLOW_TEST] = function ($msgData) { print_r($msgData); if (rand(1,10) > 5) { return resultData([], ErrorCode::ERROR_RABBIT_MQ, '測試處理失敗'); } $res = \resultData(); $res->setMessage('slow'); return $res; }; } }
代碼地址: