1. JMS和AMQP
- JMS(Java Message Service):
- ActiveMQ是JMS實現;
- AMQP(Advanced Message Queuing Protocol)
- 兼容JMS
- RabbitMQ是AMQP的實現
2. RabbitMQ 簡介
Message
:由消息頭和消息體組成,消息體是不透明的,而消息頭則由一系列的可選屬性組成;Publisher
:一個向交換器發布消息的客戶端應用程序;Exchange
:用來接收生產者發送的消息並將這些消息路由給服務器中的隊列;- 有四種類型:direct(默認),fanout,topic和headers;
Queue
:用來保存消息直到發送給消費者,是消息的容器;Binding
:用於消息隊列和交換器之間的關聯;Connection
:網絡連接,比如一個TCP連接;Channel
:多路復用連接中的一條獨立的雙向數據流通道;Consumer
:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序;Virtual Host
:虛擬主機,表示一批交換器,消息隊列和相關對象;每個vhost本質上就是一個mini版的RabbitMQ服務器;Broker
:表示消息隊列服務器實體;
3. RabbitMQ 整合(SpringBoot)
- 自動配置:
RabbitAutoConfiguration
- 自動配置了連接工廠
ConnectionFactory
; RabbitProperties
封裝了RabbitMQ的配置;RabbitTemplate
:給RabbitMQ發送和接收消息;AmpqAdmin
:RabbitMQ系統管理功能組件;@EnableRabbit
:開啟基於注解的RabbitMQ模式;@EnableRabbit
和@RabbitListener
用於監聽消息隊列的內容;
// application.properties 配置文件
spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
// 測試類
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void contextLoads(){
// 點對點消息
// Message 需要自己構造一個,定義消息體內容和消息頭
// rabbitTemplate.send(exchange, routeKey, message);
// rabbitTemplate.convertAndSend(exchange, routeKey, object)
// 只需要傳入要發送的對象, 會自動序列化發送給rabbitmq, object 默認當成消息體
Map<String, Object> map = new HashMap<>();
map.put("msg","匆匆的我來了...");
map.put("data",Arrays.asList("777477",232,true));
rabbitTemplate.convertAndSend("exchange.direct", "atnoodles.news",map);
}
// 接收消息
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("atnoodles.news");
System.out.println(o.getClass()); // Class java.util.HashMap
System.our.println(o);
}
}
// 如果需要將發送的數據自動轉換為JSON,發送出去
// com.noodles.springboot.rabbitmq.config.MyAMQPConfig.java
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyAMQPConfig{
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
4. AmpqAdmin
- 創建和刪除 Queue, Exchange, Binding
// 測試類
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests{
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
// 創建 Exchange
amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
System.out.println("創建完成...");
// 創建 Queue
amqpAdmin.declareQueue(new Queue("amqpAdmin.queue", true));
}
}
參考資料: