先學習一下RabbitMQ中的六種隊列,只學習前五種,具體的官方文檔地址是:http://next.rabbitmq.com/getstarted.html
導入maven依賴:
1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>3.4.1</version> 5 </dependency>
一、簡單隊列
1、圖示
P:消息的生產者
C:消息的消費者
紅色:隊列
生產者將消息發送到隊列,消費者從隊列中獲取消息。
2、獲取MQ的連接
1 public static Connection getConnection() throws Exception { 2 //定義連接工廠 3 ConnectionFactory factory = new ConnectionFactory(); 4 //設置服務地址 5 factory.setHost("localhost"); 6 //端口 7 factory.setPort(5672); 8 //設置賬號信息,用戶名、密碼、vhost 9 factory.setVirtualHost("/taotao"); 10 factory.setUsername("taotao"); 11 factory.setPassword("taotao"); 12 // 通過工程獲取連接 13 Connection connection = factory.newConnection(); 14 return connection; 15 }
3、生產者發送消息到隊列
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到連接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 // 從連接中創建通道 9 Channel channel = connection.createChannel(); 10 11 // 聲明(創建)隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 消息內容 15 String message = "Hello World!"; 16 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 17 System.out.println(" [x] Sent '" + message + "'"); 18 19 //關閉通道和連接 20 channel.close(); 21 connection.close(); 22 } 23 }
4、管理工具中查看消息
點擊上面的隊列名稱,查詢具體的隊列中的信息:
5、消費者從隊列中獲取消息
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到連接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 定義隊列的消費者 15 QueueingConsumer consumer = new QueueingConsumer(channel); 16 // 監聽隊列 17 channel.basicConsume(QUEUE_NAME, true, consumer); 18 19 // 獲取消息 20 while (true) { 21 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 22 String message = new String(delivery.getBody()); 23 System.out.println(" [x] Received '" + message + "'"); 24 } 25 } 26 }
二、 Work模式
1、圖示
一個生產者、2個消費者。
一個消息只能被一個消費者獲取。
2、消費者1
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到連接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務器只會發一條消息給消費者 15 //channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監聽隊列,手動返回完成 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 //休眠 28 Thread.sleep(10); 29 // 返回確認狀態 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
3、消費者2
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 7 // 獲取到連接以及mq通道 8 Connection connection = ConnectionUtil.getConnection(); 9 Channel channel = connection.createChannel(); 10 11 // 聲明隊列 12 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 13 14 // 同一時刻服務器只會發一條消息給消費者 15 //channel.basicQos(1); 16 17 // 定義隊列的消費者 18 QueueingConsumer consumer = new QueueingConsumer(channel); 19 // 監聽隊列,手動返回完成狀態 20 channel.basicConsume(QUEUE_NAME, false, consumer); 21 22 // 獲取消息 23 while (true) { 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody()); 26 System.out.println(" [x] Received '" + message + "'"); 27 // 休眠1秒 28 Thread.sleep(1000); 29 30 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 31 } 32 } 33 }
4、生產者
1 public class Send { 2 3 private final static String QUEUE_NAME = "test_queue_work"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到連接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明隊列 11 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 12 13 //向隊列中發送50條消息 14 for (int i = 0; i < 50; i++) { 15 // 消息內容 16 String message = "" + i; 17 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 18 System.out.println(" [x] Sent '" + message + "'"); 19 20 Thread.sleep(i * 10); 21 } 22 23 channel.close(); 24 connection.close(); 25 } 26 }
5、測試結果
測試結果:
1、 消費者1和消費者2獲取到的消息內容是不同的,同一個消息只能被一個消費者獲取。
2、 消費者1和消費者2獲取到的消息的數量是相同的,一個是奇數一個是偶數。
其實,這樣是不合理的,應該是消費者1要比消費者2獲取到的消息多才對。
6、Work模式的“能者多勞”
測試:
消費者1比消費者2獲取的消息更多。
這種是比較符合實際情況的,能者多勞,RabbitMQ客戶端同一時刻只會給消費者發送一條消息,消費者拿到消息后,重新向客戶端拿消息,做的快的消費者要的消息多,同理,做的慢的消費者要的消息少,就體現出來能者多勞的機制了。
7、消息的確認模式
消費者從隊列中獲取消息,服務端如何知道消息已經被消費呢?
模式1:自動確認
只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消息,都認為是消息已經成功消費。
模式2:手動確認
消費者從隊列中獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,如果消費者一直沒有反饋,那么該消息將一直處於不可用狀態。
7.1 手動模式
7.2 自動模式
三、訂閱模式
1、圖示
解讀:
1、1個生產者,多個消費者
2、每一個消費者都有自己的一個隊列
3、生產者沒有將消息直接發送到隊列,而是發送到了交換機
4、每個隊列都要綁定到交換機
5、生產者發送的消息,經過交換機,到達隊列,實現,一個消息被多個消費者獲取的目的
注釋:Fanout Exchange – 不處理路由鍵。你只需要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。
2、消息的生產者(看作是后台系統)
向交換機中發行消息
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到連接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); 12 13 // 消息內容 14 String message = "商品已經被更新,id=1001"; 15 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); 16 System.out.println(" 后台系統: '" + message + "'"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
注意:消息發送到沒有隊列綁定的交換機時,消息將丟失,因為,交換機沒有存儲消息的能力,消息只能存在在隊列中。
3、消費者1(看作是前台系統)
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_ps_1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到連接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一時刻服務器只會發一條消息給消費者 20 channel.basicQos(1); 21 22 // 定義隊列的消費者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 監聽隊列,手動返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 獲取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" 前台系統: '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
4、消費者2(看作是搜索系統)
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_ps_2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_fanout"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到連接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); 18 19 // 同一時刻服務器只會發一條消息給消費者 20 channel.basicQos(1); 21 22 // 定義隊列的消費者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 監聽隊列,手動返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 獲取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" 搜索系統: '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }
5、測試
測試結果:
同一個消息被多個消費者獲取。
在管理工具中查看隊列和交換機的綁定關系:
6、使用訂閱模式能否實現商品數據的同步?
答案:可以的。
后台系統就是消息的生產者。
前台系統和搜索系統是消息的消費者。
后台系統將消息發送到交換機中,前台系統和搜索系統都創建自己的隊列,然后將隊列綁定到交換機,即可實現。
消息,新增商品、修改商品、刪除商品。
前台系統:修改商品、刪除商品。
搜索系統:新增商品、修改商品、刪除商品。
所以使用訂閱模式實現商品數據的同步並不合理。
四、路由模式
1、圖示
注釋:Direct Exchange – 處理路由鍵。需要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配。這是一個完整的匹配。如果一個隊列綁定到該交換機上要求路由鍵 “dog”,則只有被標記為“dog”的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。
2、生產者
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_direct"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到連接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 12 13 // 消息內容 14 String message = "商品刪除,id=1002"; 15 channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes()); 16 System.out.println(" 后台系統: '" + message + "'"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
3、消費者1(前台系統)
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_direct_1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_direct"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到連接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); 18 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); 19 20 // 同一時刻服務器只會發一條消息給消費者 21 channel.basicQos(1); 22 23 // 定義隊列的消費者 24 QueueingConsumer consumer = new QueueingConsumer(channel); 25 // 監聽隊列,手動返回完成 26 channel.basicConsume(QUEUE_NAME, false, consumer); 27 28 // 獲取消息 29 while (true) { 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody()); 32 System.out.println(" 前台系統: '" + message + "'"); 33 Thread.sleep(10); 34 35 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 36 } 37 } 38 }
4、消費者2(搜索系統)
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_direct_2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_direct"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到連接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); 18 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); 19 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); 20 21 // 同一時刻服務器只會發一條消息給消費者 22 channel.basicQos(1); 23 24 // 定義隊列的消費者 25 QueueingConsumer consumer = new QueueingConsumer(channel); 26 // 監聽隊列,手動返回完成 27 channel.basicConsume(QUEUE_NAME, false, consumer); 28 29 // 獲取消息 30 while (true) { 31 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 32 String message = new String(delivery.getBody()); 33 System.out.println(" 搜索系統: '" + message + "'"); 34 Thread.sleep(10); 35 36 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 37 } 38 } 39 }
五、通配符模式
1、圖示
注釋:Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列需要綁定要一個模式上。符號“#”匹配一個或多個詞,符號“*”匹配不多不少一個詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會匹配到“audit.irs”。我在RedHat的朋友做了一張不錯的圖,來表明topic交換機是如何工作的:
2、生產者
1 public class Send { 2 3 private final static String EXCHANGE_NAME = "test_exchange_topic"; 4 5 public static void main(String[] argv) throws Exception { 6 // 獲取到連接以及mq通道 7 Connection connection = ConnectionUtil.getConnection(); 8 Channel channel = connection.createChannel(); 9 10 // 聲明exchange 11 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); 12 13 // 消息內容 14 String message = "商品刪除,id=1003"; 15 channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes()); 16 System.out.println(" 后台系統: '" + message + "'"); 17 18 channel.close(); 19 connection.close(); 20 } 21 }
3、消費者1(前台系統)
1 public class Recv { 2 3 private final static String QUEUE_NAME = "test_queue_topic_1"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_topic"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到連接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update"); 18 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete"); 19 20 // 同一時刻服務器只會發一條消息給消費者 21 channel.basicQos(1); 22 23 // 定義隊列的消費者 24 QueueingConsumer consumer = new QueueingConsumer(channel); 25 // 監聽隊列,手動返回完成 26 channel.basicConsume(QUEUE_NAME, false, consumer); 27 28 // 獲取消息 29 while (true) { 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody()); 32 System.out.println(" 前台系統: '" + message + "'"); 33 Thread.sleep(10); 34 35 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 36 } 37 } 38 }
4、消費者2(搜索系統)
1 public class Recv2 { 2 3 private final static String QUEUE_NAME = "test_queue_topic_2"; 4 5 private final static String EXCHANGE_NAME = "test_exchange_topic"; 6 7 public static void main(String[] argv) throws Exception { 8 9 // 獲取到連接以及mq通道 10 Connection connection = ConnectionUtil.getConnection(); 11 Channel channel = connection.createChannel(); 12 13 // 聲明隊列 14 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 15 16 // 綁定隊列到交換機 17 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.#"); 18 19 // 同一時刻服務器只會發一條消息給消費者 20 channel.basicQos(1); 21 22 // 定義隊列的消費者 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 // 監聽隊列,手動返回完成 25 channel.basicConsume(QUEUE_NAME, false, consumer); 26 27 // 獲取消息 28 while (true) { 29 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 30 String message = new String(delivery.getBody()); 31 System.out.println(" 搜索系統: '" + message + "'"); 32 Thread.sleep(10); 33 34 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); 35 } 36 } 37 }