【RabbitMQ】 Java簡單的實現RabbitMQ


准備工作

  1、安裝RabbitMQ,參考【RabbitMQ】 RabbitMQ安裝

  2、新建Java項目,引入RabbitMQ的Maven依賴

1 <dependency>
2     <groupId>com.rabbitmq</groupId>
3     <artifactId>amqp-client</artifactId>
4     <version>5.5.0</version>
5 </dependency>

單生產者消費者

  流程圖:

    

  1、創建生產者Producer1

 1 public class Producer1 {
 2 
 3     private final static String QUEUE_NAME = "rabbitMQ.test.queue";
 4 
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6 
 7         // 創建連接工廠
 8         ConnectionFactory factory = new ConnectionFactory();
 9         // 設置RabbitMQ相關信息
10         factory.setHost("localhost");
11         factory.setUsername("guest");
12         factory.setPassword("guest");
13         factory.setPort(5672);
14         // 創建一個新的連接
15         Connection connection = factory.newConnection();
16         // 創建一個通道
17         Channel channel = connection.createChannel();
18         
19         // 聲明一個隊列 
20         // queueDeclare(隊列名稱,是否持久化(true表示是,隊列將在服務器重啟時生存),是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除),
21         //                        當所有消費者客戶端連接斷開時是否自動刪除隊列,隊列的其他參數)
22         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23         String message = "Hello RabbitMQ ~";
24         
25         // 發送消息到隊列中
26         // basicPublish(交換機名稱,隊列映射的路由key,消息的其他屬性,發送信息的主體)
27         channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
28          System.out.println("Producer Send +'" + message + "'");
29         // 關閉通道和連接
30         channel.close();
31         connection.close();
32 
33     }
34 
35 }

  2、創建消費者Customer1

 1 public class Customer1 {
 2 
 3     private final static String QUEUE_NAME = "rabbitMQ.test.queue";
 4 
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6         // 創建連接工廠
 7         ConnectionFactory factory = new ConnectionFactory();
 8         // 設置RabbitMQ地址
 9         factory.setHost("localhost");
10         factory.setUsername("guest");
11         factory.setPassword("guest");
12         factory.setPort(5672);
13         // 創建一個新的連接
14         Connection connection = factory.newConnection();
15         // 創建一個通道
16         final Channel channel = connection.createChannel();
17         // 聲明要關注的隊列
18         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
19         channel.basicQos(1);//保證一次只分發一個
20         System.out.println("Customer Waiting Received messages");
21         
22         // DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,
23         // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery
24         Consumer consumer = new DefaultConsumer(channel) {
25             @Override
26             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
27                     byte[] body) throws IOException {
28                 String message = new String(body, "UTF-8");
29                 System.out.println("Customer Received '" + message + "'");
30             }
31         };
32         
33         // 自動回復隊列應答 -- RabbitMQ中的消息確認機制
34         channel.basicConsume(QUEUE_NAME, true, consumer);
35     }
36 
37 }

  3、運行結果

    a、生產者

    

    b、消費者

    

推送確認和消費應答

  流程圖

    

  1、創建推送確認生產者Producer2

 1 public class Producer2 {
 2 
 3     private final static String QUEUE_NAME = "rabbitMQ.test.queue";
 4 
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6 
 7         // 創建連接工廠
 8         ConnectionFactory factory = new ConnectionFactory();
 9         // 設置RabbitMQ相關信息
10         factory.setHost("localhost");
11         factory.setUsername("guest");
12         factory.setPassword("guest");
13         factory.setPort(5672);
14         // 創建一個新的連接
15         Connection connection = factory.newConnection();
16         // 創建一個通道
17         Channel channel = connection.createChannel();
18 
19         // 聲明一個隊列
20         // queueDeclare(隊列名稱,是否持久化(true表示是,隊列將在服務器重啟時生存),是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除),
21         // 當所有消費者客戶端連接斷開時是否自動刪除隊列,隊列的其他參數)
22         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
23 
24         // 開啟發送方確認模式
25         channel.confirmSelect();
26 
27         // 存儲未確認的消息標識tag
28         final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>());
29 
30         // 異步監聽確認和未確認的消息
31         channel.addConfirmListener(new ConfirmListener() {
32 
33             /**
34              * 處理返回確認成功
35              * 
36              * @param deliveryTag 如果是多條,這個就是最后一條消息的tag
37              * @param multiple    是否多條
38              * @throws IOException
39              */
40             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
41                 System.out.println("消息發送成功,    deliveryTag:" + deliveryTag + "        multiple:" + multiple + "");
42                 if (multiple) {
43                     // 移除發送成功的多條消息標識tag
44                     confirmSet.headSet(deliveryTag + 1).clear();
45                 } else {
46                     // 移除發送成功的一條消息標識tag
47                     confirmSet.remove(deliveryTag);
48                 }
49             }
50 
51             /**
52              * 處理返回確認失敗
53              * 
54              * @param deliveryTag 如果是多條,這個就是最后一條消息的tag
55              * @param multiple    是否多條
56              * @throws IOException
57              */
58             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
59                 System.out.println("失敗,deliveryTag:" + deliveryTag + "multiple:" + multiple + "");
60                 if (multiple) {
61                     confirmSet.headSet(deliveryTag + 1).clear();
62                 } else {
63                     confirmSet.remove(deliveryTag);
64                 }
65             }
66 
67         });
68 
69         String message = "Hello RabbitMQ ~ ";
70 
71         // 發送消息到隊列中
72         // basicPublish(交換機名稱,隊列映射的路由key,消息的其他屬性,發送信息的主體)
73         for (int i = 1; i <= 10; i++) {
74             String msg = message + i;
75             long tag = channel.getNextPublishSeqNo();
76             confirmSet.add(tag);
77             System.out.println("tag:" + tag);
78             
79             channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8"));
80             System.out.println("Producer Send +'" + msg + "'");
81             
82         }
83         
84         System.out.println("============================");
85         // 關閉通道和連接
86         // channel.close();
87         // connection.close();
88 
89     }
90 }

  2、創建消費應答消費者Customer2

 1 public class Customer2 {
 2 
 3     private final static String QUEUE_NAME = "rabbitMQ.test.queue";
 4 
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6         // 創建連接工廠
 7         ConnectionFactory factory = new ConnectionFactory();
 8         // 設置RabbitMQ地址
 9         factory.setHost("localhost");
10         factory.setUsername("guest");
11         factory.setPassword("guest");
12         factory.setPort(5672);
13         // 創建一個新的連接
14         Connection connection = factory.newConnection();
15         // 創建一個通道
16         final Channel channel = connection.createChannel();
17         // 聲明要關注的隊列
18         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
19         channel.basicQos(1);// 保證一次只分發一個
20         System.out.println("Customer Waiting Received messages");
21 
22         // DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,
23         // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery
24         Consumer consumer = new DefaultConsumer(channel) {
25             @Override
26             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
27                     byte[] body) throws IOException {
28                 String message = new String(body, "UTF-8");
29                 System.out.println("Customer Received '" + message + "'");
30 
31                 // 返回消費確認狀態
32                 channel.basicAck(envelope.getDeliveryTag(), false);
33             }
34         };
35 
36         // 消費手動確認 -- RabbitMQ中的消息確認機制
37         channel.basicConsume(QUEUE_NAME, false, consumer);
38     }
39 
40 }

  3、運行結果

    生產者:
      

    消費者:
      

多消費者

  流程圖
    

  1、創建推送確認生產者Producer3(與示例Producer2一樣)

  2、創建消費應答消費者Customer3

 1 public class Customer3 {
 2 
 3     private final static String QUEUE_NAME = "rabbitMQ.test.queue";
 4 
 5     public static void main(String[] args) throws IOException, TimeoutException {
 6         Customer3 customer3 = new Customer3();
 7         customer3.createCustomer("customer1");
 8         customer3.createCustomer("customer2");
 9     }
10 
11     public void createCustomer(final String customerName) throws IOException, TimeoutException{
12         
13         // 創建連接工廠
14         ConnectionFactory factory = new ConnectionFactory();
15         // 設置RabbitMQ地址
16         factory.setHost("localhost");
17         factory.setUsername("guest");
18         factory.setPassword("guest");
19         factory.setPort(5672);
20         // 創建一個新的連接
21         Connection connection = factory.newConnection();
22         // 創建一個通道
23         final Channel channel = connection.createChannel();
24         // 聲明要關注的隊列
25         channel.queueDeclare(QUEUE_NAME, false, false, false, null);
26         channel.basicQos(1);// 保證一次只分發一個
27         System.out.println(customerName + " Waiting Received messages");
28 
29         // DefaultConsumer類實現了Consumer接口,通過傳入一個頻道,
30         // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery
31         Consumer consumer = new DefaultConsumer(channel) {
32             @Override
33             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
34                     byte[] body) throws IOException {
35                 String message = new String(body, "UTF-8");
36                 System.out.println(customerName + " Received '" + message + "'");
37                 
38                 // doWork處理任務
39                 doWork(customerName);
40                 
41                 // 返回消費確認狀態
42                 channel.basicAck(envelope.getDeliveryTag(), false);
43             }
44         };
45 
46         // 消費手動確認 -- RabbitMQ中的消息確認機制
47         channel.basicConsume(QUEUE_NAME, false, consumer);
48     }
49     
50     
51     private void doWork(String customer) {
52         try {
53             Thread.sleep(2000); // 暫停2秒鍾
54             System.out.println(customer + ": completion of the job!");
55         } catch (InterruptedException _ignored) {
56             Thread.currentThread().interrupt();
57         }
58     }
59 }

  3、運行結果
    生產者與前面相同

    消費者
      
  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM