Spring boot + RabbitMQ


本篇主要講述Spring Boot與RabbitMQ的整合,內容非常簡單,純API的調用操作。 操作之間需要加入依賴Jar

 

			<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>

 

消息生產者

不論是創建消息消費者或生產者都需要ConnectionFactory
 
 

ConnectionFactory配置

創建AmqpConfig文件AmqpConfig.java(后期的配置都在該文件中)
 
@Configuration
public class AmqpConfig {

    public static final String EXCHANGE   = "spring-boot-exchange";
    public static final String ROUTINGKEY = "spring-boot-routingKey";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); //必須要設置
        return connectionFactory;
    }
}

這里需要顯示調用
 connectionFactory.setPublisherConfirms(true);
才能進行消息的回調。
 
 

RabbitTemplate

通過使用RabbitTemplate來對開發者提供API操作
 
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必須是prototype類型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
這里設置為原型,具體的原因在后面會講到
 
  在發送消息時通過調用RabbitTemplate中的如下方法
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
  • exchange:交換機名稱
  • routingKey:路由關鍵字

  • object:發送的消息內容

  • correlationData:消息ID

 
因此生產者代碼詳單簡潔

Send.java

@Component
public class Send  {

    private RabbitTemplate rabbitTemplate;

    /**
     * 構造方法注入
     */
    @Autowired
    public Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
    }

     
}

 

如果需要在生產者中添加消息消費后的回調,需要對rabbitTemplate設置ConfirmCallback對象,由於不同的生產者需要對應不同的ConfirmCallback,如果rabbitTemplate設置為單例bean,則所有的rabbitTemplate

實際的ConfirmCallback為最后一次申明的ConfirmCallback。

下面給出完整的生產者代碼:

 

package com.u51.lkl.springboot.amqp;

import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 消息生產者
 * 
 * @author liaokailin
 * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $
 */
@Component
public class Send implements RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    /**
     * 構造方法注入
     */
    @Autowired
    public Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
    }

    /**
     * 回調
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回調id:" + correlationData);
        if (ack) {
            System.out.println("消息成功消費");
        } else {
            System.out.println("消息消費失敗:" + cause);
        }
    }

}

 

消息消費者

消費者負責申明交換機(生產者也可以申明)、隊列、兩者的綁定操作。

交換機

/**
     * 針對消費者配置
        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
        HeadersExchange :通過添加屬性key-value匹配
        DirectExchange:按照routingkey分發到指定隊列
        TopicExchange:多關鍵字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE);
    }

在Spring Boot中交換機繼承AbstractExchange類
 

 

隊列

 
@Bean
    public Queue queue() {
        return new Queue("spring-boot-queue", true); //隊列持久

    }

綁定

  @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
    }
完成以上工作后,在spring boot中通過消息監聽容器實現消息的監聽,在消息到來時執行回調操作。
 

消息消費

  @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
        container.setMessageListener(new ChannelAwareMessageListener() {

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("receive msg : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
            }
        });
        return container;
    }

下面給出完整的配置文件:
 
package com.u51.lkl.springboot.amqp;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import com.rabbitmq.client.Channel;

/**
 * Qmqp Rabbitmq
 * 
 * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
 * 
 * @author lkl
 * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $
 */

@Configuration
public class AmqpConfig {

    public static final String EXCHANGE   = "spring-boot-exchange";
    public static final String ROUTINGKEY = "spring-boot-routingKey";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); //必須要設置
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必須是prototype類型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 針對消費者配置
     * 1. 設置交換機類型
     * 2. 將隊列綁定到交換機
     * 
     * 
        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
        HeadersExchange :通過添加屬性key-value匹配
        DirectExchange:按照routingkey分發到指定隊列
        TopicExchange:多關鍵字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE);
    }

    @Bean
    public Queue queue() {
        return new Queue("spring-boot-queue", true); //隊列持久

    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
    }

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
        container.setMessageListener(new ChannelAwareMessageListener() {

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("receive msg : " + new String(body));
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費
            }
        });
        return container;
    }

}


以上完成 Spring Boot與RabbitMQ的整合 
 
 

自動配置

在Spring Boot中實現了RabbitMQ的自動配置,在配置文件中添加如下配置信息
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtualHost=test

后會自動創建ConnectionFactory以及RabbitTemplate對應Bean,為什么上面我們還需要手動什么呢?
 
自動創建的ConnectionFactory無法完成事件的回調,即沒有設置下面的代碼
connectionFactory.setPublisherConfirms(true);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM