在前面的兩篇博客中
遇到的實例都是一個消息只發送給一個消費者(工作者),他們的消息模型分別為(P代表生產者,C代表消費者,紅色代表隊列):
這次我們來看下將一個消息發送給多個消費者(工作者),這種模式一般被稱為“發布/訂閱”模式。其工作模型為(P代表生產者,X代表Exchange(路由器/交換機),C代表消費者,紅色代表隊列):
我們發現,工作模型中首次出現路由器,並且每個消費者有單獨的隊列。生產者生成消息后將其發送給路由器,然后路由器轉送到隊列,消費者各自到自己的隊列里面獲取消息進行消費。在實際的應用場景中,生產者一般不會直接將消息發送給隊列,而是發送給路由器進行中轉,Exchange必須清楚的知道怎么處理收到的消息:是將消息發送到一個特定隊列還是多有隊列,或者直接廢棄消息。這種才符合RabbitMQ消息模型的核心思想。
接下來我們詳細展開今天的話題:
一、Exchange
Exchange在我們的工作模型中首次出現,因此需要詳細介紹下。
Exchange分為4種類型:
Direct:完全根據key進行投遞的,例如,綁定時設置了routing key為”abc”,那么客戶端提交的消息,只有設置了key為”abc”的才會投遞到隊列。 Topic:對key進行模式匹配后進行投遞,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。 Fanout:不需要key,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。 Headers:我們可以不考慮它。
今天我們的實例采用fanout類型的exchange。
盡管首次出現,但是其實我們前面的案例中也有用到exchange,只是我們沒有給他名字,用的是RabbitMQ默認的,比如下面這段代碼,我們將路由器名這個參數傳入了“”,如果我們需要自己聲明exchange的話,這個就不能傳入“”了,而是傳入自己定義好的值。
二、臨時隊列
前面兩篇博客中,我們都在使用隊列的時候給出了定義好的名字,這在生產者和消費者共用相同隊列的時候很有必要,但是我們有了exchange,生產者不需要知道有哪些隊列,因此隊列名字可以不用指定了,而是通過RabbitMQ 接口自己去生成臨時隊列,隊列名字也由RabbitMQ自動生成。通過
可以聲明一個非持久的、通道獨占的、自動刪除的隊列,getQueue()方法可以獲取隨機隊列名字。這個名字用來在隊列和exchange之間建立binding關系的時候使用:
三、代碼實現
基於上面exchange和臨時隊列的知識鋪墊,可以展開今天的代碼實現了。
- 生產者
public class Product { //exchange名字 public static String EXCHANGE_NAME = "exchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.創建連接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.為通道聲明exchange和exchange的類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String msg = " hello rabbitmq, this is publish/subscribe mode"; // 3.發送消息到指定的exchange,隊列指定為空,由exchange根據情況判斷需要發送到哪些隊列 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.關閉連接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
- 消費者1
public class Consumer1 { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.創建連接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.為通道聲明exchange以及exchange類型 channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 3.創建隨機名字的隊列 String queueName = channel.queueDeclare().getQueue(); // 4.建立exchange和隊列的綁定關系 channel.queueBind(queueName, Product.EXCHANGE_NAME, ""); System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them"); // 5.通過回調生成消費者並進行監聽 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 獲取消息內容然后處理 String msg = new String(body, "UTF-8"); System.out.println("*********** Consumer1" + " get message :[" + msg + "]"); } }; // 6.消費消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
- 消費者2,核心代碼同消費者1一樣,只是在日志打印上將"Consumer1"改為"Consumer2"而已。這里不再列出具體代碼。
- 先運行消費者1和2,然后運行生產者,觀察控制台log打印情況:
生產者: product send a msg: hello rabbitmq, this is publish/subscribe mode 消費者1: **** Consumer1 keep alive ,waiting for messages, and then deal them *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode] 消費者2: **** Consumer2 keep alive ,waiting for messages, and then deal them *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]
可以看到,當生產者發出消息后,兩個消費者最終都收到了消息。
- 我們去查看RabbitMQ管理頁面:
在Exchanges 標簽頁里面多了一個名為“exchange”的路由器,他的類型是fanout。點exchange 的link進入詳細頁面:
發現在binding項目中有了兩條綁定關系,隊列的名字也可以看到。將頁面切換到Queues標簽頁:
出現了兩個新的隊列,隊列名字和綁定關系中的一樣,並且隊列都是自動刪除的、通道獨占的。
- 然后將消費者1和消費者2都停掉,重新查看管理頁面,我們發現exchange還在,binding關系不存在了,臨時隊列也自動刪除了