spring boot 整合 RabbitMq (注解)


1、增加rabbitmq的依賴包

<!-- ampq 依賴包 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

 

2、application.yaml文件中配置

spring:
  rabbitmq:
      host: localhost
      port: 5672
      username: admin
      password: admin
      publisher-confirms: true
      virtual-host: /

 

3、RabbitMq的工廠連接和模板創建

@Configuration
public class RabbitConfig
{
    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private Boolean publisherConfirms;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    //創建工廠連接
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(this.host);
        connectionFactory.setUsername(this.username);
        connectionFactory.setPassword(this.password);
        connectionFactory.setVirtualHost(this.virtualHost);
        connectionFactory.setPublisherConfirms(this.publisherConfirms); //必須要設置
        return connectionFactory;
    }

    //rabbitmq的模板配置
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必須是prototype類型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
        //template.setConfirmCallback(); 設置消息確認
        //template.setReturnCallback();
        return template;
    }
}

 

4、創建交換機、創建隊列、綁定交換機和隊列

@Configuration
public class RabbitExchangeConfig
{
    //直連交換機
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(RabbitConstant.EXCHANGE_NAME); 
    }

    //隊列
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(RabbitConstant.QUEUE_NAME).build();
    }

    //綁定
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(this.queue()).to(this.defaultExchange()).with(RabbitConstant.ROUTING_KEY);
    }
}

 

5、消費者

@Component
public class RabbitSender implements RabbitTemplate.ConfirmCallback
{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        this.rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容
        this.rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE_NAME,
                RabbitConstant.ROUTING_KEY, content, correlationId);
    }

    //回調
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回調id:" + correlationData);
        if (ack) {
            System.out.println("消息成功消費");
        } else {
            System.out.println("消息消費失敗:" + cause);
        }
    }
}

 

6、消費者

@Configuration
public class RabbitReceive
{
    @Autowired
    private RabbitConfig rabbitConfig;

    @Autowired
    private RabbitExchangeConfig rabbitExchangeConfig;

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitConfig.connectionFactory());
        container.setQueues(rabbitExchangeConfig.queue()); //設置要監聽的隊列
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
        container.setMessageListener(new ChannelAwareMessageListener() {
                @Override
                public void onMessage(Message message, Channel channel) throws Exception {
                    byte[] body = message.getBody();
                    System.out.println("receive msg : " + new String(body));
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
            }
        });
        return container;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM