1、簡單隊列
其實上篇文章末尾給出的代碼就是簡單隊列。
一個生產者對應一個消費者!!!
生產者將消息發送到“hello”隊列。消費者從該隊列接收消息。
①、pom文件
必須導入rabbitmq 依賴包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency>
②、工具類

package com.ys.utils; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Create by hadoop */ public class ConnectionUtil { public static Connection getConnection(String host,int port,String vHost,String userName,String passWord) throws Exception{ //1、定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); //2、設置服務器地址 factory.setHost(host); //3、設置端口 factory.setPort(port); //4、設置虛擬主機、用戶名、密碼 factory.setVirtualHost(vHost); factory.setUsername(userName); factory.setPassword(passWord); //5、通過連接工廠獲取連接 Connection connection = factory.newConnection(); return connection; } }
③、生產者 Producer

package com.ys.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Producer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明信道 Channel channel = connection.createChannel(); //3、聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、定義消息內容 String message = "hello rabbitmq "; //5、發布消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("[x] Sent'"+message+"'"); //6、關閉通道 channel.close(); //7、關閉連接 connection.close(); } }
④、消費者Consumer

package com.ys.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer { private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5、監聽隊列 /* true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息已經成功消費 false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋, 如果消費者一直沒有反饋,那么該消息將一直處於不可用狀態,並且服務器會認為該消費者已經掛掉,不會再給其 發送消息,直到該消費者反饋。 */ channel.basicConsume(QUEUE_NAME,true,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
注意這里消費者有自動確認消息和手動確認消息兩種模式。
2、work 模式
一個生產者對應多個消費者,但是只能有一個消費者獲得消息!!!
競爭消費者模式。
①、生產者

package com.ys.workqueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Producer { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明信道 Channel channel = connection.createChannel(); //3、聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、定義消息內容(發布多條消息) for(int i = 0 ; i < 10 ; i++){ String message = "hello rabbitmq "+i; //5、發布消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("[x] Sent'"+message+"'"); //模擬發送消息延時,便於演示多個消費者競爭接受消息 Thread.sleep(i*10); } //6、關閉通道 channel.close(); //7、關閉連接 connection.close(); } }
②、消費者
這里創建兩個消費者
消費者1:每接收一條消息后休眠10毫秒

package com.ys.workqueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer1 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //同一時刻服務器只會發送一條消息給消費者 //channel.basicQos(1); //4、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); //消費者1接收一條消息后休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
消費者2:每接收一條消息后休眠1000毫秒

package com.ys.workqueue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer2 { private final static String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //同一時刻服務器只會發送一條消息給消費者 //channel.basicQos(1); //4、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); //消費者2接收一條消息后休眠1000毫秒 Thread.sleep(1000); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
③、測試結果
首先生產者一次打印從0-9條消息
接着我們看消費者1:結果為打印偶數條消息
消費者2:結果為打印奇數條消息
④、分析結果
消費者1和消費者2獲取到的消息內容是不同的,也就是說同一個消息只能被一個消費者獲取。
消費者1和消費者2分別獲取奇數條消息和偶數條消息,兩種獲取消息的條數是一樣的。
前面我們說這種模式是競爭消費者模式,一條隊列被多個消費者監聽,這里兩個消費者,其中消費者1和消費者2在獲取消息后分別休眠了10毫秒和1000毫秒,也就是說兩個消費者獲取消息的效率是不一樣的,但是結果卻是兩者獲得的消息條數是一樣的,這根本就不構成競爭關系,那么我們應該怎么辦才能讓工作效率高的消費者獲取消息更多,也就是消費者1獲取消息更多呢?
PS:在增加一個消費者其實獲取消息條數也是一樣的,消費者1獲取0,3,6,9,消費者2獲取1,4,7,消費者3獲取2,5,8
⑤、能者多勞
channel.basicQos(1);
增加如上代碼,表示同一時刻服務器只會發送一條消息給消費者。消費者1和消費者2獲取消息結果如下:
⑥、應用場景
效率高的消費者消費消息多。可以用來進行負載均衡。
3、發布/訂閱模式
一個消費者將消息首先發送到交換器,交換器綁定到多個隊列,然后被監聽該隊列的消費者所接收並消費。
ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這里的交換器是 fanout。下面我們會詳細介紹這幾種交換器。
①、生產者

package com.ys.ps; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Producer { private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception { //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest"); //2、聲明信道 Channel channel = connection.createChannel(); //3、聲明交換器 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //4、創建消息 String message = "hello rabbitmq"; //5、發布消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("[x] Sent'" + message + "'"); //6、關閉通道 channel.close(); //7、關閉連接 connection.close(); } }
②、消費者
消費者1:

package com.ys.ps; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer1 { private final static String QUEUE_NAME = "fanout_queue_1"; private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、綁定隊列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消費者1:" + message + "'"); //消費者1接收一條消息后休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
消費者2:

package com.ys.ps; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer2 { private final static String QUEUE_NAME = "fanout_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、綁定隊列到交換機 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消費者2:" + message + "'"); //消費者2接收一條消息后休眠10毫秒 Thread.sleep(1000); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
注意:消費者1和消費者2兩者監聽的隊列名稱是不一樣的,我們可以通過前台管理系統看到:
③、測試結果
消費1和消費者2都消費了該消息。
ps:這是因為消費者1和消費者2都監聽了被同一個交換器綁定的隊列。如果消息發送到沒有隊列綁定的交換器時,消息將丟失,因為交換器沒有存儲消息的能力,消息只能存儲在隊列中。
④、應用場景
比如一個商城系統需要在管理員上傳商品新的圖片時,前台系統必須更新圖片,日志系統必須記錄相應的日志,那么就可以將兩個隊列綁定到圖片上傳交換器上,一個用於前台系統更新圖片,另一個用於日志系統記錄日志。
4、路由模式
生產者將消息發送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發送的消息會指定一個路由key,那么消息只會發送到相應key相同的隊列,接着監聽該隊列的消費者消費消息。
也就是讓消費者有選擇性的接收消息。
①、生產者

package com.ys.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Producer { private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception { //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest"); //2、聲明信道 Channel channel = connection.createChannel(); //3、聲明交換器,類型為direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //4、創建消息 String message = "hello rabbitmq"; //5、發布消息 channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes()); System.out.println("生產者發送" + message + "'"); //6、關閉通道 channel.close(); //7、關閉連接 connection.close(); } }
②、消費者
消費者1:

package com.ys.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer1 { private final static String QUEUE_NAME = "direct_queue_1"; private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、綁定隊列到交換機,指定路由key為update channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"delete"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"add"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消費者1:" + message + "'"); //消費者1接收一條消息后休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
消費者2:

package com.ys.routing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer2 { private final static String QUEUE_NAME = "direct_queue_2"; private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、綁定隊列到交換機,指定路由key為select channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消費者1:" + message + "'"); //消費者2接收一條消息后休眠10毫秒 Thread.sleep(1000); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
③、測試結果
我們首先看代碼,生產者發布消息,指定的路由key為update。消費者1綁定隊列和交換機時key分別是update/delete/add;消費者2綁定隊列和交換器時key是select。
所以我們可以猜測生產者發送的消息,只有消費者1能夠接收並消費,而消費者2是不能接收的。
④、應用場景
利用消費者能夠有選擇性的接收消息的特性,比如我們商城系統的后台管理系統對於商品進行修改、刪除、新增操作都需要更新前台系統的界面展示,而查詢操作確不需要,那么這兩個隊列分開接收消息就比較好。
5、主題模式
上面的路由模式是根據路由key進行完整的匹配(完全相等才發送消息),這里的通配符模式通俗的來講就是模糊匹配。
符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。
①、生產者

package com.ys.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Producer { private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251", 5672, "/", "guest", "guest"); //2、聲明信道 Channel channel = connection.createChannel(); //3、聲明交換器,類型為direct channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //4、創建消息 String message = "hello rabbitmq111"; //5、發布消息 channel.basicPublish(EXCHANGE_NAME, "update.Name", null, message.getBytes()); System.out.println("生產者發送" + message + "'"); //6、關閉通道 channel.close(); //7、關閉連接 connection.close(); } }
②、消費者
消費者1:

package com.ys.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer1 { private final static String QUEUE_NAME = "topic_queue_1"; private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、綁定隊列到交換機,指定路由key為update.# channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消費者1:" + message + "'"); //消費者1接收一條消息后休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
消費2:

package com.ys.topic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import com.ys.utils.ConnectionUtil; /** * Create by YSOcean */ public class Consumer2 { private final static String QUEUE_NAME = "topic_queue_2"; private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception{ //1、獲取連接 Connection connection = ConnectionUtil.getConnection("192.168.146.251",5672,"/","guest","guest"); //2、聲明通道 Channel channel = connection.createChannel(); //3、聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、綁定隊列到交換機,指定路由key為select.# channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"select.#"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5、定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //6、監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6、獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" 消費者1:" + message + "'"); //消費者2接收一條消息后休眠10毫秒 Thread.sleep(1000); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
③、分析結果
生產者發送消息綁定的路由key為update.Name;消費者1監聽的隊列和交換器綁定路由key為update.#;消費者2監聽的隊列和交換器綁定路由key為select.#。
很顯然,消費者1會接收到消息,而消費者2接收不到。
6、四種交換器
前面五種隊列模式介紹完了,但是實際上只有三種,第一種簡單隊列,第二種工作模式,剩下的三種都是和交換器綁定的合起來稱為一種,這小節我們就來詳細介紹交換器。
交換器分為四種,分別是:direct、fanout、topic和 headers。
前面三種分別對應路由模式、發布訂閱模式和通配符模式,headers 交換器允許匹配 AMQP 消息的 header 而非路由鍵,除此之外,header 交換器和 direct 交換器完全一致,但是性能卻差很多,因此基本上不會用到該交換器,這里也不詳細介紹。
①、direct
如果路由鍵完全匹配的話,消息才會被投放到相應的隊列。
②、fanout
當發送一條消息到fanout交換器上時,它會把消息投放到所有附加在此交換器上的隊列。
③、topic
設置模糊的綁定方式,“*”操作符將“.”視為分隔符,匹配單個字符;“#”操作符沒有分塊的概念,它將任意“.”均視為關鍵字的匹配部分,能夠匹配多個字符。
7、總結
關於 RabbitMQ 的五種隊列,其實實際使用最多的是最后一種主題模式,通過模糊匹配,使得操作更加自如。那么我們總結一下有交換器參與的隊列(最后三種隊列)工作方式如下: