SpringBoot整合RabbitMq


1. 大多應用中,可通過消息服務中間件來提升系統異步通信、擴展解耦能力
2. 消息服務中兩個重要概念:
消息代理(message broker)和目的地(destination)
當消息發送者發送消息以后,將由消息代理接管,消息代理保證消息傳遞到指定目
的地。
3. 消息隊列主要有兩種形式的目的地
1. 隊列(queue):點對點消息通信(point-to-point)
2. 主題(topic):發布(publish)/訂閱(subscribe)消息通信
 
異步處理:

 

 應用解耦:

 

 流量削峰

 

 

4. 點對點式:
消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,
消息讀取后被移出隊列
消息只有唯一的發送者和接受者,但並不是說只能有一個接收者
5. 發布訂閱式:
發送者(發布者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那么
就會在消息到達時同時收到消息
6. JMS(Java Message Service)JAVA消息服務:
基於JVM消息代理的規范。ActiveMQ、HornetMQ是JMS實現
7. AMQP(Advanced Message Queuing Protocol)
高級消息隊列協議,也是一個消息代理的規范,兼容JMS
RabbitMQ是AMQP的實現

 

8. Spring支持
spring-jms提供了對JMS的支持
spring-rabbit提供了對AMQP的支持
需要ConnectionFactory的實現來連接消息代理
提供JmsTemplate、RabbitTemplate來發送消息
@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上監聽消息代理發
布的消息
@EnableJms、@EnableRabbit開啟支持
9. Spring Boot自動配置
JmsAutoConfiguration
RabbitAutoConfiguration
二、RabbitMQ簡介
RabbitMQ簡介:
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。
核心概念
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組
成,這些屬性包括routing-key(路由鍵)、priority(相對於其他消息的優先權)、delivery-mode(指出
該消息可能需要持久性存儲)等。
Publisher
消息的生產者,也是一個向交換器發布消息的客戶端應用程序。
Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有
所區別Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息
可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連
接起來的路由規則,所以可以將交換器理解成一個由綁定構成的路由表。
Exchange 和Queue的綁定可以是多對多的關系。
Connection
網絡連接,比如一個TCP連接。
Channel
信道,多路復用連接中的一條獨立的雙向數據流通道。信道是建立在真實的TCP連接內的虛
擬連接,AMQP 命令都是通過信道發出去的,不管是發布消息、訂閱隊列還是接收消息,這
些動作都是通過信道完成。因為對於操作系統來說建立和銷毀 TCP 都是非常昂貴的開銷,所
以引入了信道的概念,以復用一條 TCP 連接。Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加
密環境的獨立服務器域。每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有
自己的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在連接時指定,
RabbitMQ 默認的 vhost 是 / 。
Broker
表示消息隊列服務器實體

 

 

三、RabbitMQ運行機制
AMQP 中的消息路由
• AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差別,AMQP 中增加了
Exchange 和 Binding 的角色。生產者把消息發布到 Exchange 上,消息最終到達隊列並被
消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。

 

 

Exchange 類型
• Exchange分發消息時根據類型的不同分發策略有區別,目前共四種類型:
direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header
而不是路由鍵, headers 交換器和 direct 交換器完全一致,但性能差很多,
目前幾乎用不到了,所以直接看另外三種類型:
消息中的路由鍵(routing key)如果和 Binding 中的 binding
key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊
列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為
“dog”,則只轉發 routing key 標記為“dog”的消息,不會轉
發“dog.puppy”,也不會轉發“dog.guard”等等。它是完全
匹配、單播的模式
 
 
 
每個發到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,
只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定
的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。 

 
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵
的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“*”。#匹配0個或多個單詞,*匹配一個單詞。
四、RabbitMQ整合
1. 引入 spring-boot-starter-amqp
2. application.yml配置
3. 測試RabbitMQ
1. AmqpAdmin:管理組件
2. RabbitTemplate:消息發送處理組件
 

管理台網址: http://192.168.0.108:15672/    賬號密碼都是guest
application.prpperties
spring.rabbitmq.host=192.168.0.106
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#spring.rabbitmq.virtual-host=
service
@Service
public class BookService {

@RabbitListener(queues = "atguigu.news")
public void receive(Book book){
System.out.println("收到消息:"+book);
}

@RabbitListener(queues = "atguigu")
public void receive02(Message message){
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}
bean
public class Book {
private String bookName;
private String author;
}
config
@Configuration
public class MyAMQPConfig {

@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
啟動類
**
* 自動配置
* 1、RabbitAutoConfiguration
* 2、有自動配置了連接工廠ConnectionFactory;
* 3、RabbitProperties 封裝了 RabbitMQ的配置
* 4、 RabbitTemplate :給RabbitMQ發送和接受消息;
* 5、 AmqpAdmin : RabbitMQ系統管理功能組件;
* AmqpAdmin:創建和刪除 Queue,Exchange,Binding
* 6、@EnableRabbit + @RabbitListener 監聽消息隊列的內容
*
*/
@EnableRabbit
@SpringBootApplication
public class SpringbootrabbitmqApplication {

public static void main(String[] args) {
SpringApplication.run(SpringbootrabbitmqApplication.class, args);
}

}
測試啟動類
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringbootrabbitmqApplicationTests {

@Autowired
RabbitTemplate rabbitTemplate;

@Autowired
AmqpAdmin amqpAdmin;

@Test
public void createExchange(){

// amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
// System.out.println("創建完成");

// amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
// 創建綁定規則

amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,
"amqpadmin.exchange","amqp.haha",null));

}

/**
* 1、單播(點對點)
*/
@Test
public void contextLoads() {
//Message需要自己構造一個;定義消息體內容和消息頭
//rabbitTemplate.send(exchage,routeKey,message);

//object默認當成消息體,只需要傳入要發送的對象,自動序列化發送給rabbitmq;
//rabbitTemplate.convertAndSend(exchage,routeKey,object);
Map<String,Object> map = new HashMap<>();
map.put("msg","這是第一個消息");
map.put("data", Arrays.asList("helloworld",123,true));
//對象被默認序列化以后發送出去
//rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",map);
rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new com.hourui.bean.Book("西游記22","吳承恩22"));

}

//接受數據,如何將數據自動的轉為json發送出去
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("atguigu.news");
System.out.println(o.getClass());
System.out.println(o);
}

/**
* 廣播
*/
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("exchange.fanout","",new com.hourui.bean.Book("全職法師","莫凡"));
}

}

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM