1、MQ的應用場景
優點:
a)主要解決異步消息
b)應用解耦
c)流量消峰等問題
d)日志處理(kafka)
缺點:
a)系統可用性降低:你想啊,本來其他系統只要運行好好的,那你的系統就是正常的。現在你非要加個消息隊列進去,那消息隊列掛了,你的系統不是呵呵了。因此,系統可用性降低
b)系統復雜性增加:要多考慮很多方面的問題,比如一致性問題、如何保證消息不被重復消費,如何保證保證消息可靠傳輸。因此,需要考慮的東西更多,系統復雜性增大。
1、為什么會造成重復消費?
因為網絡傳輸等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者。
2、解決重復消費的方案:
(1)比如,你拿到這個消息做數據庫的insert操作。那就容易了,給這個消息做一個唯一主鍵,那么就算出現重復消費的情況,就會導致主鍵沖突,避免數據庫出現臟數據。
(2)再比如,你拿到這個消息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結果都是一樣的,set操作本來就算冪等操作。
(3)如果上面兩種情況還不行,上大招。准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。
2、消息模型
a)P2P(Point to Point)點對點模式(也就是一個任務只能被一個消費者消費)
1、包含三個角色:消息隊列(Queue),發送者(Sender),接受者(Receiver)
PHP實現:
安裝rabbitMQ擴展:
在你的項目中添加一個 composer.json文件:
{ "require": { "php-amqplib/php-amqplib": "2.6.1" } }
2、簡單模式(一對一)
<?php # @File : sample-send.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : 生產者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立AMQP連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 定義隊列名稱 $channel->queue_declare('hello', false, false, false, false); // 定義要發送的信息 $msg = new AMQPMessage('Hello World!'.time()); // 發送消息 $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent 'Hello World!'\n"; $channel->close(); $connection->close();
<?php # @File : sample-reciver.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : 消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創建信息通道 $channel = $connection->channel(); // 聲明隊列 $channel->queue_declare('hello', false, false, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; // 接受生產者的消息回調函數 $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; }; // 消費信息 $channel->basic_consume('hello', '', false, true, false, false, $callback); // 正在消費時,則等待 while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
3、Work模式(輪循隊列,每個消費者消費的數量是一樣的)(一對多)
4、Work模式(能者多勞)(一對多)
<?php # @File : work-send.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : [work模式]生產者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 定義隊列名稱 // 隊列聲明為持久化(durable); 通過queue_declare的第三參數為true $channel->queue_declare('task_queue', false, true, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); $channel->basic_publish($msg, '', 'task_queue'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close();
<?php # @File : work-reciver.php # @Author: Liugp # @Date : 2019/7/22 # @Desc : [work模式]消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創建信息通道 $channel = $connection->channel(); // 隊列聲明為持久化(durable); 通過queue_declare的第三參數為true $channel->queue_declare('task_queue', false, true, false, false); echo " [*] Waiting for messages. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] Received ', $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; // 公平調度(即能者多勞) $channel->basic_qos(null, 1, null); // 第四個參數basic_consume為false (true 意味着不響應ack);消費者掛掉這后,所有沒有響應的消息都會重新發送,減小消息丟失的概率,改為false后,則是手動確認,默認是自動確認 $channel->basic_consume('task_queue', '', false, false, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
5、每個消息只有一個消費者
6、發送者和接受者沒有時間依賴
7、接受者確認消息接受和處理成功
b)Publish/Subscribe(Pub/Sub)發布訂閱模式
1、包含三個角色:主題(Topic),發布者(Publisher),訂閱者(Subscriber)
2、一個生產者,多個消息者;客戶端只有訂閱后才能收到消息;持久化和非持久化
3、每個消費者都有自己的隊列
4、生產者沒有直接把消息發送到隊列,而是發送到交換機 轉發器exchange
5、每個隊列都要綁定到交換機上
6、生產者發送的消息經過交換機到達隊列,就能實現一個消息被多個消費者消費
7、Exchange(交換機 轉發器)
1、一方面是接受生產者的消息,另一方面是向隊列推送消息
2、匿名轉發
3、Fanout(訂閱模式;不處理路由鍵,廣播)
<?php # @File : subscribe-send.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [發布/訂閱模式]生產者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創建信息通道 $channel = $connection->channel(); // 定義交換機,第一個參數是交換機名稱,第二參數是交換機類型 $channel->exchange_declare('logs', 'fanout', false, false, false); $data = implode(' ', array_slice($argv, 1)); if (empty($data)) { $data = "info: Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo ' [x] Sent ', $data, "\n"; $channel->close(); $connection->close();
<?php # @File : subscribe-reciver.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [發布/訂閱模式]消費者 # require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創建信息通道 $channel = $connection->channel(); // 定義交換機,第一個參數是交換機名稱,第二參數是交換機類型 $channel->exchange_declare('logs', 'fanout', false, false, false); // 定義隊列
// 在 php-amqplib 客戶端,當我們提供隊列名稱為空字符串時,我們創建了一個具有生成名稱的非持久隊列:
// list($queue_name, ,) = $channel->queue_declare("");
// 方法返回時,$queue_name變量包含一個隨機生成的RabbitMQ隊列名稱。例如,類似amq.gen-jzty20brgko-hjmujj0wlg。
list($queue_name, ) = $channel->queue_declare("", false, false, true, false); // 將交換機綁定到隊列 $channel->queue_bind($queue_name, 'logs'); echo " [*] Waiting for logs. To exit press CTRL+C\n"; $callback = function ($msg) { echo ' [x] ', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $channel->wait(); } $channel->close(); $connection->close();
4、Direct(路由模式;處理路由鍵,發布與訂閱,完全匹配)
多個綁定(Multiple bindings)
整合
<?php # @File : routing-send.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [路由模式]生產者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創建信息通道 $channel = $connection->channel(); // 聲明交換機,第一參數為交換機名稱,第二參數為交換機類型 $channel->exchange_declare('direct_logs', 'direct', false, false, false); $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info'; $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo " [x] Sent ",$severity,':',$data," \n"; $channel->close(); $connection->close(); ?>
<?php # @File : routing-reciver.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [路由模式]消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); // 定義隊列,第一個參數為隊列名稱,為空則隨機生成 list($queue_name, ) = $channel->queue_declare("", false, false, true, false); $severities = array_slice($argv, 1); if (empty($severities)) { file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n"); exit(1); } foreach ($severities as $severity) { // 第二參數是交換機名稱,第三個參數是路由鍵名稱 $channel->queue_bind($queue_name, 'direct_logs', $severity); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
5、Topic(Topic模式,規則匹配)
1、將路由鍵和某模式匹配
2、"#"匹配零個或者多個
3、“*”匹配任意一個
<?php # @File : topic-send.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [主題模式]生產者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; // 建立連接 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // 創建信息通道 $channel = $connection->channel(); // 聲明交換機,第一參數為交換機名稱,第二參數為交換機類型 $channel->exchange_declare('topic_logs', 'topic', false, false, false); $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info'; $data = implode(' ', array_slice($argv, 2)); if (empty($data)) { $data = "Hello World!"; } $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo " [x] Sent ", $routing_key, ':', $data, " \n"; $channel->close(); $connection->close();
<?php # @File : topic-reciver.php # @Author: Liugp # @Date : 2019/7/23 # @Desc : [主題模式]消費者 require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); // 聲明交換機,第一參數為交換機名稱,第二參數為交換機類型 $channel->exchange_declare('topic_logs', 'topic', false, false, false); list($queue_name, ) = $channel->queue_declare("", false, false, true, false); $binding_keys = array_slice($argv, 1); if (empty($binding_keys)) { file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n"); exit(1); } foreach ($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } echo ' [*] Waiting for logs. To exit press CTRL+C', "\n"; $callback = function ($msg) { echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
3、使用RabbitMQ實現松耦合設計
4、RabbitMQ消息處理
a)RabbitMQ的消息持久化處理
1、消息的可靠性是RabbitMQ的一部分,那么RabbitMQ是如何保證消息可靠性的呢---消息持久化
2、autoDelete
@Queue:當所有消費客戶端鏈接斷開后,是否自動刪除隊列隊列;true:刪除,false:不刪除
@Exchange:當所有綁定隊列都不在使用時,是否自動刪除交換機;true:刪除 false:不刪除
3、消息確認ACK機制
ACK機制是消費者從RabbitMQ收到消息並處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋才將此消息從隊列中刪除
5、Rabbitmq的消息確認機制(事務+confirm)
a)在rabbitmq中,可以通過持久化數據,解決rabbitmq服務器異常的數據丟失問題
b)問題:生產者將消息發送出去之后,消息到底有沒有達到rabbitmq服務器,默認是不知道的
解決(兩種方式):
1、AMQP實現事務機制
2、Confirm
c)事務機制(txSelect,txCommit,txRollback)
1、txSelect:用戶將當前channel設置成transation模式
2、txCommit:用於提交事務
3、txRollback:回滾事務
6、Kafka消息隊列