一、訂閱模式
訂閱模式示例圖:
訂閱模型中,多了一個exchange角色:
- P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)C:消費者,消息的接受者,會一直等待消息到來。
- Queue:消息隊列,接收消息、緩存消息。
- Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決於Exchange的類型。Exchange有常見以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機的隊列
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失!
二、Publish/Subscribe發布與訂閱模式
1.模式說明
發布訂閱模式:
(1)每個消費者監聽自己的隊列。
(2)生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息
2、示例
Producer_PubSub:

1 /** 2 * 發送消息 3 */ 4 public class Producer_PubSub { 5 public static void main(String[] args) throws IOException, TimeoutException { 6 7 //1.創建連接工廠 8 ConnectionFactory factory = new ConnectionFactory(); 9 //2. 設置參數 10 factory.setHost("172.16.98.133");//ip 默認值 localhost 11 factory.setPort(5672); //端口 默認值 5672 12 factory.setVirtualHost("/itcast");//虛擬機 默認值/ 13 factory.setUsername("jingdong");//用戶名 默認 guest 14 factory.setPassword("jingdong");//密碼 默認值 guest 15 //3. 創建連接 Connection 16 Connection connection = factory.newConnection(); 17 //4. 創建Channel 18 Channel channel = connection.createChannel(); 19 /* 20 21 exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) 22 參數: 23 1. exchange:交換機名稱 24 2. type:交換機類型 25 DIRECT("direct"),:定向,把消息交給符合指定routing key 的隊列。 26 FANOUT("fanout"),:扇形(廣播),發送消息到每一個與之綁定隊列。 27 TOPIC("topic"),通配符的方式 28 HEADERS("headers");參數匹配 29 30 3. durable:是否持久化 31 4. autoDelete:自動刪除 32 5. internal:內部使用。 一般false 33 6. arguments:參數 34 */ 35 36 String exchangeName = "test_fanout"; 37 //5. 創建交換機 38 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null); 39 //6. 創建隊列 40 String queue1Name = "test_fanout_queue1"; 41 String queue2Name = "test_fanout_queue2"; 42 channel.queueDeclare(queue1Name,true,false,false,null); 43 channel.queueDeclare(queue2Name,true,false,false,null); 44 //7. 綁定隊列和交換機 45 /* 46 queueBind(String queue, String exchange, String routingKey) 47 參數: 48 1. queue:隊列名稱 49 2. exchange:交換機名稱 50 3. routingKey:路由鍵,綁定規則 51 如果交換機的類型為fanout ,routingKey設置為"" 52 */ 53 channel.queueBind(queue1Name,exchangeName,""); 54 channel.queueBind(queue2Name,exchangeName,""); 55 56 String body = "日志信息:張三調用了findAll方法...日志級別:info..."; 57 //8. 發送消息 58 channel.basicPublish(exchangeName,"",null,body.getBytes()); 59 60 //9. 釋放資源 61 channel.close(); 62 connection.close(); 63 64 } 65 }
Consumer_PubSub1:

1 public class Consumer_PubSub1 { 2 public static void main(String[] args) throws IOException, TimeoutException { 3 4 //1.創建連接工廠 5 ConnectionFactory factory = new ConnectionFactory(); 6 //2. 設置參數 7 factory.setHost("172.16.98.133");//ip 默認值 localhost 8 factory.setPort(5672); //端口 默認值 5672 9 factory.setVirtualHost("/itcast");//虛擬機 默認值/ 10 factory.setUsername("jingdong");//用戶名 默認 guest 11 factory.setPassword("jingdong");//密碼 默認值 guest 12 //3. 創建連接 Connection 13 Connection connection = factory.newConnection(); 14 //4. 創建Channel 15 Channel channel = connection.createChannel(); 16 17 18 String queue1Name = "test_fanout_queue1"; 19 String queue2Name = "test_fanout_queue2"; 20 21 22 /* 23 basicConsume(String queue, boolean autoAck, Consumer callback) 24 參數: 25 1. queue:隊列名稱 26 2. autoAck:是否自動確認 27 3. callback:回調對象 28 29 */ 30 // 接收消息 31 Consumer consumer = new DefaultConsumer(channel){ 32 /* 33 回調方法,當收到消息后,會自動執行該方法 34 35 1. consumerTag:標識 36 2. envelope:獲取一些信息,交換機,路由key... 37 3. properties:配置信息 38 4. body:數據 39 40 */ 41 @Override 42 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 43 /* System.out.println("consumerTag:"+consumerTag); 44 System.out.println("Exchange:"+envelope.getExchange()); 45 System.out.println("RoutingKey:"+envelope.getRoutingKey()); 46 System.out.println("properties:"+properties);*/ 47 System.out.println("body:"+new String(body)); 48 System.out.println("將日志信息打印到控制台....."); 49 } 50 }; 51 channel.basicConsume(queue1Name,true,consumer); 52 53 54 //關閉資源?不要 55 56 } 57 }
Consumer_PubSub2:

1 public class Consumer_PubSub2 { 2 public static void main(String[] args) throws IOException, TimeoutException { 3 4 //1.創建連接工廠 5 ConnectionFactory factory = new ConnectionFactory(); 6 //2. 設置參數 7 factory.setHost("172.16.98.133");//ip 默認值 localhost 8 factory.setPort(5672); //端口 默認值 5672 9 factory.setVirtualHost("/itcast");//虛擬機 默認值/ 10 factory.setUsername("jingdong");//用戶名 默認 guest 11 factory.setPassword("jingdong");//密碼 默認值 guest 12 //3. 創建連接 Connection 13 Connection connection = factory.newConnection(); 14 //4. 創建Channel 15 Channel channel = connection.createChannel(); 16 17 18 String queue1Name = "test_fanout_queue1"; 19 String queue2Name = "test_fanout_queue2"; 20 21 22 /* 23 basicConsume(String queue, boolean autoAck, Consumer callback) 24 參數: 25 1. queue:隊列名稱 26 2. autoAck:是否自動確認 27 3. callback:回調對象 28 29 */ 30 // 接收消息 31 Consumer consumer = new DefaultConsumer(channel){ 32 /* 33 回調方法,當收到消息后,會自動執行該方法 34 35 1. consumerTag:標識 36 2. envelope:獲取一些信息,交換機,路由key... 37 3. properties:配置信息 38 4. body:數據 39 40 */ 41 @Override 42 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 43 /* System.out.println("consumerTag:"+consumerTag); 44 System.out.println("Exchange:"+envelope.getExchange()); 45 System.out.println("RoutingKey:"+envelope.getRoutingKey()); 46 System.out.println("properties:"+properties);*/ 47 System.out.println("body:"+new String(body)); 48 System.out.println("將日志信息保存數據庫....."); 49 } 50 }; 51 channel.basicConsume(queue2Name,true,consumer); 52 53 54 //關閉資源?不要 55 56 } 57 }