在java springboot 項目中 利用@RabbitListener 監聽處理消息時 消息處理成功,但偶爾報PublisherCallbackChannel is closed 異常 。
處理消息代碼如下:
@RabbitListener(queues = {CommonConstant.PRODUCT_SEARCH_QUEUE}) public void receiverProductSearchQueue(Message message, Channel channel) throws IOException { log.info("開始消費:" + new String(message.getBody())); try { long productSkuId = Long.parseLong(new String(message.getBody())); ResultVo resultVo = esSearchService.saveProductToElasticSearch(productSkuId); if (resultVo.getCode() == CommonConstant.SUCCESS) { log.info("消費成功:" + resultVo.getMsg()); } else { log.info("消費失敗-------------" + message); } } catch (Exception ex) { log.error(ex.toString() + "-------------" + message); } finally { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
最后修改 yml配置文件問題解決,配置如下:
#消息配置 rabbitmq: host: simba-test-mq port: 5672 username: simba password: simba virtual-host: / template: routing-key: product_search exchange: product_search_exchange mandatory: true #publisher-confirms: true publisher-returns: true queue-name: product_search_queue listener: simple: acknowledge-mode: manual #消息手動確認 concurrency: 4 #消費者數量 max-concurrency: 50 #消費者最大數量 prefetch: 1 #消費者每次從隊列中取幾個消息 default-requeue-rejected: true #消息消費失敗后,重新進入消費隊列中 retry: initial-interval: 1000 #1秒后重試 enabled: true #啟用發布重試 max-attempts: 3 #傳遞消息的最大嘗試次數 max-interval: 10000 #嘗試的最大時間間隔 multiplier: 1.0 #應用於先前傳遞重試時間間隔的乘數