一.基礎知識點
在上述章節中,我們理解的RabbitMQ是基於如下這種模式運作的。
而事實上,這只是我們簡單化了的模型的結果,真正的模型應該是這樣的。
P:Producer 生產者,生產消息,把它放進交換機
X:Exchange 交換機,可以理解為存在於生產者和隊列之間的一個橋梁。或者你可以將它理解為隊列的一個父級,或者更形象的,你就把它理解為像局域網中的交換機,把隊列理解為主機,它有direct, topic, headers 和fanout這幾種類型,后面會做介紹。
orange這些叫做binding,它是Exchange與Queue之間的紐帶。對某些類型的Exchange它是無效的(比如fanout),我的理解它其實是將隊列進一步的分類。如上圖所示,orange被分為一類,而black和green被分為了另外一類。
Q:Queue 隊列,如果你想要在P和C之間共享指定隊列的消息,你就得給隊列指定顯式的名稱。它是直接與C(consumer)連接的。
C:消費者,從隊列中訂閱消息,並且處理。
二.fanout類別的Exchange
下面通過一個日志消息的demo來介紹Exchange的用法。
記得發布消息的方法嗎?channel.basicPublish("", "hello", null, message.getBytes());第一個參數其實就是Exchange,只是當時我們並未注意它,它其實是一個無名字的默認的Exchange。既然是無名字,就無法通過它來區分Queue。所以在那些例子里,我們給Queue指定了名字以供消費者來尋找。(可以用數據庫查詢的思路去理解,比如可將Exchange理解為省份,Binding理解為城市,queue理解為地區。我們可以從指定省份,指定城市的集合里面去查詢地區,相當於給Exchange命名,指定Binding的值,當然也可以直接指定一個確定的地區,指定queue的名字)。
現在我們要做的日志消息的demo要求如下:
1.我們希望接收所有的消息。
2.我們對消息的一致性要求不高,只需要接收那些新近的消息,如果接收不到,就丟棄這個消息。
所以我們需要使用fanout類型的Exchange,fanout類型的Exchange的特點是,它廣播所有的消息給與它關聯的所有隊列,不加任何區分。所以這時候指定binding也沒什么意義。當然,queue也無需指定名稱。
鋪墊已久,現在來着手編寫生產者類,如下。
package com.xdx.learn; import java.io.IOException; import java.util.concurrent.TimeoutException; import net.sf.json.JSONObject; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private final static String EXCHANGE_NAME="logs";//交換機名稱為log public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.1.195");//服務器ip factory.setPort(5672);//端口 factory.setUsername("xdx");//登錄名 factory.setPassword("xxxxx");//密碼 Connection connection=factory.newConnection();//建立連接 Channel channel=connection.createChannel();//建立頻道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);//在頻道里聲明一個交換機,類型定位fanout System.out.println(channel+"發布100條日志消息"); for(int i=0;i<100;i++){ JSONObject jsonObjet=new JSONObject(); jsonObjet.put("msgType", "log");//該消息是針對發送驗證郵件的。 jsonObjet.put("content", "日志消息"+i); String message=jsonObjet.toString(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());//發布消息,發布到EXCHANGE_NAME,此時它會到哪個queue里面是不確定的 System.out.println(jsonObjet.get("content")); } channel.close(); connection.close(); } }
我們運行這個生產者類,由於此時還沒有queue,所以到RabbitMQ的控制台去查看,此時並沒有queue,但是已經有了一個EXCHANGE。如圖所示。
接下來編寫消費者的類,如下所示。
package com.xdx.learn; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class ReceiveLogs { private final static String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { // 下面的配置與生產者相對應 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.1.195");// 服務器ip factory.setPort(5672);// 端口 factory.setUsername("xdx");// 登錄名 factory.setPassword("xxxxx");// 密碼 Connection connection = factory.newConnection();// 連接 final Channel channel = connection.createChannel();// 頻道 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); final String queueName = channel.queueDeclare().getQueue();// 生成一個獨立的,非持久的,自動刪除的queue channel.queueBind(queueName, EXCHANGE_NAME, "");// 綁定queue和exchange。這樣隊列中就有通過EXCHANGE_NAME發布的消息。 System.out.println(" messages from channel:"+ channel+",queue:"+ queueName + ". To exit press CTRL+C"); // defaultConsumer實現了Consumer,我們將使用它來緩存生產者發送過來儲存在隊列中的消息。當我們可以接收消息的時候,從中獲取,可理解為被一個一直運行着的線程調用。 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("channel:"+channel+",queue:"+queueName+",consumer:"+this.getConsumerTag()+" Received '" + message + "'"); // channel.basicAck(envelope.getDeliveryTag(),false); try { Thread.sleep(300); } catch (Exception e) { } } }; channel.basicConsume(queueName, true, consumer);//自動回復,消息發出后隊列自動消除 } }
消費者的channel一樣聲明相同的EXCHANGE,然后final String queueName = channel.queueDeclare().getQueue();這句話生成一個獨立的,非持久的,自動刪除的queue。
接下來運行消費者程序,控制台打印出:
messages from channel:AMQChannel(amqp://xdx@192.168.1.195:5672/,1),queue:amq.gen-ZUNKY5IQG3GQG2Y8lZOjUA. To exit press CTRL+C
可見已經生成了一個名為amq.gen-ZUNKY5IQG3GQG2Y8lZOjUA的queue,事實上,去RabbitMQ后台查看,確實看到了一個queue.他的特點是AD(auto delete)和EXCL(exclusive)。
auto delete:當他沒用的時候,服務就會刪除掉它。
exclusive:獨占,說明這個queue僅僅被這個connection獨占。
但是並未處理之前我們生產者程序發出來的消息啊。這是為什么呢?
這是因為當我們的生產者發出消息的時候,那時我們的消費者程序還未運行,所以還未建立queue隊列,那些消息自然無處容身,所以它們被丟棄了。
下面我們在保持生產者程序運行的情況下,也就是有了queue的情況下,再次運行生產者程序,再發送100條信息。然后我們就可以看到消費者程序開始在處理消息了。
接下來,我們在啟動另外一個消費者,即把剛才這個生產者程序在運行一個。在eclipse上你可以看到有兩個程序正在運行。
同時,在RabbitMQ的控制台,看到又增多了一個queue.
接着,我們再次運行生產者,按照預期,應該是兩個消費者程序同時接受並處理生產者發送過來的消息。
果然,運行結果截圖如下。
消費者1:
消費者2:
我們發現消費者1和消費者2都接受到了生產者發出來的消息,並且進行處理了。
三.總結
1.生產者先創建一個connection,然后通過該connection創建一個channel,再在該channel中創建一個Exchange。生產者不再指定的queue名稱,而只是將消息廣播到特定routingKey的Exchange中。通過channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());這個方法。
2.消費者也創建一個新的connection,然后通過該connection創建一個channel,再在該channel中聲明一個Exchange與生產者那邊對應(經測試,如果在生產者中已聲明,且先運行了,則這個Exchange已經存在,這一步可以省去。但一般情況下不省略,因為有時候我們可能先運行的消費者程序,就有可能找不到,也就是說生產者和消費者如果都聲明了同一個Exchange,則只生成了一個這樣的Exchange),再從channel中拿出一個臨時的queue,通過queueBind方法與生產者那邊所定義的Exchange和RouteKey發生關聯。關聯之后,消費者就可以從queue中取到所有廣播到了滿足該Exchange&&RouteKey下的消息。
3.生產者程序只負責廣播,不負責把消息送到指定的queue中,這部分工作是消費者程序來做的。一旦沒有消費者來接收消息,這些消息就被丟棄了。而消費者通過queueBind方法訂閱特定Exchange,特定routeKey中的消息並處理。我想這就是這章叫做廣播/訂閱的原因吧。
4.細心的讀者可以看到當我們啟動兩個消費者程序的時候,這兩個消費者都接收了生產者的消息,且都是一模一樣的消息。這是因為,這兩個消費者從兩個queue中去取消息,而這兩個queue都關聯了相同的Exchange和RouteKey。所以接收到的是相同的消息,我們完全可以在這兩個消費者中編寫不同的處理代碼,比如消費者1就進行顯示消息的工作,而消費者2則進行存儲消息的動作。注意與前面章節的那種模式相區分,在前面章節的例子中,我們在生產者中指定了一個命名了的queue,所以以后不管擴展了幾個新的消費者,他們都是從同一個queue中去取消息,所以不會取得重復的消息。