RabbitMQ之消費者Demo(隊列參數詳細說明)


  1 package com.jiefupay;
  2 
  3 import java.io.IOException;
  4 import java.util.HashMap;
  5 import java.util.Map;  8 
  6 import org.apache.log4j.Logger;
  7 
  8 import com.jiefupay.dao.Dao;
  9 
 10 import com.rabbitmq.client.AMQP;
 11 import com.rabbitmq.client.Channel;
 12 import com.rabbitmq.client.Connection;
 13 import com.rabbitmq.client.ConnectionFactory;
 14 import com.rabbitmq.client.Consumer;
 15 import com.rabbitmq.client.DefaultConsumer;
 16 import com.rabbitmq.client.Envelope;
 17 
 18 public class App{
 19 
 20     private static final Logger log = Logger.getLogger(App.class); 26     
 21     private static final String EXCHANGE_NAME = "refreshDispatcherMemoryExchange";
 22     
 23     private static String QUEUE_NAME = "refreshDispatcherMemoryhfQueue";
 24 
 25     public static void main(String[] args) throws Exception {
 26         
 27         ConnectionFactory factory = new ConnectionFactory();
 28         factory.setHost("127.0.0.1");
 29         factory.setPort(5672);
 30         factory.setUsername("yourusername");
 31         factory.setPassword("yourpassword");
 32         
 33         //0.創建連接和通道
 34         Connection connection = factory.newConnection();
 35         Channel channel = connection.createChannel();
 36         
 37         //1.聲明一個死信交換機(扇形交換機)
 38         channel.exchangeDeclare("refreshDispatcherDeadExchange", "fanout");
 39 
 40         //2.創建隊列的參數
 41         Map<String, Object> queueArgs = new HashMap<String, Object>();
 42         queueArgs.put("x-dead-letter-exchange", "refreshDispatcherDeadExchange");  //死信隊列
 43         queueArgs.put("x-message-ttl", 10000);     // 消息超時:讓發布的message在隊列中可以存活多長時間,以毫秒為單位。
 44         queueArgs.put("x-expires", 1000);          // 隊列超時:當前的queue在指定的時間內,沒有消費者訂閱就會被刪除,以毫秒為單位。
 45         queueArgs.put("x-max-length", 100);        // 隊列最大長度:當超過了這個大小的時候,會刪除之前最早插入的消息為本次的留出空間。
 46         queueArgs.put("x-queue-mode", "lazy");     //延遲加載:queue的信息盡可能的都保存在磁盤上,僅在有消費者訂閱的時候才會加載到RAM中。
 47         
 48         //3.聲明隊列。-將隊列參數傳到隊列 (隊列名字,是否持久化,是否排外,是否自動清理,參數)
 49         channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs);
 50         
 51         //4.隊列綁定交換機。   綁定鍵的意義依賴於轉發器的類型,對於fanout類型,忽略此參數(第三個參數為binding key)。
 52         channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
 53         
 54         Consumer consumer = new DefaultConsumer(channel) {
 55             @Override
 56             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
 57                     byte[] body) throws IOException {
 58                 // 捕獲消息內容
 59                 String message = new String(body, "UTF-8");
 60                 
 61                 try {
 62                     //消息處理(自己實現的方法)
 63                     messageHandler(message);
 64                     
 65                     //消息確認
 66                     channel.basicAck(envelope.getDeliveryTag(), false);
 67                     
 68                 }catch (Exception e) {
 69                     
 70                     //出現異常,置為true,重新入隊。
 71                     channel.basicAck(envelope.getDeliveryTag(), true);
 72                     
 73                     //出現異常,不重新入隊,而是reject入死信隊列。
 74                     //channel.basicReject(envelope.getDeliveryTag(), false);
 75                     
 76                 }
 77             }
 78         };
 79         //第二個參數值為false代表關閉RabbitMQ的自動應答機制,改為手動應答。
 80         channel.basicConsume(QUEUE_NAME, false, consumer);
 81     }
 82     
 83     public static void messageHandler(String message) {
 84         switch (message) {
 85         case "loadQDProductData":   // 渠道信息  渠道產品
 86             Dao.loadQDProductDataToSystem();
 87             break;
 88         case "loadQDGroupData":  //渠道組
 89             Dao.loadQDGroupDataToSystem();
 90             break;
 91         case "loadCustomerData": // 客戶信息
 92             Dao.loadCustomerDataToSystem();
 93             break;
 94         case "loadUserProductData": // 客戶產品
 95             Dao.loadUserProductDataToSystem();
 96             break;
 97         default:
 98             break;
 99         }
100         log.info( message + " Done" );
101         
102     }
103 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM