前言
最近幾篇文章將圍繞消息中間件RabbitMQ展開,對於RabbitMQ基本概念這里不闡述,主要講解RabbitMQ的基本用法、Java客戶端API介紹、spring Boot與RabbitMQ整合、
Spring Boot與RabbitMQ整合源碼分析。
RabbitMQ安裝
在使用消息中間件RabbitMQ之前就是安裝RabbitMQ。
- 安裝erlang:yum install erlang
- 下載RabbitMQ安裝包: https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-generic-unix-3.5.6.tar.gz
- 解壓安裝包、配置環境變量RABBITMQ_HOME
RabbitMQ配置
- rabbitmq-env.conf 環境信息配置
-
rabbitmq.config 核心配置文件
2.啟動RabbitMQ 執行命令 rabbitmq-server
表明WEB-UI控制台啟動成功,訪問:http://localhost:15672/
創建Test用戶
- rabbitmqctl add_user test test
-
rabbitmqctl set_user_tags test administrator
tag分為四種"management", "policymaker", "monitoring" "administrator" 詳見 http://www.rabbitmq.com/management.html
RabbitMQ 其他
RabbitMQ Java Client
消息消費者
- 創建連接工廠ConnectionFactory
- 獲取連接Connection
- 通過連接獲取通信通道Channel
- 聲明交換機Exchange:交換機類型分為四類:
FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念
HeadersExchange :通過添加屬性key-value匹配
DirectExchange:按照routingkey分發到指定隊列
TopicExchange:多關鍵字匹配
-
聲明隊列Queue
-
將隊列和交換機綁定
-
創建消費者
-
執行消息的消費
消息生產者
- 創建連接工廠ConnectionFactory
- 獲取連接Connection
- 通過連接獲取通信通道Channel
- 發送消息
package org.lkl.mq.rabbitmq.test; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * 消息publish * * @author liaokailin * @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $ */ public class Send { public final static String EXCHANGE_NAME = "test-exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { /** * 配置amqp broker 連接信息 */ ConnectionFactory facotry = new ConnectionFactory(); facotry.setUsername("test"); facotry.setPassword("test"); facotry.setVirtualHost("test"); facotry.setHost("localhost"); Connection conn = facotry.newConnection(); //獲取一個鏈接 //通過Channel進行通信 Channel channel = conn.createChannel(); // channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消費者已創建,這里可不聲明 channel.confirmSelect(); //Enables publisher acknowledgements on this channel channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("[handleNack] :" + deliveryTag + "," + multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("[handleAck] :" + deliveryTag + "," + multiple); } }); String message = "lkl-"; //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN //發送多條信息,每條消息對應routekey都不一致 for (int i = 0; i < 10; i++) { channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes()); System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2))); } } }
前言
本篇主要講述spring Boot與RabbitMQ的整合,內容非常簡單,純API的調用操作。 操作之間需要加入依賴Jar
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>
/dependency>
消息生產者
ConnectionFactory配置
@Configuration
public class AmqpConfig {
public static final String EXCHANGE = "spring-boot-exchange";
public static final String ROUTINGKEY = "spring-boot-routingKey";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); //必須要設置
return connectionFactory;
}
}
這里需要顯示調用
connectionFactory.setPublisherConfirms(true);
RabbitTemplate
通過使用RabbitTemplate來對開發者提供API操作@Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必須是prototype類型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; }
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)
- exchange:交換機名稱
-
routingKey:路由關鍵字
-
object:發送的消息內容
-
correlationData:消息ID
Send.java
@Component public class Send { private RabbitTemplate rabbitTemplate; /** * 構造方法注入 */ @Autowired public Send(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId); } }
如果需要在生產者需要消息發送后的回調,需要對rabbitTemplate設置ConfirmCallback對象,由於不同的生產者需要對應不同的ConfirmCallback,如果rabbitTemplate設置為單例bean,則所有的rabbitTemplate
實際的ConfirmCallback為最后一次申明的ConfirmCallback。
下面給出完整的生產者代碼:
package com.lkl.springboot.amqp; import java.util.UUID; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息生產者 * * @author liaokailin * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $ */ @Component public class Send implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; /** * 構造方法注入 */ @Autowired public Send(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最后設置的內容 } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId); } /** * 回調 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回調id:" + correlationData); if (ack) { System.out.println("消息成功消費"); } else { System.out.println("消息消費失敗:" + cause); } } }
消息消費者
消費者負責申明交換機(生產者也可以申明)、隊列、兩者的綁定操作。
交換機
/** * 針對消費者配置 FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 HeadersExchange :通過添加屬性key-value匹配 DirectExchange:按照routingkey分發到指定隊列 TopicExchange:多關鍵字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); }
隊列
@Bean public Queue queue() { return new Queue("spring-boot-queue", true); //隊列持久 }
綁定
消息消費
@Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 } }); return container; }
下面給出完整的配置文件:
package com.lkl.springboot.amqp; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; import com.rabbitmq.client.Channel; /** * Qmqp Rabbitmq * * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/ * * @author lkl * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $ */ @Configuration public class AmqpConfig { public static final String EXCHANGE = "spring-boot-exchange"; public static final String ROUTINGKEY = "spring-boot-routingKey"; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); //必須要設置 return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必須是prototype類型 public RabbitTemplate rabbitTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } /** * 針對消費者配置 * 1. 設置交換機類型 * 2. 將隊列綁定到交換機 * * FanoutExchange: 將消息分發到所有的綁定隊列,無routingkey的概念 HeadersExchange :通過添加屬性key-value匹配 DirectExchange:按照routingkey分發到指定隊列 TopicExchange:多關鍵字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Queue queue() { return new Queue("spring-boot-queue", true); //隊列持久 } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY); } @Bean public SimpleMessageListenerContainer messageContainer() { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("receive msg : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 } }); return container; } }
以上完成 Spring Boot與RabbitMQ的整合
自動配置
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=test spring.rabbitmq.virtualHost=test
connectionFactory.setPublisherConfirms(true);
=========================================
前言
RabbitAdmin
在上篇中遺留AmqpAdmin沒有講解,現在來看下該部分代碼
創建連接工廠、rabbitTemplate,其中ConnectionFactory采用上一篇中自定義bean
private volatile CacheMode cacheMode = CacheMode.CHANNEL;
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, InitializingBean { ... }
public void afterPropertiesSet() { synchronized (this.lifecycleMonitor) { if (this.running || !this.autoStartup) { return; } if (this.connectionFactory instanceof CachingConnectionFactory && ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) { logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION"); return; } this.connectionFactory.addConnectionListener(new ConnectionListener() { // Prevent stack overflow... private final AtomicBoolean initializing = new AtomicBoolean(false); @Override public void onCreate(Connection connection) { if (!initializing.compareAndSet(false, true)) { // If we are already initializing, we don't need to do it again... return; } try { initialize(); } finally { initializing.compareAndSet(true, false); } } @Override public void onClose(Connection connection) { } }); this.running = true; } }
此時connection為null,無法執行到listener.onCreate(this.connection); 往CompositeConnectionListener connectionListener中添加監聽信息,最終保證在集合中
Exchange
在申明交換機時需要指定交換機名稱, 默認創建可持久交換機
Queue
Binding
public static DestinationConfigurer bind(Queue queue) { return new DestinationConfigurer(queue.getName(), DestinationType.QUEUE); }
DestinationConfigurer通過name、type區分不同配置信息,其to()方法為重載方法,傳遞參數為四種交換機,分別返回XxxExchangeRoutingKeyConfigurer,其中with方法返回Bingding實例,因此在Binding信息中存儲了
以上信息理解都非常簡單,下面來看比較復雜點的SimpleMessageListenerContainer
SimpleMessageListenerContainer
查看其實現的接口,注意SmartLifecycle
private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();
-
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認
- 自動模式,默認模式,在RabbitMQ Broker消息發送到消費者后自動刪除
- 手動模式,消費者客戶端顯示編碼確認消息消費完成,Broker給生產者發送回調,消息刪除
SmartLifecycle
在 spring boot實戰(第十篇)Spring boot Bean加載源碼分析中講到執行Bean加載時,調用AbstractApplicationContext#refresh(),其中存在一個方法調用finishRefresh()
- protected void finishRefresh() {
- // Initialize lifecycle processor for this context.
- initLifecycleProcessor();
- // Propagate refresh to lifecycle processor first.
- getLifecycleProcessor().onRefresh();
- // Publish the final event.
- publishEvent(new ContextRefreshedEvent(this));
- // Participate in LiveBeansView MBean, if active.
- LiveBeansView.registerApplicationContext(this);
- }
其中initLifecycleProcessor初始化生命周期處理器,
- protected void initLifecycleProcessor() {
- ConfigurableListableBeanFactory beanFactory = getBeanFactory();
- if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {
- this.lifecycleProcessor =
- beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);
- if (logger.isDebugEnabled()) {
- logger.debug("Using LifecycleProcessor [" + this.lifecycleProcessor + "]");
- }
- }
- else {
- DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();
- defaultProcessor.setBeanFactory(beanFactory);
- this.lifecycleProcessor = defaultProcessor;
- beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);
- if (logger.isDebugEnabled()) {
- logger.debug("Unable to locate LifecycleProcessor with name '" +
- LIFECYCLE_PROCESSOR_BEAN_NAME +
- "': using default [" + this.lifecycleProcessor + "]");
- }
- }
- }
注冊DefaultLifecycleProcessor對應bean
getLifecycleProcessor().onRefresh()調用DefaultLifecycleProcessor中方法onRefresh,調用startBeans(true)
- private void startBeans(boolean autoStartupOnly) {
- Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
- Map<Integer, LifecycleGroup> phases = new HashMap<Integer, LifecycleGroup>();
- for (Map.Entry<String, ? extends Lifecycle> entry : lifecycleBeans.entrySet()) {
- Lifecycle bean = entry.getValue();
- if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
- int phase = getPhase(bean);
- LifecycleGroup group = phases.get(phase);
- if (group == null) {
- group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
- phases.put(phase, group);
- }
- group.add(entry.getKey(), bean);
- }
- }
- if (phases.size() > 0) {
- List<Integer> keys = new ArrayList<Integer>(phases.keySet());
- Collections.sort(keys);
- for (Integer key : keys) {
- phases.get(key).start();
- }
- }
- }
其中
Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
獲取所有實現Lifecycle接口bean,執行bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()判斷,如果bean同時也為Phased實例,則加入到LifecycleGroup中,隨后phases.get(key).start()調用start方法
接下來要做的事情就很明顯:要了解消費者具體如何實現,查看SimpleMessageListenerContainer中的start是如何實現的。
至此~~整合RabbitMQ源碼分析准備工作完成,下一篇中正式解讀消費者的實現。
==============================
踩坑記錄
近日在用spring boot架構一個微服務框架,服務發現與治理、發布REST接口各種輕松愜意。但是服務當設計MQ入口時,就發現遇到無數地雷,現在整理成下文,供各路大俠圍觀與嘲笑。
版本
當前使用的spring-boot-starter-amqp版本為2016.5發布的1.3.5.RELEASE
也許若干年后,你們版本都不會有這些問題了。:(
RabbitMQ
當需要用到MQ的時候,我的第一反映就是使用RabbitMQ,貓了一眼spring boot的官方說明,上面說spring boot為rabbit准備了spring-boot-starter-amqp,並且為RabbitTemplate和RabbitMQ提供了自動配置選項。暗自竊喜~~
瞅瞅[官方文檔]http://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-rabbitmq和例子,SO EASY,再看一眼GITHUB上的官方例了,也有例子。
心情愉悅的照着例子,開干~~。
踩坑
十五分鍾后的代碼類似這樣:
@Service @RabbitListener(queues = "merchant") public class MQReceiver { protected Logger logger = Logger.getLogger(MQReceiver.class .getName()); @RabbitHandler public void process(@Payload UpdateMerchant request) { UpdateMerchantResponse response = new UpdateMerchantResponse(); logger.info(request.getMerchantId() + "->" + response.getReturnCode()); } }
消費信息后,應該記錄一條日志。
結果得到只有org.springframework.amqp.AmqpException: No method found for class [B 這個異常,並且還無限循環拋出這個異常。。。
記得剛才官方文檔好像說了異常什么的,轉身去貓一眼,果然有:
If retries are not enabled and the listener throws an exception, by default the delivery will be retried indefinitely. You can modify this behavior in two ways; set the defaultRequeueRejected
property to false
and zero re-deliveries will be attempted; or, throw an AmqpRejectAndDontRequeueException
to signal the message should be rejected. This is the mechanism used when retries are enabled and the maximum delivery attempts are reached.
知道了為啥會無限重試了,下面來看看為啥會拋出這個異常,google搜一下,貌似還有一個倒霉鬼遇到了這個問題。
進去看完問題和大神的解答,豁然開朗。
There are two conversions in the @RabbitListener pipeline.
The first converts from a Spring AMQP Message to a spring-messaging Message.
There is currently no way to change the first converter from SimpleMessageConverter which handles String, Serializable and passes everything else as byte[].
The second converter converts the message payload to the method parameter type (if necessary).
With method-level @RabbitListeners there is a tight binding between the handler and the method.
With class-level @RabbitListener s, the message payload from the first conversion is used to select which method to invoke. Only then, is the argument conversion attempted.
This mechanism works fine with Java Serializable objects since the payload has already been converted before the method is selected.
However, with JSON, the first conversion returns a byte[] and hence we find no matching @RabbitHandler.
We need a mechanism such that the first converter is settable so that the payload is converted early enough in the pipeline to select the appropriate handler method.
A ContentTypeDelegatingMessageConverter is probably most appropriate.
And, as stated in AMQP-574, we need to clearly document the conversion needs for a @RabbitListener, especially when using JSON or a custom conversion.
得嘞,官方示例果然是坑,試試大神的解決方案,手動新增下轉換。
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; }
然后在生產和消費信息的地方使用他們:
@RabbitListener(queues = "merchant", containerFactory="rabbitListenerContainerFactory") public void process(@Payload UpdateMerchant request) { UpdateMerchantResponse response = new UpdateMerchantResponse(); logger.info(request.getMerchantId() + "->" + response.getReturnCode()); }
再來一次,果然可以了
c.l.s.m.service.MQReceiver : 00000001->null
