四.RabbitMQ之發布/訂閱(Publish/Subscribe)


  一.基礎知識點

  在上述章節中,我們理解的RabbitMQ是基於如下這種模式運作的。

  

  而事實上,這只是我們簡單化了的模型的結果,真正的模型應該是這樣的。

  

  P:Producer 生產者,生產消息,把它放進交換機

  X:Exchange 交換機,可以理解為存在於生產者和隊列之間的一個橋梁。或者你可以將它理解為隊列的一個父級,或者更形象的,你就把它理解為像局域網中的交換機,把隊列理解為主機,它有direct, topic, headers 和fanout這幾種類型,后面會做介紹。

  orange這些叫做binding,它是Exchange與Queue之間的紐帶。對某些類型的Exchange它是無效的(比如fanout),我的理解它其實是將隊列進一步的分類。如上圖所示,orange被分為一類,而black和green被分為了另外一類。

  Q:Queue 隊列,如果你想要在P和C之間共享指定隊列的消息,你就得給隊列指定顯式的名稱。它是直接與C(consumer)連接的。

  C:消費者,從隊列中訂閱消息,並且處理。

  二.fanout類別的Exchange

  下面通過一個日志消息的demo來介紹Exchange的用法。

  記得發布消息的方法嗎?channel.basicPublish("", "hello", null, message.getBytes());第一個參數其實就是Exchange,只是當時我們並未注意它,它其實是一個無名字的默認的Exchange。既然是無名字,就無法通過它來區分Queue。所以在那些例子里,我們給Queue指定了名字以供消費者來尋找。(可以用數據庫查詢的思路去理解,比如可將Exchange理解為省份,Binding理解為城市,queue理解為地區。我們可以從指定省份,指定城市的集合里面去查詢地區,相當於給Exchange命名,指定Binding的值,當然也可以直接指定一個確定的地區,指定queue的名字)。

  現在我們要做的日志消息的demo要求如下:

  1.我們希望接收所有的消息。

  2.我們對消息的一致性要求不高,只需要接收那些新近的消息,如果接收不到,就丟棄這個消息。

  所以我們需要使用fanout類型的Exchange,fanout類型的Exchange的特點是,它廣播所有的消息給與它關聯的所有隊列,不加任何區分。所以這時候指定binding也沒什么意義。當然,queue也無需指定名稱。

  鋪墊已久,現在來着手編寫生產者類,如下。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import net.sf.json.JSONObject;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {
    private final static String EXCHANGE_NAME="logs";//交換機名稱為log

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.1.195");//服務器ip
        factory.setPort(5672);//端口
        factory.setUsername("xdx");//登錄名
        factory.setPassword("xxxxx");//密碼
        Connection connection=factory.newConnection();//建立連接
        Channel channel=connection.createChannel();//建立頻道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//在頻道里聲明一個交換機,類型定位fanout
        System.out.println(channel+"發布100條日志消息");
        for(int i=0;i<100;i++){
            JSONObject jsonObjet=new JSONObject();
            jsonObjet.put("msgType", "log");//該消息是針對發送驗證郵件的。
            jsonObjet.put("content", "日志消息"+i);
            String message=jsonObjet.toString();
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());//發布消息,發布到EXCHANGE_NAME,此時它會到哪個queue里面是不確定的
            System.out.println(jsonObjet.get("content"));
        }
        channel.close();
        connection.close();
    }
}

  我們運行這個生產者類,由於此時還沒有queue,所以到RabbitMQ的控制台去查看,此時並沒有queue,但是已經有了一個EXCHANGE。如圖所示。

  

   接下來編寫消費者的類,如下所示。

  

package com.xdx.learn;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class ReceiveLogs {
    private final static String EXCHANGE_NAME = "logs";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 下面的配置與生產者相對應
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.1.195");// 服務器ip
        factory.setPort(5672);// 端口
        factory.setUsername("xdx");// 登錄名
        factory.setPassword("xxxxx");// 密碼
        Connection connection = factory.newConnection();// 連接
        final Channel channel = connection.createChannel();// 頻道
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        final String queueName = channel.queueDeclare().getQueue();// 生成一個獨立的,非持久的,自動刪除的queue
        channel.queueBind(queueName, EXCHANGE_NAME, "");// 綁定queue和exchange。這樣隊列中就有通過EXCHANGE_NAME發布的消息。
        System.out.println(" messages from channel:"+ channel+",queue:"+ queueName
                + ". To exit press CTRL+C");
        // defaultConsumer實現了Consumer,我們將使用它來緩存生產者發送過來儲存在隊列中的消息。當我們可以接收消息的時候,從中獲取,可理解為被一個一直運行着的線程調用。
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("channel:"+channel+",queue:"+queueName+",consumer:"+this.getConsumerTag()+"  Received '" + message + "'");
//                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(300);
                } catch (Exception e) {
                }
            }
        };
        channel.basicConsume(queueName, true, consumer);//自動回復,消息發出后隊列自動消除
    }

}

  消費者的channel一樣聲明相同的EXCHANGE,然后final String queueName = channel.queueDeclare().getQueue();這句話生成一個獨立的,非持久的,自動刪除的queue。

  接下來運行消費者程序,控制台打印出:

   messages from channel:AMQChannel(amqp://xdx@192.168.1.195:5672/,1),queue:amq.gen-ZUNKY5IQG3GQG2Y8lZOjUA. To exit press CTRL+C
  可見已經生成了一個名為amq.gen-ZUNKY5IQG3GQG2Y8lZOjUA的queue,事實上,去RabbitMQ后台查看,確實看到了一個queue.他的特點是AD(auto delete)和EXCL(exclusive)。

  auto delete:當他沒用的時候,服務就會刪除掉它。

  exclusive:獨占,說明這個queue僅僅被這個connection獨占。

  但是並未處理之前我們生產者程序發出來的消息啊。這是為什么呢?

  這是因為當我們的生產者發出消息的時候,那時我們的消費者程序還未運行,所以還未建立queue隊列,那些消息自然無處容身,所以它們被丟棄了。

  下面我們在保持生產者程序運行的情況下,也就是有了queue的情況下,再次運行生產者程序,再發送100條信息。然后我們就可以看到消費者程序開始在處理消息了。

 

  接下來,我們在啟動另外一個消費者,即把剛才這個生產者程序在運行一個。在eclipse上你可以看到有兩個程序正在運行。

  

  同時,在RabbitMQ的控制台,看到又增多了一個queue.

  

    接着,我們再次運行生產者,按照預期,應該是兩個消費者程序同時接受並處理生產者發送過來的消息。

  果然,運行結果截圖如下。

  消費者1:

  消費者2:

  我們發現消費者1和消費者2都接受到了生產者發出來的消息,並且進行處理了。

  三.總結

  1.生產者先創建一個connection,然后通過該connection創建一個channel,再在該channel中創建一個Exchange。生產者不再指定的queue名稱,而只是將消息廣播到特定routingKey的Exchange中。通過channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());這個方法。

  2.消費者也創建一個新的connection,然后通過該connection創建一個channel,再在該channel中聲明一個Exchange與生產者那邊對應(經測試,如果在生產者中已聲明,且先運行了,則這個Exchange已經存在,這一步可以省去。但一般情況下不省略,因為有時候我們可能先運行的消費者程序,就有可能找不到,也就是說生產者和消費者如果都聲明了同一個Exchange,則只生成了一個這樣的Exchange),再從channel中拿出一個臨時的queue,通過queueBind方法與生產者那邊所定義的Exchange和RouteKey發生關聯。關聯之后,消費者就可以從queue中取到所有廣播到了滿足該Exchange&&RouteKey下的消息。

  3.生產者程序只負責廣播,不負責把消息送到指定的queue中,這部分工作是消費者程序來做的。一旦沒有消費者來接收消息,這些消息就被丟棄了。而消費者通過queueBind方法訂閱特定Exchange,特定routeKey中的消息並處理。我想這就是這章叫做廣播/訂閱的原因吧。

  4.細心的讀者可以看到當我們啟動兩個消費者程序的時候,這兩個消費者都接收了生產者的消息,且都是一模一樣的消息。這是因為,這兩個消費者從兩個queue中去取消息,而這兩個queue都關聯了相同的Exchange和RouteKey。所以接收到的是相同的消息,我們完全可以在這兩個消費者中編寫不同的處理代碼,比如消費者1就進行顯示消息的工作,而消費者2則進行存儲消息的動作。注意與前面章節的那種模式相區分,在前面章節的例子中,我們在生產者中指定了一個命名了的queue,所以以后不管擴展了幾個新的消費者,他們都是從同一個queue中去取消息,所以不會取得重復的消息。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM