5.1.1 Basic testing
1. Add the spring-boot-starter-amqp dependency
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
How the auto-configuration works: RabbitAutoConfiguration
1. It auto-configures a CachingConnectionFactory, the connection factory that holds the connection settings for RabbitMQ.
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {
@Bean
public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
...
}
...
2. RabbitProperties encapsulates the RabbitMQ configuration.
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {
/** * RabbitMQ host. */
private String host = "localhost";
/** * RabbitMQ port. */
private int port = 5672;
....
Configuration in application.yml:
spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    username: guest
    password: guest
    port: 5672
3. RabbitTemplate: sends messages to and receives messages from RabbitMQ.
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
....
@Configuration
@Import(RabbitConnectionFactoryCreator.class)
protected static class RabbitTemplateConfiguration {
private final RabbitProperties properties;
private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;
public RabbitTemplateConfiguration(RabbitProperties properties,
ObjectProvider<MessageConverter> messageConverter,
ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
this.properties = properties;
this.messageConverter = messageConverter;
this.retryTemplateCustomizers = retryTemplateCustomizers;
}
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(RabbitOperations.class)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
PropertyMapper map = PropertyMapper.get();
RabbitTemplate template = new RabbitTemplate(connectionFactory);
MessageConverter messageConverter = this.messageConverter.getIfUnique();
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
}
template.setMandatory(determineMandatoryFlag());
RabbitProperties.Template properties = this.properties.getTemplate();
if (properties.getRetry().isEnabled()) {
template.setRetryTemplate(new RetryTemplateFactory(
this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())).createRetryTemplate(
properties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER));
}
map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
.to(template::setReceiveTimeout);
map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
map.from(properties::getExchange).to(template::setExchange);
map.from(properties::getRoutingKey).to(template::setRoutingKey);
map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
return template;
}
}
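4. AmqpAdmin: the RabbitMQ administration component, used to declare queues, exchanges, and so on. When the queues, exchanges, and bindings have not been created by hand in the web management console, AmqpAdmin can create and delete them.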
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
...
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
}
2. Testing RabbitMQ
1) Unicast (point-to-point)
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() {
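// With send(), the Message must be built by hand, including its body:
// rabbitTemplate.send(exchange, routingKey, message);
// convertAndSend is the usual choice: the object passed in is treated as the
// message body, converted automatically, and sent to RabbitMQ:
// rabbitTemplate.convertAndSend(exchange, routingKey, object);
Map<String,Object> maps = new HashMap<String,Object>();
maps.put("msg","This is a message");
maps.put("data", Arrays.asList("helloworld",123,true));
// By default the object is serialized with JDK serialization before it is sent
rabbitTemplate.convertAndSend("exchange.direct","jatpeo.news",new Book("Journey to the West","Wu Cheng'en"));
}
// Receive the data. How can the data be sent as JSON automatically instead?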
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("jatpeo.news");
System.out.println(o.getClass());
System.out.println(o);
}
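The commonly used convertAndSend converts the message body automatically: the object passed in is treated as the message body and is serialized before being sent to RabbitMQ. By default, JDK serialization is used.
Source code analysis: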
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, MessageListener,
ListenerContainerAware, PublisherCallbackChannel.Listener, Lifecycle, BeanNameAware {
private MessageConverter messageConverter = new SimpleMessageConverter();
The default converter is SimpleMessageConverter:
public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {
content = new String(message.getBody(), encoding);
} catch (UnsupportedEncodingException var8) {
throw new MessageConversionException("failed to convert text-based Message content", var8);
}
} else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
try {
content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
} catch (IllegalArgumentException | IllegalStateException | IOException var7) {
throw new MessageConversionException("failed to convert serialized Message content", var7);
}
}
}
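To make the default behaviour concrete, here is a minimal standalone sketch of what JDK serialization does to a payload object. It uses only the JDK, not Spring or RabbitMQ. The Book class is not shown in this article, so its fields (bookName, author) are assumptions for illustration. It implements Serializable, which JDK serialization requires, and has a no-argument constructor plus getters and setters, which JSON mappers such as Jackson typically rely on.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class JdkSerializationDemo {

    // Hypothetical payload class; the field names are assumptions, not taken from the article.
    static class Book implements Serializable {
        private static final long serialVersionUID = 1L;
        private String bookName;
        private String author;

        Book() { } // no-arg constructor, typically needed by JSON mappers

        Book(String bookName, String author) {
            this.bookName = bookName;
            this.author = author;
        }

        public String getBookName() { return bookName; }
        public void setBookName(String bookName) { this.bookName = bookName; }
        public String getAuthor() { return author; }
        public void setAuthor(String author) { this.author = author; }

        @Override
        public String toString() {
            return "Book{bookName='" + bookName + "', author='" + author + "'}";
        }
    }

    // Turn an object into the kind of byte[] body that JDK serialization produces.
    static byte[] serialize(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuild the object from the serialized bytes.
    static Object deserialize(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] body = serialize(new Book("Journey to the West", "Wu Cheng'en"));
        // A JDK serialization stream is binary and starts with the magic bytes AC ED.
        System.out.printf("first bytes: %02X %02X, length: %d%n",
                body[0] & 0xFF, body[1] & 0xFF, body.length);
        System.out.println(deserialize(body));
    }
}
```

The body is a binary stream rather than readable text, which is why a JDK-serialized message looks opaque in the management console and can only be read back by a Java consumer that has the same class on its classpath.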
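Custom MessageConverter
Create MyAMQPConfig, which registers a Jackson2JsonMessageConverter so that messages are sent as JSON:
@EnableRabbit // enable annotation-driven RabbitMQ support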
@Configuration
public class MyAMQPConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
2) Broadcast
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
/** 2. Broadcast: with a fanout exchange the routing key does not matter */
@Test
public void Test(){
rabbitTemplate.convertAndSend("exchange.fanout","",new Book("Dream of the Red Chamber","Cao Xueqin"));
}
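The difference between the two tests comes down to how each exchange type picks its target queues. The following is a toy in-memory model of that routing rule, not the RabbitMQ or Spring AMQP API. The ToyExchange class is hypothetical, and the bindings (each queue bound with its own name as the binding key) are assumed for illustration. A real queue can have several bindings, while this sketch keeps one per queue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ExchangeRoutingDemo {

    // Hypothetical model of an exchange: it only decides which queues get a message.
    static class ToyExchange {
        private final boolean fanout;
        // queue name -> binding key, kept in insertion order
        private final Map<String, String> bindings = new LinkedHashMap<>();

        ToyExchange(boolean fanout) {
            this.fanout = fanout;
        }

        ToyExchange bind(String queue, String bindingKey) {
            bindings.put(queue, bindingKey);
            return this;
        }

        // Direct: binding key must equal the routing key. Fanout: every bound queue matches.
        List<String> route(String routingKey) {
            List<String> targets = new ArrayList<>();
            for (Map.Entry<String, String> binding : bindings.entrySet()) {
                if (fanout || binding.getValue().equals(routingKey)) {
                    targets.add(binding.getKey());
                }
            }
            return targets;
        }
    }

    public static void main(String[] args) {
        ToyExchange direct = new ToyExchange(false)
                .bind("jatpeo", "jatpeo")
                .bind("jatpeo.news", "jatpeo.news");
        ToyExchange fanout = new ToyExchange(true)
                .bind("jatpeo", "jatpeo")
                .bind("jatpeo.news", "jatpeo.news");

        // Only the queue whose binding key matches receives the message.
        System.out.println(direct.route("jatpeo.news")); // prints [jatpeo.news]
        // The routing key is ignored, so an empty string reaches every bound queue.
        System.out.println(fanout.route(""));            // prints [jatpeo, jatpeo.news]
    }
}
```

This is why the broadcast test can pass an empty routing key: a fanout exchange never looks at it.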
5.1.2 @RabbitListener and @EnableRabbit
@EnableRabbit together with @RabbitListener listens for the contents of a message queue.
@RabbitListener: listens on a queue
@EnableRabbit: enables annotation-driven RabbitMQ support
@Service
public class BookService {
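// Runs whenever this queue receives a message. For the annotation to take effect,
// @EnableRabbit must be present on a configuration class.
@RabbitListener(queues = "jatpeo.news")
public void receive(Book book){
System.out.println("Message received: " + book);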
}
@RabbitListener(queues = "jatpeo")
public void receive02(Message message){
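// getBody() returns a byte[]; decode it so the content is printed, not the array reference
System.out.println(new String(message.getBody()));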
System.out.println(message.getMessageProperties());
}
}
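The receive02 listener takes the raw Message, whose getBody() returns a byte[]. As a small standalone illustration using only the JDK, the sketch below shows the difference between printing a byte array directly and decoding it first. It assumes the body is UTF-8 text such as JSON, which would be the case with a JSON converter but not with JDK serialization. The bodyAsText helper and the sample payload are hypothetical.

```java
import java.nio.charset.StandardCharsets;

class MessageBodyDemo {

    // Hypothetical helper: decode a text-based message body (assumed to be UTF-8).
    static String bodyAsText(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for message.getBody() on a JSON message; the payload is made up.
        byte[] body = "{\"bookName\":\"Journey to the West\"}".getBytes(StandardCharsets.UTF_8);

        // Printing the array itself shows its type and identity hash, not its content.
        System.out.println(body);
        // Decoding first prints the actual text.
        System.out.println(bodyAsText(body));
    }
}
```

When the payload type is known, declaring it as the listener parameter, as receive(Book book) does, leaves the conversion to the configured MessageConverter.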
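5.1.3 AmqpAdmin: the RabbitMQ administration component
AmqpAdmin is the RabbitMQ administration component, used to declare queues, exchanges, and so on.
When the queues, exchanges, and bindings have not been created by hand in the web management console, AmqpAdmin can create and delete them.
Inject AmqpAdmin: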
@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
@Autowired
AmqpAdmin amqpAdmin;
@Test
public void createExchange(){
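// Create a DirectExchange
/* amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange")); System.out.println("Exchange created");*/
// Create a durable queue
//amqpAdmin.declareQueue(new Queue("amqpAdmin.queue",true));
// Create the binding (the exchange and queue above must already exist)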
amqpAdmin.declareBinding(new Binding("amqpAdmin.queue", Binding.DestinationType.QUEUE,
"amqpAdmin.exchange","amqpAdmin.haha",null));
}
The result can then be checked in the web management console.