Java秒殺系統實戰系列~整合RabbitMQ實現消息異步發送


摘要:

本篇博文是“Java秒殺系統實戰系列文章”的第八篇,在這篇文章中我們將整合消息中間件RabbitMQ,包括添加依賴、加入配置信息以及自定義注入相關操作組件,比如RabbitTemplate等等,最終初步實現消息的發送和接收,並在下一篇章將其與郵件服務整合,實現“用戶秒殺成功發送郵件通知消息”的功能!

內容:

對於消息中間件RabbitMQ,想必各位小伙伴沒有用過、也該有聽過,它是一款目前市面上應用相當廣泛的消息中間件,可以實現消息異步通信、業務服務模塊解耦、接口限流、消息分發等功能,在微服務、分布式系統架構中可以說是充當着一名了不起的角色!(詳細的介紹,Debug在這里就不贅述了,各位小伙伴可以上官網看看其更多的介紹及其典型的應用場景)!

在本篇博文中,我們將使用RabbitMQ充當消息發送的組件,將它與后面篇章介紹的“郵件服務”結合實現“用戶秒殺成功后異步發送郵件通知消息,告知用戶秒殺已經成功!”,下面我們一起進入代碼實戰吧。

(1)要使用RabbitMQ,前提得在本地開發環境或者服務器安裝RabbitMQ服務,如下圖所示為Debug在本地安裝RabbitMQ服務成功后訪問其后端控制台應用的首頁:

之后我們開始將其與SpringBoot進行整合。首先需要加入其依賴,其版本號跟SpringBoot的版本一致,版本號為1.5.7.RELEASE:

<!--rabbitmq-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>${spring-boot.version}</version>
</dependency>

 

然后需要在配置文件application.properties中加入RabbitMQ服務相關的配置,比如其服務所在的Host、端口Port等等:

#rabbitmq spring.rabbitmq.virtual-host=/ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=15 spring.rabbitmq.listener.simple.prefetch=10

 

(2)緊接着,我們借助SpringBoot天然具有的一些特性,自動注入RabbitMQ一些組件的配置,包括其“單一實例消費者”配置、“多實例消費者”配置以及用於發送消息的操作組件實例“RabbitTemplate”的配置:

//通用化 Rabbitmq 配置
@Configuration public class RabbitmqConfig { private final static Logger log = LoggerFactory.getLogger(RabbitmqConfig.class); @Autowired private Environment env; @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; //單一消費者
  @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); return factory; } //多個消費者
  @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory,connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); //確認消費模式-NONE
 factory.setAcknowledgeMode(AcknowledgeMode.NONE); factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.concurrency",int.class)); factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.simple.max-concurrency",int.class)); factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.simple.prefetch",int.class)); return factory; } @Bean public RabbitTemplate rabbitTemplate(){ connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } }

 

 在RabbitMQ的消息發送組件RabbitTemplate的配置中,我們還特意加入了“消息發送確認”、“消息丟失回調”的輸出配置,即當消息正確進入到隊列后,即代表消息發送成功;當消息找不到對應的隊列(在某種程度上,其實也就是找不到交換機和路由)時,會輸出消息丟失。

(3)完了之后,我們准備開始使用RabbitMQ實現消息的發送和接收。首先,我們需要在RabbitmqConfig配置類中創建隊列、交換機、路由以及綁定等Bean組件,如下所示:

 

//構建異步發送郵箱通知的消息模型
 @Bean public Queue successEmailQueue(){ return new Queue(env.getProperty("mq.kill.item.success.email.queue"),true); } @Bean public TopicExchange successEmailExchange(){ return new TopicExchange(env.getProperty("mq.kill.item.success.email.exchange"),true,false); } @Bean public Binding successEmailBinding(){ return BindingBuilder.bind(successEmailQueue()).to(successEmailExchange()).with(env.getProperty("mq.kill.item.success.email.routing.key")); }

 

其中,環境變量實例env讀取的那些屬性我們是配置在application.properties文件中的,如下所示:

mq.env=test #秒殺成功異步發送郵件的消息模型 mq.kill.item.success.email.queue=${mq.env}.kill.item.success.email.queue mq.kill.item.success.email.exchange=${mq.env}.kill.item.success.email.exchange mq.kill.item.success.email.routing.key=${mq.env}.kill.item.success.email.routing.key

 

緊接着,我們需要在通用的消息發送服務類 RabbitSenderService 中寫一段發送消息的方法,該方法用於接收“訂單編號”參數,然后在數據庫中查詢其對應的詳細訂單記錄,將該記錄充當“消息”並發送至RabbitMQ的隊列中,等待被監聽消費:

/** * RabbitMQ通用的消息發送服務 * @Author:debug (SteadyJack) * @Date: 2019/6/21 21:47 **/ @Service public class RabbitSenderService { public static final Logger log= LoggerFactory.getLogger(RabbitSenderService.class); @Autowired private RabbitTemplate rabbitTemplate; @Autowired private Environment env; @Autowired private ItemKillSuccessMapper itemKillSuccessMapper; //秒殺成功異步發送郵件通知消息
  public void sendKillSuccessEmailMsg(String orderNo){ log.info("秒殺成功異步發送郵件通知消息-准備發送消息:{}",orderNo); try { if (StringUtils.isNotBlank(orderNo)){ KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderNo); if (info!=null){ //TODO:rabbitmq發送消息的邏輯
                  rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.email.exchange")); rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.email.routing.key")); //TODO:將info充當消息發送至隊列
                  rabbitTemplate.convertAndSend(info, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties=message.getMessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); messageProperties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class); return message; } }); } } }catch (Exception e){ log.error("秒殺成功異步發送郵件通知消息-發生異常,消息為:{}",orderNo,e.fillInStackTrace()); } } }

 

(4)最后,是在通用的消息接收服務類RabbitReceiverService中實現消息的接收,其完整的源代碼如下所示:

/** * RabbitMQ通用的消息接收服務 * @Author:debug (SteadyJack) * @Date: 2019/6/21 21:47 **/ @Service public class RabbitReceiverService { public static final Logger log= LoggerFactory.getLogger(RabbitReceiverService.class); @Autowired private MailService mailService; @Autowired private Environment env; @Autowired private ItemKillSuccessMapper itemKillSuccessMapper; //秒殺異步郵件通知-接收消息
  @RabbitListener(queues = {"${mq.kill.item.success.email.queue}"},containerFactory = "singleListenerContainer") public void consumeEmailMsg(KillSuccessUserInfo info){ try { log.info("秒殺異步郵件通知-接收消息:{}",info); //到時候這里將整合郵件服務發送郵件通知消息的邏輯
 }catch (Exception e){ log.error("秒殺異步郵件通知-接收消息-發生異常:",e.fillInStackTrace()); } } }

 

至此,關於SpringBoot整合消息中間件RabbitMQ的代碼實戰,本篇文章就介紹到這里了。

最后一點,我們需要進行測試,即用戶在界面發起“搶購”的請求操作之后,如果能秒殺成功,則RabbitMQ會發送、接收一條消息,如下所示:

好了,關於RabbitMQ的使用,本文到此就暫且告一段落了,在下一篇文章中我們將把它與郵件服務進行整合,實現“用戶秒殺成功后異步發送郵件通知消息給到用戶郵箱”的功能!除此之外,我們還將在后面的篇章介紹“如何使用RabbitMQ的死信隊列,處理用戶下單成功后卻超時未支付的訂單~在那里我們將采取失效的操作”。

補充:

1、目前,這一秒殺系統的整體構建與代碼實戰已經全部完成了,完整的源代碼數據庫地址可以來這里下載:gitee.com/steadyjack/… 記得Fork跟Star啊!!

2、最后,不要忘記了關注一下Debug的技術微信公眾號:

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM