RabbitMQ——訂閱模式類型/發布與訂閱模式


 


一、訂閱模式

訂閱模式示例圖:

 

 

 訂閱模型中,多了一個exchange角色:

  • P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)C:消費者,消息的接受者,會一直等待消息到來。
  • Queue:消息隊列,接收消息、緩存消息。
  • Exchange:交換機,圖中的X。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決於Exchange的類型。Exchange有常見以下3種類型:

    Fanout:廣播,將消息交給所有綁定到交換機的隊列
    Direct:定向,把消息交給符合指定routing key 的隊列
    Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那么消息會丟失! 

二、Publish/Subscribe發布與訂閱模式

1.模式說明

 發布訂閱模式:
(1)每個消費者監聽自己的隊列。
(2)生產者將消息發給broker,由交換機將消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息 

2、示例

Producer_PubSub:
 1 /**
 2  * 發送消息
 3  */
 4 public class Producer_PubSub {
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6 
 7         //1.創建連接工廠
 8         ConnectionFactory factory = new ConnectionFactory();
 9         //2. 設置參數
10         factory.setHost("172.16.98.133");//ip  默認值 localhost
11         factory.setPort(5672); //端口  默認值 5672
12         factory.setVirtualHost("/itcast");//虛擬機 默認值/
13         factory.setUsername("jingdong");//用戶名 默認 guest
14         factory.setPassword("jingdong");//密碼 默認值 guest
15         //3. 創建連接 Connection
16         Connection connection = factory.newConnection();
17         //4. 創建Channel
18         Channel channel = connection.createChannel();
19        /*
20 
21        exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
22        參數:
23         1. exchange:交換機名稱
24         2. type:交換機類型
25             DIRECT("direct"),:定向,把消息交給符合指定routing key 的隊列。
26             FANOUT("fanout"),:扇形(廣播),發送消息到每一個與之綁定隊列。
27             TOPIC("topic"),通配符的方式
28             HEADERS("headers");參數匹配
29 
30         3. durable:是否持久化
31         4. autoDelete:自動刪除
32         5. internal:內部使用。 一般false
33         6. arguments:參數
34         */
35 
36        String exchangeName = "test_fanout";
37         //5. 創建交換機
38         channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
39         //6. 創建隊列
40         String queue1Name = "test_fanout_queue1";
41         String queue2Name = "test_fanout_queue2";
42         channel.queueDeclare(queue1Name,true,false,false,null);
43         channel.queueDeclare(queue2Name,true,false,false,null);
44         //7. 綁定隊列和交換機
45         /*
46         queueBind(String queue, String exchange, String routingKey)
47         參數:
48             1. queue:隊列名稱
49             2. exchange:交換機名稱
50             3. routingKey:路由鍵,綁定規則
51                 如果交換機的類型為fanout ,routingKey設置為""
52          */
53         channel.queueBind(queue1Name,exchangeName,"");
54         channel.queueBind(queue2Name,exchangeName,"");
55 
56         String body = "日志信息:張三調用了findAll方法...日志級別:info...";
57         //8. 發送消息
58         channel.basicPublish(exchangeName,"",null,body.getBytes());
59 
60         //9. 釋放資源
61         channel.close();
62         connection.close();
63 
64     }
65 }
View Code
Consumer_PubSub1:
 1 public class Consumer_PubSub1 {
 2     public static void main(String[] args) throws IOException, TimeoutException {
 3 
 4         //1.創建連接工廠
 5         ConnectionFactory factory = new ConnectionFactory();
 6         //2. 設置參數
 7         factory.setHost("172.16.98.133");//ip  默認值 localhost
 8         factory.setPort(5672); //端口  默認值 5672
 9         factory.setVirtualHost("/itcast");//虛擬機 默認值/
10         factory.setUsername("jingdong");//用戶名 默認 guest
11         factory.setPassword("jingdong");//密碼 默認值 guest
12         //3. 創建連接 Connection
13         Connection connection = factory.newConnection();
14         //4. 創建Channel
15         Channel channel = connection.createChannel();
16 
17 
18         String queue1Name = "test_fanout_queue1";
19         String queue2Name = "test_fanout_queue2";
20 
21 
22         /*
23         basicConsume(String queue, boolean autoAck, Consumer callback)
24         參數:
25             1. queue:隊列名稱
26             2. autoAck:是否自動確認
27             3. callback:回調對象
28 
29          */
30         // 接收消息
31         Consumer consumer = new DefaultConsumer(channel){
32             /*
33                 回調方法,當收到消息后,會自動執行該方法
34 
35                 1. consumerTag:標識
36                 2. envelope:獲取一些信息,交換機,路由key...
37                 3. properties:配置信息
38                 4. body:數據
39 
40              */
41             @Override
42             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
43               /*  System.out.println("consumerTag:"+consumerTag);
44                 System.out.println("Exchange:"+envelope.getExchange());
45                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
46                 System.out.println("properties:"+properties);*/
47                 System.out.println("body:"+new String(body));
48                 System.out.println("將日志信息打印到控制台.....");
49             }
50         };
51         channel.basicConsume(queue1Name,true,consumer);
52 
53 
54         //關閉資源?不要
55 
56     }
57 }
View Code
Consumer_PubSub2:
 1 public class Consumer_PubSub2 {
 2     public static void main(String[] args) throws IOException, TimeoutException {
 3 
 4         //1.創建連接工廠
 5         ConnectionFactory factory = new ConnectionFactory();
 6         //2. 設置參數
 7         factory.setHost("172.16.98.133");//ip  默認值 localhost
 8         factory.setPort(5672); //端口  默認值 5672
 9         factory.setVirtualHost("/itcast");//虛擬機 默認值/
10         factory.setUsername("jingdong");//用戶名 默認 guest
11         factory.setPassword("jingdong");//密碼 默認值 guest
12         //3. 創建連接 Connection
13         Connection connection = factory.newConnection();
14         //4. 創建Channel
15         Channel channel = connection.createChannel();
16 
17 
18         String queue1Name = "test_fanout_queue1";
19         String queue2Name = "test_fanout_queue2";
20 
21 
22         /*
23         basicConsume(String queue, boolean autoAck, Consumer callback)
24         參數:
25             1. queue:隊列名稱
26             2. autoAck:是否自動確認
27             3. callback:回調對象
28 
29          */
30         // 接收消息
31         Consumer consumer = new DefaultConsumer(channel){
32             /*
33                 回調方法,當收到消息后,會自動執行該方法
34 
35                 1. consumerTag:標識
36                 2. envelope:獲取一些信息,交換機,路由key...
37                 3. properties:配置信息
38                 4. body:數據
39 
40              */
41             @Override
42             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
43               /*  System.out.println("consumerTag:"+consumerTag);
44                 System.out.println("Exchange:"+envelope.getExchange());
45                 System.out.println("RoutingKey:"+envelope.getRoutingKey());
46                 System.out.println("properties:"+properties);*/
47                 System.out.println("body:"+new String(body));
48                 System.out.println("將日志信息保存數據庫.....");
49             }
50         };
51         channel.basicConsume(queue2Name,true,consumer);
52 
53 
54         //關閉資源?不要
55 
56     }
57 }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM