只做下工作記錄,比較重要的幾個屬性:
concurrency:一個生產者可以同時由多少個消費者消費,這個一般根據你的機器性能來進行配置
prefetch:允許為每個consumer指定最大的unacked messages數目。要是對實時性要求很高的話,prefetch應該設置成1,concurrency的值調高點
隊列中Ready狀態和Unacknowledged狀態的消息數,分別指的是等待投遞給消費者的消息數和已經投遞給消費者但是未收到ack信號的消息數。
注意配置超時重連機制,防止死機
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.prefetch=1
Springboot中消費簡單實現
@RabbitListener(concurrency = "12",bindings = {@QueueBinding(value = @Queue(value = YOUR_QUEUE), exchange = @Exchange(value = YOUR_EXCHANGE,type = "fanout"))}) public void process(Message message, com.rabbitmq.client.Channel channel) { Long deliveryTag = null; String data = null; try { deliveryTag = message.getMessageProperties().getDeliveryTag(); data = new String(message.getBody(), "UTF-8"); //業務處理 } catch (Exception e) { logger.warn("=====消息處理異常"); logger.warn("MQ異常 {} , {} , {}" , e.getMessage(),e.getCause(),data); }finally { try { channel.basicAck(deliveryTag, false); // 確認消息成功消費 } catch (IOException e) { logger.warn("======應答出錯,請檢查"); } logger.warn("======消息結束"); } }
特別要注意的是:如果不是用springboot注解形式連接,很多時候會因為配置的問題導致access refused(queue 或 exchange),這個時候一定要注意參數是否使用正確,比如php的
另外如果交換機和隊列不允許客戶進行聲明操作的,要注意passive等於true
轉載請注明博客出處:http://www.cnblogs.com/cjh-notes/