RabbitMQ是一個在AMQP基礎上完成的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。RabbitMQ是流行的開源消息隊列系統,用erlang語言開發。RabbitMQ是AMQP(高級消息隊列協議)的標准實現。
消息中間件的工作過程可以用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息,而消費者從消息隊列中消費信息.
如果你還沒有安裝rabbitmq的,可以看看這篇《centos安裝MQ》
不說了不說了,來一張圖直截了當的看看MQ工作的具體過程:
開局一張圖 故事全靠編.從上圖可看出,對於消息隊列來說,生產者,消息隊列,消費者是最重要的三個概念,生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,並且當消息隊列收到消息之后,接收消息隊列傳來的消息,並且給予相應的處理.消息隊列常用於分布式系統之間互相信息的傳遞.
v基礎概念
對於RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產者和消息隊列之間產生了隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列.那么RabitMQ的工作流程如下所示:
關於rabbitmq幾個基礎名詞的介紹:
交換機的主要作用是接收相應的消息並且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout:
v實戰演練
注:若是現有工程引入MQ,則添加Maven引用。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
這里我們延續之前springboot系列博文中的例子hellospringboot,在已有項目中添加mq的Maven引用。
在application.properties文件當中引入RabbitMQ基本的配置信息
# ----- MQ -------- #
spring.rabbitmq.host=192.168.11.108
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
package com.demo.mq.model; import java.io.Serializable; import java.util.UUID; /** * Created by toutou on 2019/1/1. */ public class MyModel implements Serializable { private static final long serialVersionUID = 1L; private UUID id; private String info; public UUID getId() { return id; } public void setId(UUID id) { this.id = id; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } }
package com.demo.mq.common; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; /** * Created by toutou on 2019/1/1. */ @Configuration public class RabbitConfig { @Value("${spring.rabbitmq.host}") private String host; @Value("${spring.rabbitmq.port}") private int port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; public static final String EXCHANGE_A = "my-mq-exchange_A"; public static final String EXCHANGE_B = "my-mq-exchange_B"; public static final String QUEUE_A = "QUEUE_A"; public static final String QUEUE_B = "QUEUE_B"; public static final String ROUTINGKEY_A = "spring-boot-routingKey_A"; public static final String ROUTINGKEY_B = "spring-boot-routingKey_B"; @Bean public CachingConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * 針對消費者配置 * 1. 設置交換機類型 * 2. 將隊列綁定到交換機 FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 HeadersExchange :通過添加屬性key-value匹配 DirectExchange:按照routingkey分發到指定隊列 TopicExchange:多關鍵字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE_A); } /** * 獲取隊列A * @return */ @Bean public Queue queueA() { return new Queue(QUEUE_A, true); //隊列持久 } /** * 獲取隊列B * @return */ @Bean public Queue queueB() { return new Queue(QUEUE_B, true); //隊列持久 } /** * 把交換機,隊列,通過路由關鍵字進行綁定 * @return */ @Bean public Binding binding() { return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A); } /** * 一個交換機可以綁定多個消息隊列,也就是消息通過一個交換機,可以分發到不同的隊列當中去。 * @return */ @Bean public Binding bindingB(){ return BindingBuilder.bind(queueB()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_B); } }
package com.demo.mq.producer; import com.demo.mq.common.RabbitConfig; import com.demo.mq.model.MyModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by toutou on 2019/1/1. */ @Component public class MyProducer implements RabbitTemplate.ConfirmCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //由於rabbitTemplate的scope屬性設置為ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自動注入 private RabbitTemplate rabbitTemplate; /** * 構造方法注入rabbitTemplate */ @Autowired public MyProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容 } public void sendMsg(MyModel model) { //把消息放入ROUTINGKEY_A對應的隊列當中去,對應的是隊列A rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, model); } /** * 回調 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回調id:" + correlationData); if (ack) { logger.info("消息成功消費"); } else { logger.info("消息消費失敗:" + cause); } } }
package com.demo.mq.receiver; import com.demo.mq.common.RabbitConfig; import com.demo.mq.model.MyModel; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Created by toutou on 2019/1/1. */ @Component @RabbitListener(queues = RabbitConfig.QUEUE_A) public class MyReceiver { @RabbitHandler public void process(MyModel model) { System.out.println("接收處理隊列A當中的消息: " + model.getInfo()); } }
package com.demo.controller; import com.demo.mq.model.MyModel; import com.demo.mq.producer.MyProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; /** * Created by toutou on 2019/1/1. */ @RestController @Slf4j public class MyMQController { @Autowired MyProducer myProducers; @GetMapping("/mq/producer") public String myProducer(String content){ MyModel model = new MyModel(); model.setId(UUID.randomUUID()); model.setInfo(content); myProducers.sendMsg(model); return "已發送:" + content; } }
2.9.1 在頁面中請求http://localhost:8081/mq/producer?content=hello rabbitmq
2.9.2 查看http://ip:15672/#/queues的變化
關於RabbitMQ Management有疑問的,可以看上篇博文。《淺談RabbitMQ Management》。
2.9.3 查看消費者日志記錄
這樣一個完整的rabbitmq實例就有了。
v源碼地址
https://github.com/toutouge/javademosecond/tree/master/hellospringboot
作 者:請叫我頭頭哥
出 處:http://www.cnblogs.com/toutou/
關於作者:專注於基礎平台的項目開發。如有問題或建議,請多多賜教!
版權聲明:本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文鏈接。
特此聲明:所有評論和私信都會在第一時間回復。也歡迎園子的大大們指正錯誤,共同進步。或者直接私信我
聲援博主:如果您覺得文章對您有幫助,可以點擊文章右下角【推薦】一下。您的鼓勵是作者堅持原創和持續寫作的最大動力!