RabbitMq的使用--一對一,一對多,隊列,消息持久化-輪詢,公平調度,自動,手動響應篇


//生產者
<?php namespace app\rabbit; //require_once __DIR__ . '/autoload.php'; //因為我自己使用的是tp框架 所以我在這里不需要再加載這個類了 use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class Send { public function connect($num) { //連接rabbit $connection = new AMQPStreamConnection('***.**.***.***', 5672, 'mq用戶名', 'mq密碼'); //創建一個通道 $channel = $connection->channel(); //創建隊列 $channel->queue_declare('hello', false, false, false, false); // 1.隊列名 2.未知3.隊列是否持久化 false:隊列在內存中,服務器掛了 隊列就沒了 true:服務器掛了,隊列重新生成 注意:只是隊列持久化,不是隊列仲的消息持久化!!! // 4.//隊列是否專屬,專屬的范圍針對的是連接,也就是說,一個連接下面的多個信道是可見的.對於其他連接是不可見的.連接斷開后,該隊列會被刪除.注意,不是信道斷開,是連接斷開.並且,就算設置成了持久化,也會刪除. // 5.//如果所有消費者都斷開連接了,是否自動刪除.如果還沒有消費者從該隊列獲取過消息或者監聽該隊列,那么該隊列不會刪除.只有在有消費者從該隊列獲取過消息后,該隊列才有可能自動刪除(當所有消費者都斷開連接,不管消息是否獲取完) //創建一條信息 $msg = new AMQPMessage('如果我是沙兄+++'.$num); //推送一條消息到隊列中 $res = $channel->basic_publish($msg, '', 'hello'); //通道關閉 $channel->close(); //連接關閉 $connection->close(); } }



//消費者 要加多個消費者 創建多個這樣的文件運行就行 rabbitMq默認是公平分配的方式 --輪詢(平均分配)  就是說你有100條消息 消費者a已經消費完單數的50條 消費者b才消費完雙數的10條 那也還有40條等着消費者b去消費,不會派給消費者a的

<?php
require_once __DIR__ . '/autoload.php'; //因為消費者一般來說是在命令行執行 所以要引入這個類
use PhpAmqpLib\Connection\AMQPStreamConnection;
        //創建一個連接
        $connection = new AMQPStreamConnection('***.***.***.***', 5672, 'mq用戶名', 'mq密碼');
        //創建一個通道
        $channel = $connection->channel();
        //聲明一個隊列是冪等的,它僅僅在不存在的情況下被創建
        $channel->queue_declare('hello', false, false, false, false);
        //拉取隊列中的消息
        $res = $channel->basic_consume('hello', '', false, false, false, false, function ($msg) {
        //  file_put_contents('get.text', '這是消息2'.$msg->body."\n", FILE_APPEND);
         echo '說:'.$msg->body."\n";
         sleep(3);
            try {
                var_dump($msg);
                //這里是上面第四個消息回復設置為false 之后需要手動確認 不然的話消息不會刪掉
                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

            }catch(Exception $e) {
                return false;
            }
        });

/**
 * 1.$queue  隊列名
 * 2.$consumer_tag  消費者名稱,自定義,可以為空
 * 3.$no_local  功能屬於AMQP協議的標准,但是rabbitMQ並沒有做實現。
 * 4.$no_ack  自動應答,當自動應答等於true的時候,表示當消費者一收到消息就表示消費者收到了消息,消息就會立即從隊列中刪除。一般設置為false,由消費者手動進行ack。
 * 5.$exclusive  是否排外(排外:queue只被一個消費者使用並且在消費者斷開連接時queue被刪除),一般默認為false。
 * 6.$nowait  當nowait為true時,不要等待服務器確認請求就立即開始消費消息。如果不能消費,有可能引發通道異常並關閉通道。一般默認設置為false。
 * 7.$callback  回調函數,拿到消息你的業務邏輯就寫在里面。
 */

        while(count($channel->callbacks)) {
            $channel->wait();
        }

 

//隊列持久化和消息持久化和公平調度

如果你沒有特意告訴RabbitMQ,那么在它退出或者崩潰的時候,將會丟失所有隊列和消息。為了確保信息不會丟失,有兩個事情是需要注意的:我們必須把“隊列”和“消息”設為持久化。

首先,為了不讓隊列消失,需要把隊列聲明為持久化(durable)。為此我們通過queue_declare的第三參數為true: (這里生產者和消費者都必須改,上面我們已經聲明了一個hello的隊列是不持久化的,所以這里要重新聲明一個新的隊列,不然會報錯)

$channel->queue_declare('shaxiong', false, true, false, false);

這時候,我們就可以確保在RabbitMq重啟之后queue_declare隊列不會丟失。另外,我們需要把我們的消息也要設為持久化——設置delivery_mode = 2。(在生產者創建消息的時候設置)

$msg = new AMQPMessage($data,
       array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
       );
//注意:消息持久化
//將消息設為持久化並不能完全保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間還是有一個很小的間隔時間。因為RabbitMq並不是所有的消息都使用fsync(2)
//——它有可能只是保存到緩存中,並不一定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付我們的簡單工作隊列。如果你一定要保證持久化,你可以使用publisher confirms。--https://www.rabbitmq.com/confirms.html
//推送一條消息到隊列中

公平調度(多勞多得)

在創建完通道之后加上這么一行代碼(生產者和消費者都需要添加) 這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工作者(worker),直到它已經處理了上一條消息並且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的工作者(worker)。

$channel->basic_qos(null, 1, null); //rabbitMq 削峰的表現

可以看到 100條消息 我消費者a消費了99條,有些沒截圖到 消費者b只消費了一條


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM