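To cope with high concurrency, our Spring Boot service was connected to the RabbitMQ message queue. The first version, used for validation, consumed from a simple queue:
    // Producer
    AbstractOrder order = new Order();
    rabbitmqTemplate.convertAndSend(order);

    // Consumer
    public void recieved(AbstractOrder order) {
        log.info("recieved order:" + order);
        // processing logic
    }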
The second version uses RabbitMQ's ack mechanism to cope with processing failures that may occur.
Below is the final version of the code:
    // Producer
    // Put the order on the queue
    public void convertAndSendOrder(AbstractOrder order) {
        rabbitmqTemplate.setMandatory(true);
        rabbitmqTemplate.setConfirmCallback(confirmCallback);
        rabbitmqTemplate.setReturnCallback(returnCallback);
        // Must be globally unique, otherwise the ReturnCallback does not work
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitmqTemplate.convertAndSend("exchange-order", "rkey-orderadd", order, correlationData);
    }
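The correlation id only pays off if the producer remembers which order it belongs to. Here is a minimal sketch of that bookkeeping in plain Java, assuming a hypothetical helper class `PendingConfirms` (not part of Spring AMQP) whose `onConfirm` would be called from the confirm callback with the id taken from `CorrelationData` and the ack flag:

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers published payloads until the broker confirms them. */
final class PendingConfirms<T> {
    private final Map<String, T> pending = new ConcurrentHashMap<>();

    /** Stores the payload and returns the unique id to put into CorrelationData. */
    String register(T payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    /**
     * Forgets the payload in both cases. Returns empty on ack; on nack returns
     * the payload (if still known) so the caller can resend or log it.
     */
    Optional<T> onConfirm(String correlationId, boolean ack) {
        T payload = pending.remove(correlationId);
        return ack ? Optional.empty() : Optional.ofNullable(payload);
    }

    /** Number of messages still waiting for a confirm. */
    int size() {
        return pending.size();
    }
}
```

With this, the producer would call `register(order)` before sending and pass the returned id to `new CorrelationData(id)`. What to do with a nacked order (resend, persist, alert) is a decision the sketch leaves open.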
    // Consumer logic
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "queue-order"),
                    exchange = @Exchange(value = "exchange-order"),
                    key = "rkey-order")},
            containerFactory = "rabbitListenerContainerFactory")
    public void recieved(Message messageorigin, Channel channel) {
        log.info("recieved message:" + messageorigin);
        boolean success = false;
        Order order = null;
        try {
            Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
            order = (Order) jackson2JsonMessageConverter.fromMessage(messageorigin);
        } catch (Exception e) {
            log.info("get order fromBytes message exception" + e.getMessage());
        }
        // processing logic (sets success to true once the order has been handled)
        // acknowledge the message on success
        if (success) {
            try {
                channel.basicAck(messageorigin.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                log.info("basicAck ex:" + e.getMessage());
                // retry the ack once
                try {
                    channel.basicAck(messageorigin.getMessageProperties().getDeliveryTag(), false);
                } catch (IOException ee) {
                    log.info("again basicAck ex:" + ee.getMessage());
                }
            }
        }
    }
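The nested try/catch above retries the ack once and does nothing when processing fails, so a failed message simply stays unacknowledged. A minimal sketch of a flatter alternative in plain Java follows. `AckChannel` and `AckHelper` are hypothetical names introduced here; the two methods only mirror the signatures of `basicAck` and `basicNack` on the RabbitMQ `Channel`. Nacking without requeue on failure is an assumption of this sketch, not something the original code does. Manual acks like these also presuppose that the listener container runs in manual acknowledge mode.

```java
import java.io.IOException;

/** Hypothetical stand-in for the two Channel methods used in this sketch. */
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
}

final class AckHelper {
    private AckHelper() {
    }

    /**
     * Acks on success, nacks without requeue on failure, trying at most
     * maxAttempts times. Returns true if one of the attempts went through.
     */
    static boolean settle(AckChannel channel, long deliveryTag, boolean success, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (success) {
                    channel.basicAck(deliveryTag, false);
                } else {
                    channel.basicNack(deliveryTag, false, false);
                }
                return true;
            } catch (IOException e) {
                System.err.println("settle attempt " + attempt + " failed: " + e.getMessage());
            }
        }
        return false;
    }
}
```

In the listener this would replace the whole `if (success)` block with a single call such as `AckHelper.settle(adapter, deliveryTag, success, 2)`, where `adapter` is a small wrapper around the real `Channel`.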
Note: if recieved still takes the custom object directly as its parameter, as in the first version, the consumer throws this error:
org.springframework.amqp.AmqpException: No method found for class com.xxx.Order
The solution is to use Jackson2JsonMessageConverter. When a message is sent, it first serializes the custom message class to JSON and then converts that to bytes to build the Message.
    // Producer: set the converter
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
    // Consumer: point the listener at the container factory that carries the converter
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "queue-order"),
                    exchange = @Exchange(value = "exchange-order"),
                    key = "rkey-order")},
            containerFactory = "rabbitListenerContainerFactory")
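However, once Jackson2JsonMessageConverter is used directly, deserialization requires the class that was sent and the class that is received to be exactly the same (fields, class name, package path).
I checked the documentation: https://docs.spring.io/spring-amqp/api/org/springframework/amqp/support/converter/Jackson2JsonMessageConverter.html
So here the listener takes the raw Message and calls jackson2JsonMessageConverter.fromMessage to get the object out of it.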
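As far as I understand it, the class name matters because the converter by default records the sender's fully qualified class name in a `__TypeId__` message header, and the receiving side uses that value to pick the target class. The sketch below imitates that lookup in plain Java to show where it fails. `TypeIdResolver` and its `idClassMapping` parameter are hypothetical names for illustration. Spring AMQP's class mapper offers a comparable id-to-class mapping, but check the documentation for the exact API.

```java
import java.util.Map;

/** Hypothetical illustration of resolving the target class from a type id header. */
final class TypeIdResolver {
    /** Header in which the JSON converter stores the sender's class name by default. */
    static final String TYPE_ID_HEADER = "__TypeId__";

    private TypeIdResolver() {
    }

    /**
     * Looks the type id up in the explicit mapping first and falls back to
     * loading a class with exactly that name. The fallback is why sender and
     * receiver need the same class name and package.
     */
    static Class<?> resolve(Map<String, Object> headers, Map<String, Class<?>> idClassMapping)
            throws ClassNotFoundException {
        Object typeId = headers.get(TYPE_ID_HEADER);
        if (typeId == null) {
            throw new IllegalArgumentException("missing " + TYPE_ID_HEADER + " header");
        }
        Class<?> mapped = idClassMapping.get(typeId.toString());
        return mapped != null ? mapped : Class.forName(typeId.toString());
    }
}
```

A consumer that does not have `com.xxx.Order` on its classpath would fail in the `Class.forName` branch, while an explicit mapping from the sender's id to a local class avoids the lookup by name.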
With that, the ACK setup is done.