//生產者
<?php namespace app\rabbit; //require_once __DIR__ . '/autoload.php'; //因為我自己使用的是tp框架 所以我在這里不需要再加載這個類了 use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class Send { public function connect($num) { //連接rabbit $connection = new AMQPStreamConnection('***.**.***.***', 5672, 'mq用戶名', 'mq密碼'); //創建一個通道 $channel = $connection->channel(); //創建隊列 $channel->queue_declare('hello', false, false, false, false); // 1.隊列名 2.未知3.隊列是否持久化 false:隊列在內存中,服務器掛了 隊列就沒了 true:服務器掛了,隊列重新生成 注意:只是隊列持久化,不是隊列仲的消息持久化!!! // 4.//隊列是否專屬,專屬的范圍針對的是連接,也就是說,一個連接下面的多個信道是可見的.對於其他連接是不可見的.連接斷開后,該隊列會被刪除.注意,不是信道斷開,是連接斷開.並且,就算設置成了持久化,也會刪除. // 5.//如果所有消費者都斷開連接了,是否自動刪除.如果還沒有消費者從該隊列獲取過消息或者監聽該隊列,那么該隊列不會刪除.只有在有消費者從該隊列獲取過消息后,該隊列才有可能自動刪除(當所有消費者都斷開連接,不管消息是否獲取完) //創建一條信息 $msg = new AMQPMessage('如果我是沙兄+++'.$num); //推送一條消息到隊列中 $res = $channel->basic_publish($msg, '', 'hello'); //通道關閉 $channel->close(); //連接關閉 $connection->close(); } }
//消費者 要加多個消費者 創建多個這樣的文件運行就行 rabbitMq默認是公平分配的方式 --輪詢(平均分配) 就是說你有100條消息 消費者a已經消費完單數的50條 消費者b才消費完雙數的10條 那也還有40條等着消費者b去消費,不會派給消費者a的
<?php require_once __DIR__ . '/autoload.php'; //因為消費者一般來說是在命令行執行 所以要引入這個類 use PhpAmqpLib\Connection\AMQPStreamConnection; //創建一個連接 $connection = new AMQPStreamConnection('***.***.***.***', 5672, 'mq用戶名', 'mq密碼'); //創建一個通道 $channel = $connection->channel(); //聲明一個隊列是冪等的,它僅僅在不存在的情況下被創建 $channel->queue_declare('hello', false, false, false, false); //拉取隊列中的消息 $res = $channel->basic_consume('hello', '', false, false, false, false, function ($msg) { // file_put_contents('get.text', '這是消息2'.$msg->body."\n", FILE_APPEND); echo '說:'.$msg->body."\n"; sleep(3); try { var_dump($msg); //這里是上面第四個消息回復設置為false 之后需要手動確認 不然的話消息不會刪掉 $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }catch(Exception $e) { return false; } }); /** * 1.$queue 隊列名 * 2.$consumer_tag 消費者名稱,自定義,可以為空 * 3.$no_local 功能屬於AMQP協議的標准,但是rabbitMQ並沒有做實現。 * 4.$no_ack 自動應答,當自動應答等於true的時候,表示當消費者一收到消息就表示消費者收到了消息,消息就會立即從隊列中刪除。一般設置為false,由消費者手動進行ack。 * 5.$exclusive 是否排外(排外:queue只被一個消費者使用並且在消費者斷開連接時queue被刪除),一般默認為false。 * 6.$nowait 當nowait為true時,不要等待服務器確認請求就立即開始消費消息。如果不能消費,有可能引發通道異常並關閉通道。一般默認設置為false。 * 7.$callback 回調函數,拿到消息你的業務邏輯就寫在里面。 */ while(count($channel->callbacks)) { $channel->wait(); }
//隊列持久化和消息持久化和公平調度
如果你沒有特意告訴RabbitMQ,那么在它退出或者崩潰的時候,將會丟失所有隊列和消息。為了確保信息不會丟失,有兩個事情是需要注意的:我們必須把“隊列”和“消息”設為持久化。
首先,為了不讓隊列消失,需要把隊列聲明為持久化(durable)。為此我們通過queue_declare的第三參數為true: (這里生產者和消費者都必須改,上面我們已經聲明了一個hello的隊列是不持久化的,所以這里要重新聲明一個新的隊列,不然會報錯)
$channel->queue_declare('shaxiong', false, true, false, false);
這時候,我們就可以確保在RabbitMq重啟之后queue_declare隊列不會丟失。另外,我們需要把我們的消息也要設為持久化——設置delivery_mode = 2。(在生產者創建消息的時候設置)
$msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
//注意:消息持久化
//將消息設為持久化並不能完全保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間還是有一個很小的間隔時間。因為RabbitMq並不是所有的消息都使用fsync(2)
//——它有可能只是保存到緩存中,並不一定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付我們的簡單工作隊列。如果你一定要保證持久化,你可以使用publisher confirms。--https://www.rabbitmq.com/confirms.html
//推送一條消息到隊列中
公平調度(多勞多得)
在創建完通道之后加上這么一行代碼(生產者和消費者都需要添加) 這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工作者(worker),直到它已經處理了上一條消息並且作出了響應。這樣,RabbitMQ就會把消息分發給下一個空閑的工作者(worker)。
$channel->basic_qos(null, 1, null); //rabbitMq 削峰的表現
可以看到 100條消息 我消費者a消費了99條,有些沒截圖到 消費者b只消費了一條