消息隊列MQ


1、MQ的應用場景

  優點:
    a)主要解決異步消息
    b)應用解耦
    c)流量消峰等問題
    d)日志處理(kafka)

  缺點:

    a)系統可用性降低:你想啊,本來其他系統只要運行好好的,那你的系統就是正常的。現在你非要加個消息隊列進去,那消息隊列掛了,你的系統不是呵呵了。因此,系統可用性降低

    b)系統復雜性增加:要多考慮很多方面的問題,比如一致性問題、如何保證消息不被重復消費,如何保證保證消息可靠傳輸。因此,需要考慮的東西更多,系統復雜性增大。

      1、為什么會造成重復消費?

        因為網絡傳輸等等故障,確認信息沒有傳送到消息隊列,導致消息隊列不知道自己已經消費過該消息了,再次將該消息分發給其他的消費者。

      2、解決重復消費的方案:

        (1)比如,你拿到這個消息做數據庫的insert操作。那就容易了,給這個消息做一個唯一主鍵,那么就算出現重復消費的情況,就會導致主鍵沖突,避免數據庫出現臟數據。
        (2)再比如,你拿到這個消息做redis的set的操作,那就容易了,不用解決,因為你無論set幾次結果都是一樣的,set操作本來就算冪等操作。
        (3)如果上面兩種情況還不行,上大招。准備一個第三方介質,來做消費記錄。以redis為例,給消息分配一個全局id,只要消費過該消息,將<id,message>以K-V形式寫入redis。那消費者開始消費前,先去redis中查詢有沒消費記錄即可。

2、消息模型
  a)P2P(Point to Point)點對點模式(也就是一個任務只能被一個消費者消費)
    1、包含三個角色:消息隊列(Queue),發送者(Sender),接受者(Receiver)

PHP實現:

安裝rabbitMQ擴展:

在你的項目中添加一個 composer.json文件:

{
     "require": {
      "php-amqplib/php-amqplib": "2.6.1"
     }
 }

     2、簡單模式(一對一)

<?php
# @File  : sample-send.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : 生產者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立AMQP連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();
// 定義隊列名稱
$channel->queue_declare('hello', false, false, false, false);
// 定義要發送的信息
$msg = new AMQPMessage('Hello World!'.time());
// 發送消息
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
<?php
# @File  : sample-reciver.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : 消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
// 創建信息通道
$channel    = $connection->channel();

// 聲明隊列
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";

//  接受生產者的消息回調函數
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
};

// 消費信息
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 正在消費時,則等待
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();

    3、Work模式(輪循隊列,每個消費者消費的數量是一樣的)(一對多)
    4、Work模式(能者多勞)(一對多)

<?php
# @File  : work-send.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : [work模式]生產者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// 定義隊列名稱
// 隊列聲明為持久化(durable); 通過queue_declare的第三參數為true
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage(
    $data,
    array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
);
$channel->basic_publish($msg, '', 'task_queue');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
<?php
# @File  : work-reciver.php
# @Author: Liugp
# @Date  : 2019/7/22
# @Desc  : [work模式]消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創建信息通道
$channel    = $connection->channel();

// 隊列聲明為持久化(durable); 通過queue_declare的第三參數為true
$channel->queue_declare('task_queue', false, true, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo ' [x] Received ', $msg->body, "\n";
    sleep(substr_count($msg->body, '.'));
    echo " [x] Done\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

// 公平調度(即能者多勞)
$channel->basic_qos(null, 1, null);

// 第四個參數basic_consume為false (true 意味着不響應ack);消費者掛掉這后,所有沒有響應的消息都會重新發送,減小消息丟失的概率,改為false后,則是手動確認,默認是自動確認
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();

    5、每個消息只有一個消費者

    6、發送者和接受者沒有時間依賴

    7、接受者確認消息接受和處理成功

 


  b)Publish/Subscribe(Pub/Sub)發布訂閱模式
    1、包含三個角色:主題(Topic),發布者(Publisher),訂閱者(Subscriber)
    2、一個生產者,多個消息者;客戶端只有訂閱后才能收到消息;持久化和非持久化
    3、每個消費者都有自己的隊列
    4、生產者沒有直接把消息發送到隊列,而是發送到交換機 轉發器exchange
    5、每個隊列都要綁定到交換機上
    6、生產者發送的消息經過交換機到達隊列,就能實現一個消息被多個消費者消費
    7、Exchange(交換機 轉發器)
      1、一方面是接受生產者的消息,另一方面是向隊列推送消息
      2、匿名轉發
      3、Fanout(訂閱模式;不處理路由鍵,廣播)

<?php
# @File  : subscribe-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [發布/訂閱模式]生產者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創建信息通道
$channel    = $connection->channel();

// 定義交換機,第一個參數是交換機名稱,第二參數是交換機類型
$channel->exchange_declare('logs', 'fanout', false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
    $data = "info: Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, 'logs');
echo ' [x] Sent ', $data, "\n";
$channel->close();
$connection->close();
<?php
# @File  : subscribe-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [發布/訂閱模式]消費者
#

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創建信息通道
$channel    = $connection->channel();

// 定義交換機,第一個參數是交換機名稱,第二參數是交換機類型
$channel->exchange_declare('logs', 'fanout', false, false, false);

// 定義隊列
// php-amqplib 客戶端,當我們提供隊列名稱為空字符串時,我們創建了一個具有生成名稱的非持久隊列:
// list($queue_name, ,) = $channel->queue_declare("");
// 方法返回時,$queue_name變量包含一個隨機生成的RabbitMQ隊列名稱。例如,類似amq.gen-jzty20brgko-hjmujj0wlg。
list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

// 將交換機綁定到隊列
$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo ' [x] ', $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
while ($channel->is_consuming()) {
    $channel->wait();
}
$channel->close();
$connection->close();

      4、Direct(路由模式;處理路由鍵,發布與訂閱,完全匹配)

多個綁定(Multiple bindings)

 

整合

<?php
# @File  : routing-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [路由模式]生產者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創建信息通道
$channel = $connection->channel();

// 聲明交換機,第一參數為交換機名稱,第二參數為交換機類型
$channel->exchange_declare('direct_logs', 'direct', false, false, false);

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'direct_logs', $severity);

echo " [x] Sent ",$severity,':',$data," \n";

$channel->close();
$connection->close();

?>
<?php
# @File  : routing-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [路由模式]消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

$channel->exchange_declare('direct_logs', 'direct', false, false, false);

// 定義隊列,第一個參數為隊列名稱,為空則隨機生成
list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

$severities = array_slice($argv, 1);
if (empty($severities)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    // 第二參數是交換機名稱,第三個參數是路由鍵名稱
    $channel->queue_bind($queue_name, 'direct_logs', $severity);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

      5、TopicTopic模式,規則匹配)

        1、將路由鍵和某模式匹配
        2、"#"匹配零個或者多個
        3、“*”匹配任意一個

<?php
# @File  : topic-send.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [主題模式]生產者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 建立連接
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

// 創建信息通道
$channel = $connection->channel();

// 聲明交換機,第一參數為交換機名稱,第二參數為交換機類型
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data        = implode(' ', array_slice($argv, 2));
if (empty($data)) {
    $data = "Hello World!";
}

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ", $routing_key, ':', $data, " \n";

$channel->close();
$connection->close();
<?php
# @File  : topic-reciver.php
# @Author: Liugp
# @Date  : 2019/7/23
# @Desc  : [主題模式]消費者

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

// 聲明交換機,第一參數為交換機名稱,第二參數為交換機類型
$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if (empty($binding_keys)) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach ($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

 

 3、使用RabbitMQ實現松耦合設計

4、RabbitMQ消息處理
  a)RabbitMQ的消息持久化處理
    1、消息的可靠性是RabbitMQ的一部分,那么RabbitMQ是如何保證消息可靠性的呢---消息持久化
    2、autoDelete
      @Queue:當所有消費客戶端鏈接斷開后,是否自動刪除隊列隊列;true:刪除,false:不刪除
      @Exchange:當所有綁定隊列都不在使用時,是否自動刪除交換機;true:刪除 false:不刪除
    3、消息確認ACK機制
      ACK機制是消費者從RabbitMQ收到消息並處理完成后,反饋給RabbitMQ,RabbitMQ收到反饋才將此消息從隊列中刪除

5、Rabbitmq的消息確認機制(事務+confirm)
  a)在rabbitmq中,可以通過持久化數據,解決rabbitmq服務器異常的數據丟失問題
  b)問題:生產者將消息發送出去之后,消息到底有沒有達到rabbitmq服務器,默認是不知道的
    解決(兩種方式):
      1、AMQP實現事務機制
      2、Confirm
  c)事務機制(txSelect,txCommit,txRollback)
    1、txSelect:用戶將當前channel設置成transation模式
    2、txCommit:用於提交事務
    3、txRollback:回滾事務

6、Kafka消息隊列

 

 

 

 

   


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM