spring boot實戰(第十二篇)整合RabbitMQ


前言

最近幾篇文章將圍繞消息中間件RabbitMQ展開,對於RabbitMQ基本概念這里不闡述,主要講解RabbitMQ的基本用法、Java客戶端API介紹、spring Boot與RabbitMQ整合、

Spring Boot與RabbitMQ整合源碼分析。

 

RabbitMQ安裝

 

在使用消息中間件RabbitMQ之前就是安裝RabbitMQ。

 

  • 安裝erlang:yum install erlang 
  • 下載RabbitMQ安裝包: https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-generic-unix-3.5.6.tar.gz
  • 解壓安裝包、配置環境變量RABBITMQ_HOME
 
 
參考網址:https://www.rabbitmq.com/install-generic-unix.html
windows:  https://www.rabbitmq.com/install-windows.html
 

RabbitMQ配置

1.安裝完成后需要對RabbitMQ進行配置,在etc/rabbitmq目錄下創建兩個文件:
  • rabbitmq-env.conf 環境信息配置
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1  
RABBITMQ_NODE_PORT=5672  
RABBITMQ_NODENAME=node01

 

  • rabbitmq.config 核心配置文件

          [{rabbit, [{loopback_users, []}]}].  
該配置表示是的默認用戶guest用戶可以遠程訪問mq(廣域網不能訪問,內網可以訪問)
 

2.啟動RabbitMQ 執行命令 rabbitmq-server
           RabbitMQ 3.5.4. Copyright (C) 2007-2015 Pivotal Software, Inc.  
##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/  
##  ##  
##########  Logs: /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01.log  
######  ##        /Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01-sasl.log  
##########  
            Starting broker... completed with 0 plugins.  

 

3. RabbitMQ提供WEB-UI管理控制台,使用 rabbitmq-plugins enable rabbitmq_management命令啟用,重啟后可以看到
 
        Starting broker... completed with 6 plugins.  

表明WEB-UI控制台啟動成功,訪問:http://localhost:15672/
 
 
登陸進入:
 
通過該控制台可以方便管理RabbitMQ。
 

創建Test用戶

RabbitMQ默認使用guest用戶,下面講述如何創建一個test用戶,最快捷的做法使用web管理控制台

 
 
這里使用命令創建:
  • rabbitmqctl add_user test test
  • rabbitmqctl set_user_tags test  administrator

    tag分為四種"management", "policymaker", "monitoring" "administrator" 詳見 http://www.rabbitmq.com/management.html

RabbitMQ 其他

 
在實際使用RabbitMQ中還需要涉及到 RabbitMQ的集群、高可用(采用鏡像隊列實現)以后有機會再詳細闡述,有興趣可參考https://www.rabbitmq.com/documentation.html
 
 

RabbitMQ Java Client

 
RabbitMQ 客戶端支持語言種類繁多,官方都一一舉例:https://www.rabbitmq.com/getstarted.html
 
這里主要自己開發一個小的demo
 

消息消費者

操作步驟:
  1. 創建連接工廠ConnectionFactory
  2. 獲取連接Connection
  3. 通過連接獲取通信通道Channel
  4. 聲明交換機Exchange:交換機類型分為四類

        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念

            HeadersExchange :通過添加屬性key-value匹配

            DirectExchange:按照routingkey分發到指定隊列

            TopicExchange:多關鍵字匹配

  5. 聲明隊列Queue

  6. 將隊列和交換機綁定

  7. 創建消費者

  8. 執行消息的消費

package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.ConsumerCancelledException;  
import com.rabbitmq.client.QueueingConsumer;  
import com.rabbitmq.client.QueueingConsumer.Delivery;  
import com.rabbitmq.client.ShutdownSignalException;  
  
/**  
 * 客戶端01  
 *   
 * @author liaokailin  
 * @version $Id: Receive01.java, v 0.1 2015年11月01日 下午3:47:58 liaokailin Exp $  
 */  
public class Receive01 {  
    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,  
                                          ConsumerCancelledException, InterruptedException {  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //獲取一個鏈接  
        //通過Channel進行通信  
        Channel channel = conn.createChannel();  
        int prefetchCount = 1;  
        channel.basicQos(prefetchCount); //保證公平分發  
  
        boolean durable = true;  
        //聲明交換機  
        channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", durable); //按照routingKey過濾  
        //聲明隊列  
        String queueName = channel.queueDeclare("queue-01", true, true, false, null).getQueue();  
        //將隊列和交換機綁定  
        String routingKey = "lkl-0";  
        //隊列可以多次綁定,綁定不同的交換機或者路由key  
        channel.queueBind(queueName, Send.EXCHANGE_NAME, routingKey);  
  
        //創建消費者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
          
        //將消費者和隊列關聯  
        channel.basicConsume(queueName, false, consumer); // 設置為false表面手動確認消息消費  
  
        //獲取消息  
  
        System.out.println(" Wait message ....");  
        while (true) {  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());  
            String key = delivery.getEnvelope().getRoutingKey();  
  
            System.out.println("  Received '" + key + "':'" + msg + "'");  
            System.out.println(" Handle message");  
            TimeUnit.SECONDS.sleep(3); //mock handle message  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //確定該消息已成功消費  
        }  
  
    }  
}  

消息生產者

操作步驟:
  1. 創建連接工廠ConnectionFactory
  2. 獲取連接Connection
  3. 通過連接獲取通信通道Channel
  4. 發送消息
 
package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConfirmListener;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;  
  
/**  
 * 消息publish  
 *   
 * @author liaokailin  
 * @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $  
 */  
public class Send {  
    public final static String EXCHANGE_NAME = "test-exchange";  
  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
        /**  
         * 配置amqp broker 連接信息  
         */  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //獲取一個鏈接  
        //通過Channel進行通信  
        Channel channel = conn.createChannel();  
  
        // channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消費者已創建,這里可不聲明  
        channel.confirmSelect(); //Enables publisher acknowledgements on this channel  
        channel.addConfirmListener(new ConfirmListener() {  
  
            @Override  
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleNack] :" + deliveryTag + "," + multiple);  
  
            }  
  
            @Override  
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleAck] :" + deliveryTag + "," + multiple);  
            }  
        });  
  
        String message = "lkl-";  
        //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN  
        //發送多條信息,每條消息對應routekey都不一致  
        for (int i = 0; i < 10; i++) {  
            channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN,  
                (message + i).getBytes());  
            System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2)));  
        }  
  
    }  
}  

 

在設置消息被消費的回調前需顯示調用
 
channel.confirmSelect()  

 

否則回調函數無法調用
 
先執行消費者,消費者會輪詢是否有消息的到來,在web控制也可以觀察哦~~,再啟動生產者發送消息。

================================

前言

本篇主要講述spring Boot與RabbitMQ的整合,內容非常簡單,純API的調用操作。 操作之間需要加入依賴Jar

<dependency>  
<groupId>org.springframework.boot</groupId>  
<artifactId>spring-boot-starter-amqp</artifactId>  
/dependency>

消息生產者

 
        
不論是創建消息消費者或生產者都需要ConnectionFactory
 
        
 

ConnectionFactory配置

 
        
創建AmqpConfig文件AmqpConfig.java(后期的配置都在該文件中)
 
        
@Configuration  
public class AmqpConfig {  
  
    public static final String EXCHANGE   = "spring-boot-exchange";  
    public static final String ROUTINGKEY = "spring-boot-routingKey";  
  
    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses("127.0.0.1:5672");  
        connectionFactory.setUsername("guest");  
        connectionFactory.setPassword("guest");  
        connectionFactory.setVirtualHost("/");  
        connectionFactory.setPublisherConfirms(true); //必須要設置  
        return connectionFactory;  
    }  
}

這里需要顯示調用
connectionFactory.setPublisherConfirms(true);  
才能進行消息的回調。
 
        

RabbitTemplate

通過使用RabbitTemplate來對開發者提供API操作
 
        
@Bean  
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
//必須是prototype類型  
public RabbitTemplate rabbitTemplate() {  
    RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    return template;  
}  

 

這里設置為原型,具體的原因在后面會講到
 
 
        
  在發送消息時通過調用RabbitTemplate中的如下方法
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)  
  • exchange:交換機名稱
  • routingKey:路由關鍵字

  • object:發送的消息內容

  • correlationData:消息ID

 
        
因此生產者代碼詳單簡潔
 
        

Send.java

@Component  
public class Send  {  
  
    private RabbitTemplate rabbitTemplate;  
  
    /**  
     * 構造方法注入  
     */  
    @Autowired  
    public Send(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
  
    public void sendMsg(String content) {  
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    }  
  
       
}  

     

       如果需要在生產者需要消息發送后的回調,需要對rabbitTemplate設置ConfirmCallback對象,由於不同的生產者需要對應不同的ConfirmCallback,如果rabbitTemplate設置為單例bean,則所有的rabbitTemplate

實際的ConfirmCallback為最后一次申明的ConfirmCallback。

下面給出完整的生產者代碼:

 

 

package com.lkl.springboot.amqp;  
  
import java.util.UUID;  
  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.support.CorrelationData;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  
  
/**  
 * 消息生產者  
 *   
 * @author liaokailin  
 * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $  
 */  
@Component  
public class Send implements RabbitTemplate.ConfirmCallback {  
  
    private RabbitTemplate rabbitTemplate;  
  
    /**  
     * 構造方法注入  
     */  
    @Autowired  
    public Send(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容  
    }  
  
    public void sendMsg(String content) {  
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    }  
  
    /**  
     * 回調  
     */  
    @Override  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
        System.out.println(" 回調id:" + correlationData);  
        if (ack) {  
            System.out.println("消息成功消費");  
        } else {  
            System.out.println("消息消費失敗:" + cause);  
        }  
    }  
  
}  

 

消息消費者

消費者負責申明交換機(生產者也可以申明)、隊列、兩者的綁定操作。

交換機

/**  
     * 針對消費者配置  
        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念  
        HeadersExchange :通過添加屬性key-value匹配  
        DirectExchange:按照routingkey分發到指定隊列  
        TopicExchange:多關鍵字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
        return new DirectExchange(EXCHANGE);  
    }  

 

在Spring Boot中交換機繼承AbstractExchange類
 
        

隊列

@Bean  
    public Queue queue() {  
        return new Queue("spring-boot-queue", true); //隊列持久  
  
    } 

 

綁定

@Bean  
  public Binding binding() {  
      return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
  } 
完成以上工作后,在spring boot中通過消息監聽容器實現消息的監聽,在消息到來時執行回調操作。
 
        
 
 
        

消息消費

@Bean  
  public SimpleMessageListenerContainer messageContainer() {  
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
      container.setQueues(queue());  
      container.setExposeListenerChannel(true);  
      container.setMaxConcurrentConsumers(1);  
      container.setConcurrentConsumers(1);  
      container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認  
      container.setMessageListener(new ChannelAwareMessageListener() {  
  
          @Override  
          public void onMessage(Message message, Channel channel) throws Exception {  
              byte[] body = message.getBody();  
              System.out.println("receive msg : " + new String(body));  
              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費  
          }  
      });  
      return container;  
  }  

 


下面給出完整的配置文件:
package com.lkl.springboot.amqp;  
  
import org.springframework.amqp.core.AcknowledgeMode;  
import org.springframework.amqp.core.Binding;  
import org.springframework.amqp.core.BindingBuilder;  
import org.springframework.amqp.core.DirectExchange;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.context.annotation.Scope;  
  
import com.rabbitmq.client.Channel;  
  
/**  
 * Qmqp Rabbitmq  
 *   
 * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/  
 *   
 * @author lkl  
 * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $  
 */  
  
@Configuration  
public class AmqpConfig {  
  
    public static final String EXCHANGE   = "spring-boot-exchange";  
    public static final String ROUTINGKEY = "spring-boot-routingKey";  
  
    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses("127.0.0.1:5672");  
        connectionFactory.setUsername("guest");  
        connectionFactory.setPassword("guest");  
        connectionFactory.setVirtualHost("/");  
        connectionFactory.setPublisherConfirms(true); //必須要設置  
        return connectionFactory;  
    }  
  
    @Bean  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    //必須是prototype類型  
    public RabbitTemplate rabbitTemplate() {  
        RabbitTemplate template = new RabbitTemplate(connectionFactory());  
        return template;  
    }  
  
    /**  
     * 針對消費者配置  
     * 1. 設置交換機類型  
     * 2. 將隊列綁定到交換機  
     *   
     *   
        FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念  
        HeadersExchange :通過添加屬性key-value匹配  
        DirectExchange:按照routingkey分發到指定隊列  
        TopicExchange:多關鍵字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
        return new DirectExchange(EXCHANGE);  
    }  
  
    @Bean  
    public Queue queue() {  
        return new Queue("spring-boot-queue", true); //隊列持久  
  
    }  
  
    @Bean  
    public Binding binding() {  
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
    }  
  
    @Bean  
    public SimpleMessageListenerContainer messageContainer() {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認  
        container.setMessageListener(new ChannelAwareMessageListener() {  
  
            @Override  
            public void onMessage(Message message, Channel channel) throws Exception {  
                byte[] body = message.getBody();  
                System.out.println("receive msg : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費  
            }  
        });  
        return container;  
    }  
  
}  

 


以上完成 Spring Boot與RabbitMQ的整合 
 
        
 
 
        
 
 
        

自動配置

 
        
在Spring Boot中實現了RabbitMQ的自動配置,在配置文件中添加如下配置信息
spring.rabbitmq.host=localhost  
spring.rabbitmq.port=5672  
spring.rabbitmq.username=test  
spring.rabbitmq.password=test  
spring.rabbitmq.virtualHost=test  
后會自動創建ConnectionFactory以及RabbitTemplate對應Bean,為什么上面我們還需要手動什么呢?
 
        
自動創建的ConnectionFactory無法完成事件的回調,即沒有設置下面的代碼
connectionFactory.setPublisherConfirms(true);  
 
        
 
具體分析見后續文章的源碼解讀.
 
        

 

=========================================

 

前言

本篇開始講述Spring Boot如何整合RabbitMQ(實際上Spring就整合了RabbitMQ)。
 

RabbitAdmin

 

在上篇中遺留AmqpAdmin沒有講解,現在來看下該部分代碼
public AmqpAdmin amqpAdmin(CachingConnectionFactory connectionFactory) {  
        return new RabbitAdmin(connectionFactory);  
    }  

 

創建RabbitAdmin實例,調用構造方法
public RabbitAdmin(ConnectionFactory connectionFactory) {  
    this.connectionFactory = connectionFactory;  
    Assert.notNull(connectionFactory, "ConnectionFactory must not be null");  
    this.rabbitTemplate = new RabbitTemplate(connectionFactory);  
} 

 


創建連接工廠、rabbitTemplate,其中ConnectionFactory采用上一篇中自定義bean
public ConnectionFactory connectionFactory() {  
     CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
     connectionFactory.setAddresses("127.0.0.1:5672");  
     connectionFactory.setUsername("guest");  
     connectionFactory.setPassword("guest");  
     connectionFactory.setPublisherConfirms(true); //必須要設置  
     return connectionFactory;  
 }  

 

為CachingConnectionFactory實例,其緩存模式為通道緩存
private volatile CacheMode cacheMode = CacheMode.CHANNEL;  

 

 
接下來看下RabbitAdmin類定義:
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean {  
...  
}  

 

實現接口AmqpAdmin(定義若干RabbitMQ操作父接口),這里需要強調的是InitializingBean,實現該接口則會調用afterPropertiesSet方法
public void afterPropertiesSet() {  
  
        synchronized (this.lifecycleMonitor) {  
  
            if (this.running || !this.autoStartup) {  
                return;  
            }  
  
            if (this.connectionFactory instanceof CachingConnectionFactory &&  
                    ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {  
                logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");  
                return;  
            }  
  
            this.connectionFactory.addConnectionListener(new ConnectionListener() {  
  
                // Prevent stack overflow...  
                private final AtomicBoolean initializing = new AtomicBoolean(false);  
  
                @Override  
                public void onCreate(Connection connection) {  
                    if (!initializing.compareAndSet(false, true)) {  
                        // If we are already initializing, we don't need to do it again...  
                        return;  
                    }  
                    try {  
                           
                        initialize();  
                    }  
                    finally {  
                        initializing.compareAndSet(true, false);  
                    }  
                }  
  
                @Override  
                public void onClose(Connection connection) {  
                }  
  
            });  
  
            this.running = true;  
  
        }  
    }  

 

synchronized (this.lifecycleMonitor)加鎖保證同一時間只有一個線程訪問該代碼,隨后調用this.connectionFactory.addConnectionListener添加連接監聽,各連接工廠關系:


實際調用為CachingConnectionFactory
public void addConnectionListener(ConnectionListener listener) {  
        super.addConnectionListener(listener);  
        // If the connection is already alive we assume that the new listener wants to be notified  
        if (this.connection != null) {  
            listener.onCreate(this.connection);  
        }  
    }  

 


此時connection為null,無法執行到listener.onCreate(this.connection); 往CompositeConnectionListener connectionListener中添加監聽信息,最終保證在集合中
private List<ConnectionListener> delegates = new CopyOnWriteArrayList<ConnectionListener>();  

 

這里添加的監聽代碼執行,在后面調用時再來講解。
 
至此~~ RabbitAdmin創建完成。 
 
 

Exchange

接下來繼續來看AmqpConfig.java中的代碼
@Bean  
  public DirectExchange defaultExchange() {  
      return new DirectExchange(EXCHANGE);  
  }  

 

以上代碼創建一個交換機,交換機類型為direct
 

在申明交換機時需要指定交換機名稱, 默認創建可持久交換機
 

Queue

public Queue queue() {  
       return new Queue("spring-boot-queue", true); //隊列持久  
   }  
 
 
默認創建可持久隊列
 

Binding

@Bean  
   public Binding binding() {  
       return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
   }  
 
BindingBuilder.bind(queue()) 實現為:
public static DestinationConfigurer bind(Queue queue) {  
        return new DestinationConfigurer(queue.getName(), DestinationType.QUEUE);  
    }  

 


DestinationConfigurer通過name、type區分不同配置信息,其to()方法為重載方法,傳遞參數為四種交換機,分別返回XxxExchangeRoutingKeyConfigurer,其中with方法返回Bingding實例,因此在Binding信息中存儲了
隊列、交換機、路由key等相關信息
public class Binding extends AbstractDeclarable {  
  
    public static enum DestinationType {  
        QUEUE, EXCHANGE;  
    }  
  
    private final String destination;  
  
    private final String exchange;  
  
    private final String routingKey;  
  
    private final Map<String, Object> arguments;  
  
    private final DestinationType destinationType;  
...  
}  

以上信息理解都非常簡單,下面來看比較復雜點的SimpleMessageListenerContainer

SimpleMessageListenerContainer

@Bean  
    public SimpleMessageListenerContainer messageContainer() {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認  
        container.setMessageListener(new ChannelAwareMessageListener() {  
  
            @Override  
            public void onMessage(Message message, Channel channel) throws Exception {  
                byte[] body = message.getBody();  
                System.out.println("receive msg : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費  
            }  
        });  
        return container;  
    }  

 


查看其實現的接口,注意SmartLifecycle

 
接下來設置隊列信息,在AbstractMessageListenerContainer
 

       private volatile List<StringqueueNames = new CopyOnWriteArrayList<String>();  

添加隊列信息
    AbstractMessageListenerContainer#exposeListenerChannel設置為true

  
container.setMaxConcurrentConsumers(1);  
container.setConcurrentConsumers(1);  

 

設置並發消費者數量,默認情況為1
 
  1. container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認  

     

設置消費者成功消費消息后確認模式,分為兩種
  • 自動模式,默認模式,在RabbitMQ Broker消息發送到消費者后自動刪除
  • 手動模式,消費者客戶端顯示編碼確認消息消費完成,Broker給生產者發送回調,消息刪除
接下來設置消費者端消息監聽,為private volatile Object messageListener 賦值
 
到這里消息監聽容器也創建完成了,但令人納悶的時,消費者如何去消費消息呢?從這里完全看不出來。那么接下來看下SmartLifecycle接口
 

SmartLifecycle

熟悉Spring都應該知道該接口,其定義為:

 
public interface SmartLifecycle extends Lifecycle, Phased {  
  
    boolean isAutoStartup();  
    void stop(Runnable callback);  
  
}

 

其中的isAutoStartup設置為true時,會自動調用Lifecycle接口中的start方法,既然我們為源碼分析,也簡單看下這個聰明的聲明周期接口是如何實現它的聰明方法的
 

spring boot實戰(第十篇)Spring boot Bean加載源碼分析中講到執行Bean加載時,調用AbstractApplicationContext#refresh(),其中存在一個方法調用finishRefresh()
[html]  view plain  copy
 
  1. protected void finishRefresh() {  
  2.     // Initialize lifecycle processor for this context.  
  3.     initLifecycleProcessor();  
  4.   
  5.     // Propagate refresh to lifecycle processor first.  
  6.     getLifecycleProcessor().onRefresh();  
  7.   
  8.     // Publish the final event.  
  9.     publishEvent(new ContextRefreshedEvent(this));  
  10.   
  11.     // Participate in LiveBeansView MBean, if active.  
  12.     LiveBeansView.registerApplicationContext(this);  
  13. }  

其中initLifecycleProcessor初始化生命周期處理器,

[html]  view plain  copy
 
  1. protected void initLifecycleProcessor() {  
  2.     ConfigurableListableBeanFactory beanFactory = getBeanFactory();  
  3.     if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {  
  4.         this.lifecycleProcessor =  
  5.                 beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);  
  6.         if (logger.isDebugEnabled()) {  
  7.             logger.debug("Using LifecycleProcessor [" + this.lifecycleProcessor + "]");  
  8.         }  
  9.     }  
  10.     else {  
  11.         DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();  
  12.         defaultProcessor.setBeanFactory(beanFactory);  
  13.         this.lifecycleProcessor = defaultProcessor;  
  14.         beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);  
  15.         if (logger.isDebugEnabled()) {  
  16.             logger.debug("Unable to locate LifecycleProcessor with name '" +  
  17.                     LIFECYCLE_PROCESSOR_BEAN_NAME +  
  18.                     "': using default [" + this.lifecycleProcessor + "]");  
  19.         }  
  20.     }  
  21. }  

注冊DefaultLifecycleProcessor對應bean

getLifecycleProcessor().onRefresh()調用DefaultLifecycleProcessor中方法onRefresh,調用startBeans(true)

 

[html]  view plain  copy
 
  1. private void startBeans(boolean autoStartupOnly) {  
  2.     Map<String, LifecyclelifecycleBeans = getLifecycleBeans();  
  3.     Map<Integer, LifecycleGroupphases = new HashMap<Integer, LifecycleGroup>();  
  4.     for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {  
  5.         Lifecycle bean = entry.getValue();  
  6.         if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {  
  7.             int phase = getPhase(bean);  
  8.             LifecycleGroup group = phases.get(phase);  
  9.             if (group == null) {  
  10.                 group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);  
  11.                 phases.put(phase, group);  
  12.             }  
  13.             group.add(entry.getKey(), bean);  
  14.         }  
  15.     }  
  16.     if (phases.size() > 0) {  
  17.         List<Integerkeys = new ArrayList<Integer>(phases.keySet());  
  18.         Collections.sort(keys);  
  19.         for (Integer key : keys) {  
  20.             phases.get(key).start();  
  21.         }  
  22.     }  
  23. }  

其中

Map<String, LifecyclelifecycleBeans = getLifecycleBeans();  

 

獲取所有實現Lifecycle接口bean,執行bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()判斷,如果bean同時也為Phased實例,則加入到LifecycleGroup中,隨后phases.get(key).start()調用start方法

 

接下來要做的事情就很明顯:要了解消費者具體如何實現,查看SimpleMessageListenerContainer中的start是如何實現的。

 

至此~~整合RabbitMQ源碼分析准備工作完成,下一篇中正式解讀消費者的實現。

 

==============================

 

踩坑記錄

近日在用spring boot架構一個微服務框架,服務發現與治理、發布REST接口各種輕松愜意。但是服務當設計MQ入口時,就發現遇到無數地雷,現在整理成下文,供各路大俠圍觀與嘲笑。

版本

當前使用的spring-boot-starter-amqp版本為2016.5發布的1.3.5.RELEASE

也許若干年后,你們版本都不會有這些問題了。:(

RabbitMQ

當需要用到MQ的時候,我的第一反映就是使用RabbitMQ,貓了一眼spring boot的官方說明,上面說spring boot為rabbit准備了spring-boot-starter-amqp,並且為RabbitTemplate和RabbitMQ提供了自動配置選項。暗自竊喜~~

瞅瞅[官方文檔]http://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-rabbitmq和例子,SO EASY,再看一眼GITHUB上的官方例了,也有例子。

心情愉悅的照着例子,開干~~。

踩坑

十五分鍾后的代碼類似這樣:

@Service @RabbitListener(queues = "merchant") public class MQReceiver { protected Logger logger = Logger.getLogger(MQReceiver.class .getName()); @RabbitHandler public void process(@Payload UpdateMerchant request) { UpdateMerchantResponse response = new UpdateMerchantResponse(); logger.info(request.getMerchantId() + "->" + response.getReturnCode()); } }

消費信息后,應該記錄一條日志。
結果得到只有org.springframework.amqp.AmqpException: No method found for class [B 這個異常,並且還無限循環拋出這個異常。。。

記得剛才官方文檔好像說了異常什么的,轉身去貓一眼,果然有:

If retries are not enabled and the listener throws an exception, by default the delivery will be retried indefinitely. You can modify this behavior in two ways; set the defaultRequeueRejected
 property to false
 and zero re-deliveries will be attempted; or, throw an AmqpRejectAndDontRequeueException
 to signal the message should be rejected. This is the mechanism used when retries are enabled and the maximum delivery attempts are reached.

知道了為啥會無限重試了,下面來看看為啥會拋出這個異常,google搜一下,貌似還有一個倒霉鬼遇到了這個問題

進去看完問題和大神的解答,豁然開朗。

There are two conversions in the @RabbitListener pipeline.
The first converts from a Spring AMQP Message to a spring-messaging Message.
There is currently no way to change the first converter from SimpleMessageConverter which handles String, Serializable and passes everything else as byte[].
The second converter converts the message payload to the method parameter type (if necessary).
With method-level @RabbitListeners there is a tight binding between the handler and the method.
With class-level @RabbitListener s, the message payload from the first conversion is used to select which method to invoke. Only then, is the argument conversion attempted.
This mechanism works fine with Java Serializable objects since the payload has already been converted before the method is selected.
However, with JSON, the first conversion returns a byte[] and hence we find no matching @RabbitHandler.
We need a mechanism such that the first converter is settable so that the payload is converted early enough in the pipeline to select the appropriate handler method.
A ContentTypeDelegatingMessageConverter is probably most appropriate.
And, as stated in AMQP-574, we need to clearly document the conversion needs for a @RabbitListener, especially when using JSON or a custom conversion.

得嘞,官方示例果然是坑,試試大神的解決方案,手動新增下轉換。

  @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; }

然后在生產和消費信息的地方使用他們:

@RabbitListener(queues = "merchant", containerFactory="rabbitListenerContainerFactory") public void process(@Payload UpdateMerchant request) { UpdateMerchantResponse response = new UpdateMerchantResponse(); logger.info(request.getMerchantId() + "->" + response.getReturnCode()); }

再來一次,果然可以了

c.l.s.m.service.MQReceiver : 00000001->null

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM