rabbitmq實現一台服務器同時給所有的consumer發送消息(tp框架)(第四篇)


 

之前的學習了把消息直接publish到queue里面,然后consume掉,

真實的情況,我們會把消息先發送到exchange里面,由它來處理,是發給某一個隊列,還是發給某些隊列,還是丟棄掉?

exchange類型: direct,topic,headers,fanout

下面以fanout為例子(把收到的消息,全部發給所有的隊列)

 

如何查看服務器上面的所有的exchanges?

 sudo rabbitmqctl list_exchanges

如何查看服務器上面的所有的binding關系?

 sudo rabbitmqctl list_bindings

 

前面幾章消息發送是把$msg消息通過默認(如果第二個參數為‘ ’)的exchange發送給hello隊列

$channel->basic_publish($msg, '', 'hello');

 為了實現一對所有發送同一個消息

第一步:修改發送消息部分,這里basic-publish沒有傳遞第三個參數,因為這里的消息是要發送給每個隊列?

第二步:保證每次連接rabbit 都會新產生一個全新的 消息為空的隊列,我們可以通過服務器自動生成no-durable queue 同時名字是隨機的類似:amq.gen-JzTY20BRgKO-HjmUJj0wLg.

    這樣就能保證每次consumer斷開和服務器的連接以后,隊列自動釋放掉,因為他是exclusive

    

 

第三步:上面創建了一個fanout類型的exchange  和  一個任意名字的隊列queue, exchange和queue之間的關系叫做binding(綁定?)

     

    經過上面這步: logs這個exchange就會把消息綁定到$queue_name這個隊列,(但是$queue_name是隨機生成的名字)

    

consumer 代碼:

public function worker()
    {
        set_time_limit(0);
        $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
        $channel = $connection->channel();

        #申明一個exchange名字叫logs,類型是fanout
        $channel->exchange_declare('logs','fanout',false,false,false);

        #申明一個由服務器自動命名的隊列,這個隊列會在連接結束以后 自動斷掉
        list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

        #把隨機命名的隊列綁定到絎棉新建的exchange
        $channel->queue_bind($queue_name, 'logs');

        #下面第四個參數如果為false表示開啟確認模式,也就是消費以后會告知rabbitmq服務器該條消息已經處理完畢,這樣可以方式消息處理一半掛掉了,結果服務器也刪除了這條未處理完畢的消息
        $receiver = new self();
        $channel->basic_consume($queue_name, '', false, true, false, false, [$receiver, 'callFunc']);

        while(true) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
View Code

 

publisher代碼:

public function task()
    {
        $connection = new AMQPStreamConnection('localhost', 5672, 'bitch', 'bitch');
        $channel = $connection->channel();

        #申明一個exchange名字叫logs,類型是fanout
        $channel->exchange_declare('logs','fanout',false,false,false);

            $msg = new AMQPMessage('123');

            #第二個參數是表示走什么exchange  第三個參數表示走什么隊列
            $channel->basic_publish($msg, 'logs');
            echo "第".'1'.'發送完畢';

        $channel->close();
        $connection->close();
    }
View Code

 

實驗過程描述: .

1 ,task 方法去publish 一條消息“123”,

2,啟動3個worker方法,這樣系統自動生成3個不一樣的隨機名字的隊列,然后去接受logs的exchange發送來的消息

 

這樣就實現了 一個服務器發送消息給所有的consumer

 注意: 如果worker沒有先啟動就發送了消息,在一對所有(publish/subscrible的模式下), 如果發消息后打開worker就無法收到消息

 

 

 

 

 
        

---恢復內容結束---


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM