2.2_springboot2.x消息RabbitMQ整合&amqpAdmin管理組件的使用


5.1.1、基本測試

1.引 spring-boot-starter-amqp**

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

自動配置原理說明 RabbitAutoConfiguration

1、有自動配置了連接工廠CachingConnectionFactory;獲取與rabbitmq連接信息

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {

	@Configuration
	@ConditionalOnMissingBean(ConnectionFactory.class)
	protected static class RabbitConnectionFactoryCreator {

		@Bean
		public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties,
				ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception {
            
           ... 
        }
        ...

2、RabbitProperties封裝了 RabbitMQ的配置

@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {

	/** * RabbitMQ host. */
	private String host = "localhost";

	/** * RabbitMQ port. */
	private int port = 5672;
    ....

application.yml配置

spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    username: guest
    password: guest
    port: 5672

3、RabbitTemplate:給RabbitMQ發送和接收消息

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    ....
        
	@Configuration
	@Import(RabbitConnectionFactoryCreator.class)
	protected static class RabbitTemplateConfiguration {

		private final RabbitProperties properties;

		private final ObjectProvider<MessageConverter> messageConverter;

		private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;

		public RabbitTemplateConfiguration(RabbitProperties properties,
				ObjectProvider<MessageConverter> messageConverter,
				ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
			this.properties = properties;
			this.messageConverter = messageConverter;
			this.retryTemplateCustomizers = retryTemplateCustomizers;
		}

		@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnMissingBean(RabbitOperations.class)
		public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
			PropertyMapper map = PropertyMapper.get();
			RabbitTemplate template = new RabbitTemplate(connectionFactory);
			MessageConverter messageConverter = this.messageConverter.getIfUnique();
			if (messageConverter != null) {
				template.setMessageConverter(messageConverter);
			}
			template.setMandatory(determineMandatoryFlag());
			RabbitProperties.Template properties = this.properties.getTemplate();
			if (properties.getRetry().isEnabled()) {
				template.setRetryTemplate(new RetryTemplateFactory(
						this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())).createRetryTemplate(
								properties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER));
			}
			map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
					.to(template::setReceiveTimeout);
			map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
			map.from(properties::getExchange).to(template::setExchange);
			map.from(properties::getRoutingKey).to(template::setRoutingKey);
			map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
			return template;
		}
}

4、AmqpAdmin:RabbitMQ系統管理組件,用來聲明隊列,交換器等 , 當沒有在網頁端自己創建queue、exchange、Binding時可采用 AmqpAdmin:創建和刪除queue、exchange、Binding

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {
    
    ...
        @Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
		@ConditionalOnMissingBean
		public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
			return new RabbitAdmin(connectionFactory);
		}
}

2.測試RabbitMQ

1)單波-點對點

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
      @Test
    public void contextLoads() {
        //message需要自己定義,定義一個消息體內容
        //rabbitTemplate.send(exchage,routeKey,message);

        //常用的convertAndSend,消息體會自動轉換,object:默認當成消息體,只要傳入要發送的對象,自動序列化Babbitmq
        //rabbitTemplate.convertAndSend(exchange,routeKey,object);
        Map<String,Object> maps = new HashMap<String,Object>();
        maps.put("msg","這是一個消息");
        maps.put("data", Arrays.asList("helloworld",123,true));
        //對象被默認序列化以后發送出去(jdk)
        rabbitTemplate.convertAndSend("exchange.direct","jatpeo.news",new Book("西游記","吳承恩"));
    }
    //接收數據,如何將數據自動轉為json發送出去?
    @Test
    public void receive(){

        Object o = rabbitTemplate.receiveAndConvert("jatpeo.news");
        System.out.println(o.getClass());
        System.out.println(o);
    }

​ 常用的convertAndSend,消息體會自動轉換,object:默認當成消息體,只要傳入要發送的對象,自動序列化Babbitmq,對象被默認序列化以后發送出去(jdk)

源碼分析:

public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
		implements BeanFactoryAware, RabbitOperations, MessageListener,
			ListenerContainerAware, PublisherCallbackChannel.Listener, Lifecycle, BeanNameAware {
                
                	private MessageConverter messageConverter = new SimpleMessageConverter();

調用SimpleMessageConverter

public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware {

      content = new String(message.getBody(), encoding);
                } catch (UnsupportedEncodingException var8) {
                    throw new MessageConversionException("failed to convert text-based Message content", var8);
                }
            } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
                try {
                    content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
                } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
                    throw new MessageConversionException("failed to convert serialized Message content", var7);
                }
            }

}

自定義MessageConvert

新建MyAMQPConfig

@EnableRabbit//開啟基於注解的RabbitMQ
@Configuration
public class MyAMQPConfig {

    @Bean
    public MessageConverter messageConverter(){

        return new Jackson2JsonMessageConverter();
    }




} 

2)廣播

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
     /** * 2、廣播 * * 廣播路由鍵無所謂 * */

    @Test
    public void Test(){
        rabbitTemplate.convertAndSend("exchange.fanout","",new Book("紅樓夢","曹雪芹"));

    }

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-TNAhBooF-1571057027287)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1571056120180.png)]

5.1.2、@RabbitListener和@EnableRabbit

@EnableRabbit + @RabbitListener 監聽消息隊列的內容

@RabbitListener:監聽隊列

@EnableRabbit:開啟基於注解的RabbitMq

@Service
public class BookService {


    //只要這個消息隊列收到消息就打印消息,要讓此注解生效要在配置類中開啟注解@EnableRabbit
    @RabbitListener(queues = "jatpeo.news")
    public void receive(Book book){

        System.out.println("收到消息。。。打印");
    }

    @RabbitListener(queues = "jatpeo")
    public void receive02(Message message){
        System.out.println(message.getBody());
        System.out.println(message.getMessageProperties());


    }
}

5.1.3、AmqpAdmin:RabbitMQ

AmqpAdmin:RabbitMQ系統管理組件,用來聲明隊列,交換器等

當沒有在網頁端自己創建queue、exchange、Binding時可采用* AmqpAdmin:創建和刪除queue、exchange、Binding

注入AmqpAdmin

@RunWith(SpringRunner.class)
@SpringBootTest
public class Springboot02AmqpApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    AmqpAdmin amqpAdmin;


    @Test
    public void createExchange(){
        //創建DirectExchange
       /* amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange")); System.out.println("創建完成");*/
        //創建隊列
       //amqpAdmin.declareQueue(new Queue("amqpAdmin.queue",true));

        //創建綁定規則
        amqpAdmin.declareBinding(new Binding("amqpAdmin.queue", Binding.DestinationType.QUEUE,
                "amqpAdmin.exchange","amqpAdmin.haha",null));

    }

網頁端查看:

在這里插入圖片描述

等。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM