RabbitMQ(四)Spring集成RabbitMQ


 

前言

在使用Spring整合RabbitMQ時我們主要關注三個核心接口:

  • RabbitAdmin: 用於聲明交換機 隊列 綁定等
  • RabbitTemplate: 用於RabbitMQ消息的發送和接收
  • MessageListenerContainer: 監聽容器 為消息入隊提供異步處理

 

依賴

<dependency>
     <groupId>org.springframework.amqp</groupId>
     <artifactId>spring-rabbit</artifactId>
     <version>1.7.9.RELEASE</version>
</dependency>

 

配置

可通過以下兩種方式進行配置。

  • rabbitmq.xml配置文件
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd">

    <!-- 消費者和生產者通用配置(begin) -->
<!-- 創建連接工廠 --> <rabbit:connection-factory id="rabbitmqConnectionFactory" host="${rabbitmq_host}" port="${rabbitmq_port}" username="${rabbitmq_user}" password="${rabbitmq_passwd}" virtual-host="${rabbitmq_vhost}" /> <!-- 創建rabbitAdmin --> <rabbit:admin id="connectAdmin" connection-factory="rabbitmqConnectionFactory"/> <!-- 定義消息對象json轉換類 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> <!-- 消費者和生產者通用配置(end) --> <!-- 財務記賬接口配置(begin) --> <!-- 創建rabbitTemplate消息發送模版 --> <rabbit:template id="rabbitTemplate" exchange="${mmc_topic_exchange_name}" routing-key="${mmc_routingkey_name}" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"/> <!-- 財務記賬接口配置(end) -->
<!-- 關閉訂單配置(begin) --> <!-- 創建rabbitTemplate消息發送模版--> <rabbit:template id="orderDelayTemplate" exchange="mmc.order.delay.exchange-delay" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter" routing-key="mmc.order.delay.routingkey" /> <!-- 聲明監聽的Queue的名稱 --> <rabbit:queue id="orderQueue" name="mmc.order.delay.queue"/> <!-- 聲明exchange的類型為topic --> <rabbit:direct-exchange name="mmc.order.delay.exchange-delay" declared-by="connectAdmin" delayed="true"> <rabbit:bindings> <rabbit:binding queue="mmc.order.delay.queue" key="mmc.order.delay.routingkey"/> </rabbit:bindings> </rabbit:direct-exchange>
<!-- 創建消息監控容器 --> <bean id="messageHandler" class="com.zat.mmc.service.rabbitmq.MessageHandler"/> <rabbit:listener-container connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"> <rabbit:listener queues="mmc.order.delay.queue" ref="messageHandler" /> </rabbit:listener-container> <!-- 關閉訂單配置(end) --> </beans>

 配置文件其實還可以更簡單點:

    <!-- 消費者和生產者通用配置(begin) -->
    <!-- 創建連接 -->
    <rabbit:connection-factory  id="rabbitmqConnectionFactory" host="${rabbitmq_host}" port="${rabbitmq_port}" username="${rabbitmq_user}" password="${rabbitmq_passwd}" virtual-host="${rabbitmq_vhost}" />

    <!-- 創建rabbitAdmin -->
    <rabbit:admin id="connectAdmin" connection-factory="rabbitmqConnectionFactory"/>

    <!-- 消息對象json轉換類 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    <!-- 消費者和生產者通用配置(end) -->


    <!-- 財務記賬接口配置(begin) -->
    <!-- 創建rabbitTemplate消息發送模版 -->
    <rabbit:template id="rabbitTemplate"  exchange="${mmc_topic_exchange_name}" routing-key="${mmc_routingkey_name}" connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter"/>
    <!-- 財務記賬接口配置(end) -->

中間定義了交換機,定義了隊列,其實都可以省略的,可以到mq的管理后台創建。在代碼中定義交換機和隊列的好處是在使用時會自動創建它。

 

  •  Java配置類

spring在啟動時會掃描到Configuration這個注解是一個配置文件的注解。

@Configuration
public class RabbitMQConfig {

  public final static String QUEUE_NAME = "spring-queue";
  public final static String EXCHANGE_NAME = "spring-exchange";
  public final static String ROUTING_KEY = "spring-key";

  // 創建隊列
  @Bean
  public Queue queue() {
    return new Queue(QUEUE_NAME);
  }

  // 創建一個 topic 類型的交換器
  @Bean
  public TopicExchange exchange() {
    return new TopicExchange(EXCHANGE_NAME);
  }

  // 使用路由鍵(routingKey)把隊列(Queue)綁定到交換器(Exchange)
  @Bean
  public Binding binding(Queue queue, TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
  }

  @Bean
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory("xx", 5670);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
  }

  @Bean
  public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
    // 注意,autoStartup 必須設置為 true,否則 Spring 容器不會加載 RabbitAdmin 類
    rabbitAdmin.setAutoStartup(true);
    return rabbitAdmin;

  } 

  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
  }
}

 

生產者

@Component
public class DemoProducer {

    @Resource(name = "rabbitTemplate")
    private RabbitTemplate rabbitTemplate; 

    public void sendProducer() {
        // 生產者發送消息(因為在配置文件里面已經為rabbitTemplate指定了交換機和routing,所以可以省去它們)
        rabbitTemplate.convertAndSend(jsonObject.toJSONString());   
// 生產者往solutionInfo_exchange這個交換機,這個info_queue_key路由中發送消息 rabbitTemplate.convertAndSend("solutionInfo_exchange","info_queue_key", jsonObject.toJSONString()); } }

 

消費者

package com.rabbitmq.demo

@Component
public class DemoReceiver extends AbstractAdaptableMessageListener {

    @Override
    public void onMessage(Message message, Channel channel) {
        try {
            String result=new String(message.getBody(),"UTF-8")
            log.info("bodyJson:" + result);
         } catch (Throwable e) {
            log.error("WechatMsgReceiver exception", e);
         }
    }

}

 

其它

  • 批量自動聲明交換機、隊列和綁定

可以批量創建Queue和Exchange,批量創建綁定關系並將其放進List集合中返回使用:

 

  •  手動聲明交換機、隊列和綁定

可通過RabbitAdmin來實現:

/**
 * 聲明一個direct類型的、持久化、非排他的交換器
 */
rabbitAdmin.declareExchange(new DirectExchange(EXCHANGE_NAME, true, false, new HashMap<String, Object>()));
/** * 聲明一個持久化、非排他、非自動刪除的隊列 */ rabbitAdmin.declareQueue(new Queue(QUEUE_NAME, true, false, false, new HashMap<String, Object>()));
/** * 將交換器和隊列綁定 */ rabbitAdmin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_NAME)). to(new DirectExchange(EXCHANGE_NAME)).with(ROUTING_KEY));

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM