Spring boot集成Rabbit MQ使用初體驗
1.rabbit mq基本特性
首先介紹一下rabbitMQ的幾個特性
Asynchronous Messaging
Supports multiple messaging protocols, message queuing, delivery acknowledgement, flexible routing to queues, multiple exchange type.
異步消息
支持多種消息傳遞協議,消息排隊,傳遞確認,靈活路由規則,多種交換類型。這些應該是rabbitmq最核心的特性了。
Developer Experience
Deploy with BOSH, Chef, Docker and Puppet. Develop cross-language messaging with favorite programming languages such as: Java, .NET, PHP, Python, JavaScript, Ruby, Go, and many others.
部署體驗?
與BOSH,Chef,Docker和Puppet一起部署。使用喜歡的編程語言來開發跨語言消息傳遞,例如Java,.NET,PHP,Python,JavaScript,Ruby,Go等。
Distributed Deployment
Deploy as clusters for high availability and throughput; federate across multiple availability zones and regions.
分布式部署
部署為集群以實現高可用性和吞吐量;跨多個可用區域和區域聯合。
Enterprise & Cloud Ready
Pluggable authentication, authorisation, supports TLS and LDAP. Lightweight and easy to deploy in public and private clouds.
企業和雲就緒
可插拔身份驗證,授權,支持TLS和LDAP。輕巧且易於在公共和私有雲中進行部署。
Tools & Plugins
Diverse array of tools and plugins supporting continuous integration, operational metrics, and integration to other enterprise systems. Flexible plug-in approach for extending RabbitMQ functionality.
工具&插件
工具和插件的種類繁多,支持持續集成,運營指標以及與其他企業系統的集成。靈活的插件方法,用於擴展RabbitMQ功能。
Management & Monitoring
HTTP-API, command line tool, and UI for managing and monitoring RabbitMQ.
管理和監控
HTTP-API支持,命令行工具,管理和監控界面。
2.rabbit mq核心概念
①Message
消息,消息就是數據的載體,由消息頭和消息體組成。消息體是不透明的,而消息頭由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵,也就是消息是如何分發給隊列的),priority(相對於其它消息的優先權),delivery-mode(指定是否需要持久化存儲)
②Publisher
消息的生產者,向交換機發布消息的客戶端應用程序。
③Exchange
交換機用來接收生產者發送的消息並將這些消息按照路由規則或者交換機類型路由到指定的隊列。交換機有4種類型:direct(默認),fanout,topic,以及headers,這四種類型支持不同的路由策略。
④Queue
消息隊列,用於保存消息直到發送給消費者,是消息的容器。一個消息可以存入一個或多個隊列,一直到消費者消費這個消息,才會從隊列中刪除。
⑤Binding
綁定,指定交換機和隊列的綁定規則,可以理解為一個過濾器,當路由鍵符合這個綁定規則時,就會將消息發送給隊列。交換機和隊列之間的綁定可以是多對多的關系
⑥Connection
一個TCP連接
⑦Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復用一條 TCP 連接。
⑧Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
⑨Virtual Host
虛擬主機,表示一批交換機、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
⑩Broker
表示消息隊列服務器實體。
更詳細的說明請參考官方文檔:https://www.rabbitmq.com/tutorials/amqp-concepts.html
3.交換機類型和消息路由
- Direct Exchange
消息中的路由鍵(routing key)如果和 Binding中的 binding key 一致,交換器就將消息發到對應的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全匹配、單播的模式。
- Fanout Exchange
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵【路由鍵被忽略】,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。
- Topic Exchange
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”。
注意#匹配0個或者多個單詞,*匹配一個單詞
4.開始使用
我們先看spring boot的官方文檔是怎么說的吧。
首先,添加這些配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /test
rabbitmq默認用戶名密碼為guest:guest
先上配置類,RabbitTemplate使用自動配置好的,自動注入進來就可以了,我們還需要配置一個RabbitAdmin對象,RabbitAdmin有兩個構造方法
/**
* Construct an instance using the provided {@link ConnectionFactory}.
* @param connectionFactory the connection factory - must not be null.
*/
public RabbitAdmin(ConnectionFactory connectionFactory) {
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
this.connectionFactory = connectionFactory;
this.rabbitTemplate = new RabbitTemplate(connectionFactory);
}
/**
* Construct an instance using the provided {@link RabbitTemplate}. Use this
* constructor when, for example, you want the admin operations to be performed within
* the scope of the provided template's {@code invoke()} method.
* @param rabbitTemplate the template - must not be null and must have a connection
* factory.
* @since 2.0
*/
public RabbitAdmin(RabbitTemplate rabbitTemplate) {
Assert.notNull(rabbitTemplate, "RabbitTemplate must not be null");
Assert.notNull(rabbitTemplate.getConnectionFactory(), "RabbitTemplate's ConnectionFactory must not be null");
this.connectionFactory = rabbitTemplate.getConnectionFactory();
this.rabbitTemplate = rabbitTemplate;
}
但實際看他們的構造函數,發現如果我們不需要自己定制RabbitTemplate,直接使用第一個構造方法即可。
@Configuration
public class RabbitMqConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(rabbitTemplate);
}
}
類似這樣,就配置好了。
接下來寫一個測試類,測試聲明交換機,隊列,以及發送消息和接收消息等操作。
首先是聲明交換機類型,四種交換機對應的構造方法如下
//參數列表分別是:1.交換器名稱,2.是否持久化,3.是否自動刪除【指的是當最后一個與它綁定的隊列刪除時,是否自動刪除該交換機】
TopicExchange topicExchange = new TopicExchange("default.topic", true, false);
DirectExchange directExchange = new DirectExchange("default.direct", true, false);
FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);
HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);
rabbitAdmin.declareExchange(topicExchange);
rabbitAdmin.declareExchange(directExchange);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareExchange(headersExchange);
然后是聲明隊列
//1.隊列名稱,2.聲明一個持久隊列,3.聲明一個獨立隊列,4.是否自動刪除隊列
Queue queue1 = new Queue("queue1", true, false, false);
Queue queue2 = new Queue("queue2", true, false, false);
Queue queue3 = new Queue("queue3", true, false, false);
Queue queue4 = new Queue("queue4", true, false, false);
rabbitAdmin.declareQueue(queue1);
rabbitAdmin.declareQueue(queue2);
rabbitAdmin.declareQueue(queue3);
rabbitAdmin.declareQueue(queue4);
然后把隊列和交換機相互綁定
//1.queue:綁定的隊列,2.topicExchange:綁定到那個交換器,3.test.send.topic:綁定的路由名稱[routing key]
rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq.direct"));
因為fanout類型的交換機忽略routing key屬性,所以不需要設置。
完整測試代碼如下
@SpringBootTest
@RunWith(SpringJUnit4ClassRunner.class)
public class RabbitMqTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testDeclare() {
//參數列表分別是:1.交換器名稱,2.是否持久化,3.是否自動刪除【指的是當最后一個與它綁定的隊列刪除時,是否自動刪除該交換機】
TopicExchange topicExchange = new TopicExchange("default.topic", true, false);
DirectExchange directExchange = new DirectExchange("default.direct", true, false);
FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);
HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);
rabbitAdmin.declareExchange(topicExchange);
rabbitAdmin.declareExchange(directExchange);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareExchange(headersExchange);
//1.隊列名稱,2.聲明一個持久隊列,3.聲明一個獨立隊列,4.是否自動刪除隊列
Queue queue1 = new Queue("queue1", true, false, false);
Queue queue2 = new Queue("queue2", true, false, false);
Queue queue3 = new Queue("queue3", true, false, false);
Queue queue4 = new Queue("queue4", true, false, false);
rabbitAdmin.declareQueue(queue1);
rabbitAdmin.declareQueue(queue2);
rabbitAdmin.declareQueue(queue3);
rabbitAdmin.declareQueue(queue4);
//1.queue:綁定的隊列,2.topicExchange:綁定到那個交換器,3.test.send.topic:綁定的路由名稱[routing key]
rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq.direct"));
}
}
運行結果如下:
再看一下綁定情況:
Direct交換機
Fanout交換機
Topic交換機
全都測試成功,接下來就可以開始發送消息了。
發送消息有多個API可用,這里選擇高亮的那個API,實際還有send方法可用,不過需要自己來構建消息
@Test
public void testSendMessage() {
//1.交換機,2.路由鍵,3.發送的消息體【這里的消息體會自動轉換為消息,也可以自己構建消息對象】
rabbitTemplate.convertAndSend("default.topic","mq.whatever.this.is",new Student(1,"mmp","male",234));
}
測試結果如下:
一定要注意topic類型的交換機的路由鍵的匹配規則,#匹配0個或者多個單詞,*匹配一個單詞
那如果不想每次都是在測試類里面創建交換機和隊列,可以怎么做呢?可以在程序入口類里面實現CommandLineRunner接口,代碼如下,這樣的話,每次啟動都會聲明一次,當然重復聲明不會報錯,但會覆蓋之前的聲明,比如說之前聲明的時候定義的routing key可能就會被覆蓋。
@SpringBootApplication
@EnableRabbit
public class AmqpApplication implements CommandLineRunner {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitAdmin rabbitAdmin;
public static void main(String[] args) {
SpringApplication.run(AmqpApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
//參數列表分別是:1.交換器名稱,2.是否持久化,3.是否自動刪除【指的是當最后一個與它綁定的隊列刪除時,是否自動刪除該交換機】
TopicExchange topicExchange = new TopicExchange("default.topic", true, false);
DirectExchange directExchange = new DirectExchange("default.direct", true, false);
FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);
HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);
rabbitAdmin.declareExchange(topicExchange);
rabbitAdmin.declareExchange(directExchange);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareExchange(headersExchange);
//1.隊列名稱,2.聲明一個持久隊列,3.聲明一個獨立隊列,4.是否自動刪除隊列
Queue queue1 = new Queue("queue1", true, false, false);
Queue queue2 = new Queue("queue2", true, false, false);
Queue queue3 = new Queue("queue3", true, false, false);
Queue queue4 = new Queue("queue4", true, false, false);
rabbitAdmin.declareQueue(queue1);
rabbitAdmin.declareQueue(queue2);
rabbitAdmin.declareQueue(queue3);
rabbitAdmin.declareQueue(queue4);
//1.queue:綁定的隊列,2.topicExchange:綁定到那個交換器,3.test.send.topic:綁定的路由名稱[routing key]
rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));
rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq.direct"));
}
}
但其實這樣做還是比較復雜的,而且完全沒有必要,更加簡單方便的做法是,把那些配置聲明的對象直接添加到IOC容器中,讓spring自動的去調用相應的聲明方法,真是縱享絲滑呀,類似下面這樣子:
@Bean
public Queue Queue() {
return new Queue("hello");
}
繼續測試接收消息,有一個注解很方便。
@Service
public class ReceiverService {
@RabbitListener(queues = {"queue3"})
public void receive(Student student) {
System.out.println("接收到消息並打印:"+student);
}
}
測試結果如下:
接收到消息並打印:student{id=1, name='mmp', gender='male', age=234}
也可以直接使用方法接收消息
@Test
public void testReceive() {
Student student = (Student) rabbitTemplate.receiveAndConvert("queue3");
System.out.println(student);
}
測試結果如下:
student{id=1, name='mmp', gender='male', age=234}
如果想讓發送的學生對象使用JSON格式怎么辦呢?
需要定制一下:
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
@Bean
public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {
return new RabbitAdmin(rabbitTemplate);
}
}
測試一下:
源碼地址:https://github.com/lingEric/springboot-integration-hello
更多官方tutorials請移步https://github.com/rabbitmq/rabbitmq-tutorials