之前的學習了把消息直接publish到queue里面,然后consume掉,
真實的情況,我們會把消息先發送到exchange里面,由它來處理,是發給某一個隊列,還是發給某些隊列,還是丟棄掉?
exchange類型: direct,topic,headers,fanout
下面以fanout為例子(把收到的消息,全部發給所有的隊列)
如何查看服務器上面的所有的exchanges?
sudo rabbitmqctl list_exchanges
如何查看服務器上面的所有的binding關系?
sudo rabbitmqctl list_bindings
前面幾章消息發送是把$msg消息通過默認(如果第二個參數為‘ ’)的exchange發送給hello隊列
$channel->basic_publish($msg, '', 'hello');
為了實現一對所有發送同一個消息
第一步:修改發送消息部分,這里basic-publish沒有傳遞第三個參數,因為這里的消息是要發送給每個隊列?
第二步:保證每次連接rabbit 都會新產生一個全新的 消息為空的隊列,我們可以通過服務器自動生成no-durable queue 同時名字是隨機的類似:amq.gen-JzTY20BRgKO-HjmUJj0wLg.
這樣就能保證每次consumer斷開和服務器的連接以后,隊列自動釋放掉,因為他是exclusive
第三步:上面創建了一個fanout類型的exchange 和 一個任意名字的隊列queue, exchange和queue之間的關系叫做binding(綁定?)
經過上面這步: logs這個exchange就會把消息綁定到$queue_name這個隊列,(但是$queue_name是隨機生成的名字)
consumer 代碼:

public function worker() { set_time_limit(0); $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch'); $channel = $connection->channel(); #申明一個exchange名字叫logs,類型是fanout $channel->exchange_declare('logs','fanout',false,false,false); #申明一個由服務器自動命名的隊列,這個隊列會在連接結束以后 自動斷掉 list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); #把隨機命名的隊列綁定到絎棉新建的exchange $channel->queue_bind($queue_name, 'logs'); #下面第四個參數如果為false表示開啟確認模式,也就是消費以后會告知rabbitmq服務器該條消息已經處理完畢,這樣可以方式消息處理一半掛掉了,結果服務器也刪除了這條未處理完畢的消息 $receiver = new self(); $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']); while(true) { $channel->wait(); } $channel->close(); $connection->close(); }
publisher代碼:

public function task() { $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch'); $channel = $connection->channel(); #申明一個exchange名字叫logs,類型是fanout $channel->exchange_declare('logs','fanout',false,false,false); $msg = new AMQPMessage('123'); #第二個參數是表示走什么exchange 第三個參數表示走什么隊列 $channel->basic_publish($msg, 'logs'); echo "第".'1'.'發送完畢'; $channel->close(); $connection->close(); }
實驗過程描述: .
1 ,task 方法去publish 一條消息“123”,
2,啟動3個worker方法,這樣系統自動生成3個不一樣的隨機名字的隊列,然后去接受logs的exchange發送來的消息
這樣就實現了 一個服務器發送消息給所有的consumer
注意: 如果worker沒有先啟動就發送了消息,在一對所有(publish/subscrible的模式下), 如果發消息后打開worker就無法收到消息
---恢復內容結束---