1.AMQP協議
AMQP 0-9-1的工作過程如下圖:消息(message)被發布者(publisher)發送給交換機(exchange),交換機常常被比喻成郵局或者郵箱。然后交換機將收到的消息根據路由規則分發給綁定的隊列(queue)。最后AMQP代理會將消息投遞給訂閱了此隊列的消費者,或者消費者按照需求自行獲取。
2.相關組件
Connection:客戶端與RabbitMQ代理之間的網絡連接
Virtual Host:用於權限控制的虛擬主機,一個虛擬主機可以包含多個交換機、隊列、綁定,可以設置不同賬號,不同的權限
Channel:消息通道
Exchange:交換機,生產者將消息發送至交換機
Queue:隊列,用於存儲消息
Binding:綁定關系,綁定交換機與隊列的關系
3.各個組件的創建
spring-boot-autoconfigure中關於amqp的自動配置
3.1.關於我們自定義組件的初始化
①Exchange的初始
public class TopicExchange extends AbstractExchange { //交換機名稱,父類AbstractExchange中的屬性 private final String name; //是否持久化(默認true,重啟服務后依然存在),父類AbstractExchange中的屬性 private final boolean durable; //是否自動刪除(默認false,長時間不用自動刪除),父類AbstractExchange中的屬性 private final boolean autoDelete; //參數 private final Map<String, Object> arguments; //是否延遲類型,true 發送消息的時候需要額外添加header(). //注意可能會報異常( unknown exchange type 'x-delayed-message')異常處理方法 private volatile boolean delayed; //是否內部使用,若內部使用則客戶端不能發送消息 private boolean internal; public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) { super(name, durable, autoDelete, arguments); } @Override public final String getType() { return ExchangeTypes.TOPIC; } }
②Queue的初始
public class Queue extends AbstractDeclarable { //隊列名稱 private final String name; //是否持久化 private final boolean durable; //是否聲明該隊列是否為連接獨占,若為獨占,連接關閉后隊列即被刪除 private final boolean exclusive; //是否自動刪除,若沒有消費者訂閱該隊列,隊列將被刪除 private final boolean autoDelete; //參數,可以指定隊列長度,消息生存時間等隊列的設置 private final java.util.Map<java.lang.String, java.lang.Object> arguments; public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) { Assert.notNull(name, "'name' cannot be null"); this.name = name; this.durable = durable; this.exclusive = exclusive; this.autoDelete = autoDelete; this.arguments = arguments; } }
③Binding的初始
public class Binding extends AbstractDeclarable { //綁定至隊列或交換機 public enum DestinationType { QUEUE, EXCHANGE; } //隊列或交換機名稱 private final String destination; //交換機名稱 private final String exchange; //綁定的路由 private final String routingKey; //參數 private final Map<String, Object> arguments; //綁定至隊列或交換機 private final DestinationType destinationType; public Binding(String destination, DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments) { this.destination = destination; this.destinationType = destinationType; this.exchange = exchange; this.routingKey = routingKey; this.arguments = arguments; } }
3.2.Connection的創建,CachingConnectionFactory定義相關屬性及緩存連接
/**
 * Configuration that contributes a {@link CachingConnectionFactory} bean when the context has
 * no other {@link ConnectionFactory} bean. The factory is populated from
 * {@link RabbitProperties}.
 */
@Configuration
@ConditionalOnMissingBean(ConnectionFactory.class)
protected static class RabbitConnectionFactoryCreator {

    /**
     * Builds the caching connection factory from the bound RabbitMQ properties.
     *
     * <p>Optional settings are applied only when the corresponding property is non-null.
     *
     * @param config the bound RabbitMQ properties
     * @return the configured caching connection factory
     * @throws Exception declared by the method; NOTE(review): presumably raised while the
     *         underlying factory bean is initialised - confirm
     */
    @Bean
    public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
            throws Exception {
        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
        if (config.determineHost() != null) {
            factory.setHost(config.determineHost());
        }
        factory.setPort(config.determinePort());
        if (config.determineUsername() != null) {
            factory.setUsername(config.determineUsername());
        }
        if (config.determinePassword() != null) {
            factory.setPassword(config.determinePassword());
        }
        if (config.determineVirtualHost() != null) {
            factory.setVirtualHost(config.determineVirtualHost());
        }
        if (config.getRequestedHeartbeat() != null) {
            factory.setRequestedHeartbeat(config.getRequestedHeartbeat());
        }
        RabbitProperties.Ssl ssl = config.getSsl();
        // SSL settings are only touched when SSL is enabled in the properties.
        if (ssl.isEnabled()) {
            factory.setUseSSL(true);
            if (ssl.getAlgorithm() != null) {
                factory.setSslAlgorithm(ssl.getAlgorithm());
            }
            factory.setKeyStore(ssl.getKeyStore());
            factory.setKeyStorePassphrase(ssl.getKeyStorePassword());
            factory.setTrustStore(ssl.getTrustStore());
            factory.setTrustStorePassphrase(ssl.getTrustStorePassword());
        }
        if (config.getConnectionTimeout() != null) {
            factory.setConnectionTimeout(config.getConnectionTimeout());
        }
        // The factory bean must be initialised before getObject() is called on it.
        factory.afterPropertiesSet();
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                factory.getObject());
        connectionFactory.setAddresses(config.determineAddresses());
        connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
        connectionFactory.setPublisherReturns(config.isPublisherReturns());
        // Number of channels to cache; per the original note the default is 25 when not configured.
        if (config.getCache().getChannel().getSize() != null) {
            connectionFactory
                    .setChannelCacheSize(config.getCache().getChannel().getSize());
        }
        // Cache mode. Per the original note there are two modes:
        //   1. CONNECTION - caches multiple connections; the connection cache size is configurable.
        //   2. CHANNEL    - a single connection with multiple cached channels (configurable);
        //                   this is the default when no mode is configured.
        if (config.getCache().getConnection().getMode() != null) {
            connectionFactory
                    .setCacheMode(config.getCache().getConnection().getMode());
        }
        // Number of connections; per the original note the default is one.
        if (config.getCache().getConnection().getSize() != null) {
            connectionFactory.setConnectionCacheSize(
                    config.getCache().getConnection().getSize());
        }
        // Milliseconds to wait for a channel when all cached channels are in use. Per the
        // original note the default is 0, in which case a new channel is created instead.
        if (config.getCache().getChannel().getCheckoutTimeout() != null) {
            connectionFactory.setChannelCheckoutTimeout(
                    config.getCache().getChannel().getCheckoutTimeout());
        }
        return connectionFactory;
    }
}
3.3.RabbitTemplate的創建,用於發送及接收消息,當我們自己需要發送消息及接收消息時可以注入此對象
@Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean(RabbitTemplate.class) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); MessageConverter messageConverter = this.messageConverter.getIfUnique(); //設置消息轉換器,可以自定義 if (messageConverter != null) { rabbitTemplate.setMessageConverter(messageConverter); } rabbitTemplate.setMandatory(determineMandatoryFlag()); RabbitProperties.Template templateProperties = this.properties.getTemplate(); RabbitProperties.Retry retryProperties = templateProperties.getRetry(); //是否開啟重試,默認false,可配置(spring-retry) if (retryProperties.isEnabled()) { rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties)); } //接收超時,默認0 if (templateProperties.getReceiveTimeout() != null) { rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout()); } //回復超時,默認5000 if (templateProperties.getReplyTimeout() != null) { rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout()); } return rabbitTemplate; }
3.4.RabbitAdmin的創建
RabbitAdmin 會從 Spring 容器中獲取 Exchange、Binding、routingKey 以及 Queue 的 @Bean 聲明,然后使用 rabbitTemplate 的 execute 方法執行對應的聲明、修改、刪除等一系列 RabbitMQ 基礎功能操作。例如添加交換機、刪除一個綁定、清空一個隊列里的消息等等
/**
 * Creates the {@link AmqpAdmin} bean, backed by a {@link RabbitAdmin}.
 *
 * <p>Registered only when there is a single candidate {@link ConnectionFactory}, no other
 * {@link AmqpAdmin} bean exists, and {@code spring.rabbitmq.dynamic} is either unset or enabled.
 *
 * @param connectionFactory the connection factory the admin operates on
 * @return a new {@link RabbitAdmin}
 */
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
@ConditionalOnMissingBean(AmqpAdmin.class)
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    return admin;
}
其實現了諸多接口:RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware, InitializingBean
其afterPropertiesSet方法就是在 我們的 bean 加載后進行一些設置,其主要方法為其中的initialize方法
public void afterPropertiesSet() { synchronized (this.lifecycleMonitor) { //........略........... initialize(); //.......略...................... }
public void initialize() { //applicationContext為空直接返回 if (this.applicationContext == null) { this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings"); return; } this.logger.debug("Initializing declarations"); //從spring容器中獲取我們定義的exchange,queue,binding對象 Collection<Exchange> contextExchanges = new LinkedList<Exchange>( this.applicationContext.getBeansOfType(Exchange.class).values()); Collection<Queue> contextQueues = new LinkedList<Queue>( this.applicationContext.getBeansOfType(Queue.class).values()); Collection<Binding> contextBindings = new LinkedList<Binding>( this.applicationContext.getBeansOfType(Binding.class).values()); @SuppressWarnings("rawtypes") Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false) .values(); for (Collection<?> collection : collections) { if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) { for (Object declarable : collection) { if (declarable instanceof Exchange) { contextExchanges.add((Exchange) declarable); } else if (declarable instanceof Queue) { contextQueues.add((Queue) declarable); } else if (declarable instanceof Binding) { contextBindings.add((Binding) declarable); } } } } //過濾三組件 final Collection<Exchange> exchanges = filterDeclarables(contextExchanges); final Collection<Queue> queues = filterDeclarables(contextQueues); final Collection<Binding> bindings = filterDeclarables(contextBindings); //Exchange,Queue為非持久化,自動刪除則打印日志 for (Exchange exchange : exchanges) { if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". 
" + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and " + "reopening the connection."); } } for (Queue queue : queues) { if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". " + "It will be redeclared if the broker stops and is restarted while the connection factory is " + "alive, but all messages will be lost."); } } //若三組件都沒有直接返回 if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) { this.logger.debug("Nothing to declare"); return; } //使用rabbitTemplate連接至服務端創建 this.rabbitTemplate.execute(new ChannelCallback<Object>() { @Override public Object doInRabbit(Channel channel) throws Exception { declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()])); declareQueues(channel, queues.toArray(new Queue[queues.size()])); declareBindings(channel, bindings.toArray(new Binding[bindings.size()])); return null; } }); this.logger.debug("Declarations finished"); }
4.發送消息及接收過程
4.1.rabbitTemplate.send方法
/**
 * Sends a message to the given exchange with the given routing key.
 *
 * <p>The work runs inside {@code execute(...)} on a channel of the connection factory selected
 * by {@code obtainTargetConnectionFactory} for this message, and is delegated to
 * {@code doSend}. The mandatory flag passed to {@code doSend} is true only when a
 * {@code returnCallback} is set and {@code mandatoryExpression} evaluates to true for the
 * message.
 *
 * @param exchange        name of the target exchange
 * @param routingKey      routing key
 * @param message         message to send
 * @param correlationData correlation data passed through to {@code doSend}
 * @throws AmqpException declared by the method
 */
public void send(final String exchange, final String routingKey,
        final Message message, final CorrelationData correlationData)
        throws AmqpException {
    execute(new ChannelCallback<Object>() {

        @Override
        public Object doInRabbit(Channel channel) throws Exception {
            // doSend ends up calling channel.basicPublish(exchange, routingKey, mandatory,
            // convertedMessageProperties, messageToUse.getBody());
            doSend(channel, exchange, routingKey, message,
                    RabbitTemplate.this.returnCallback != null
                            && RabbitTemplate.this.mandatoryExpression.getValue(
                                    RabbitTemplate.this.evaluationContext, message, Boolean.class),
                    correlationData);
            return null;
        }
    }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
}

/**
 * Publishes the message on the given channel (abridged excerpt: the rest of the parameter list
 * and the beginning of the body are omitted).
 *
 * <p>The visible part converts the message properties with {@code messagePropertiesConverter}
 * and calls {@code channel.basicPublish}.
 */
protected void doSend(Channel channel, String exchange, String routingKey, Message message,
        // ... remaining parameters and the start of the body are omitted in this excerpt ...
    BasicProperties convertedMessageProperties = this.messagePropertiesConverter
            .fromMessageProperties(messageProperties, this.encoding);
    channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());
    // ... omitted in this excerpt ...
}
4.2.CachingConnectionFactory.CachedChannelInvocationHandler.invoke()。動態代理
/**
 * Dynamic-proxy invocation handler for a cached channel (abridged excerpt).
 *
 * <p>Visible behaviour: if the target channel is missing or no longer open, the handler fails
 * for a {@code PublisherCallbackChannel} or for a transaction in progress, and otherwise
 * discards the target. Then, while holding {@code targetMonitor}, it creates a new bare
 * channel if needed, invokes the method reflectively on the target, and updates
 * {@code txStarted} for transactional channels.
 *
 * @param proxy  the proxy instance the method was invoked on
 * @param method the invoked method
 * @param args   the method arguments
 * @return the result of invoking {@code method} on the target channel
 * @throws Throwable declared by the method; the visible code throws
 *         {@code InvocationTargetException} and {@code IllegalStateException}
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    // Handling of other method names, which returns early, is omitted in this excerpt.
    try {
        if (this.target == null || !this.target.isOpen()) {
            if (this.target instanceof PublisherCallbackChannel) {
                // A closed publisher-callback channel is closed and reported, not replaced.
                this.target.close();
                throw new InvocationTargetException(new AmqpException("PublisherCallbackChannel is closed"));
            }
            else if (this.txStarted) {
                // The channel closed in the middle of a transaction: reset the flag and fail.
                this.txStarted = false;
                throw new IllegalStateException("Channel closed during transaction");
            }
            // Drop the unusable target so a fresh channel is created below.
            this.target = null;
        }
        synchronized (this.targetMonitor) {
            if (this.target == null) {
                this.target = createBareChannel(this.theConnection, this.transactional);
            }
            Object result = method.invoke(this.target, args);
            if (this.transactional) {
                // Track whether a transaction is open, based on the invoked method's name.
                if (txStarts.contains(methodName)) {
                    this.txStarted = true;
                }
                else if (txEnds.contains(methodName)) {
                    this.txStarted = false;
                }
            }
            return result;
        }
    }
    // Exception handling is omitted in this excerpt.
}