Notes on integrating RabbitMQ with Spring Boot (3. Basic process)


  1. Notes on using RabbitMQ, part 1
  2. Notes on using RabbitMQ, part 2
  3. Notes on using RabbitMQ, part 3

1. The AMQP protocol

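AMQP 0-9-1 works as follows: a message is sent by a publisher to an exchange, which is often compared to a post office or a mailbox. The exchange then distributes each message it receives to the queues bound to it, according to the routing rules. Finally, the AMQP broker delivers the messages to the consumers subscribed to those queues, or the consumers fetch them on demand.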
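The publisher → exchange → queue flow can be pictured with a small in-memory model. This is only a sketch in plain Java: `ToyTopicExchange` and its methods are hypothetical names invented for illustration, not RabbitMQ or Spring AMQP classes. It assumes topic-style routing, where `*` in a binding pattern matches exactly one word and `#` matches zero or more words.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy in-memory model of a topic exchange (hypothetical, for illustration only).
class ToyTopicExchange {
    // Each entry is {queueName, bindingPattern}.
    private final List<String[]> bindings = new ArrayList<>();
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    // Bind a queue to this exchange with a pattern; the queue is created if absent.
    void bind(String queueName, String pattern) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.add(new String[] {queueName, pattern});
    }

    // Route one message; returns how many queues received a copy.
    int publish(String routingKey, String body) {
        Set<String> targets = new LinkedHashSet<>();
        for (String[] binding : bindings) {
            if (matches(binding[1], routingKey)) {
                targets.add(binding[0]);
            }
        }
        for (String queueName : targets) {
            queues.get(queueName).addLast(body);
        }
        return targets.size();
    }

    // A consumer fetching on demand; returns null when the queue is empty or unknown.
    String poll(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }

    static boolean matches(String pattern, String routingKey) {
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern.split("\\."), 0, keyWords, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may swallow zero or more words.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }
}
```

With the bindings `orders ← "order.*"` and `audit ← "#"`, a message published with the routing key `order.created` lands in both queues, while `user.created` only reaches `audit`.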

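2. Related components

Connection: the TCP connection between the application and the RabbitMQ broker
Virtual host: a namespace used for access control; one virtual host can contain multiple exchanges, queues, and bindings, and different accounts can be given different permissions on it
Channel: a message channel, a lightweight logical connection multiplexed over a Connection
Exchange: the exchange; producers send messages to an exchange
Queue: the queue, which stores messages
Binding: the relationship that ties a queue to an exchange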

3. Creating the components

The AMQP auto-configuration lives in spring-boot-autoconfigure.

3.1. Initializing our custom components

① Initializing an Exchange

                    public class TopicExchange extends AbstractExchange {
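                        // Exchange name (a property of the parent class AbstractExchange)
                        private final String name;
                        // Whether the exchange is durable (default true: it survives a broker restart); property of AbstractExchange
                        private final boolean durable;
                        // Whether the exchange is auto-deleted (default false; when true it is removed once nothing is bound to it any more); property of AbstractExchange
                        private final boolean autoDelete;
                        // Additional arguments
                        private final Map<String, Object> arguments;
                        // Whether this is a delayed exchange; when true, each message must carry an extra delay header (x-delay).
                        // Note: declaring it may fail with "unknown exchange type 'x-delayed-message'",
                        // which typically means the delayed message exchange plugin is not enabled on the broker.
                        private volatile boolean delayed;
                        // Whether the exchange is internal; clients cannot publish directly to an internal exchange
                        private boolean internal;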
                        public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
                            super(name, durable, autoDelete, arguments);
                        }

                        @Override
                        public final String getType() {
                            return ExchangeTypes.TOPIC;
                        }
                    }

② Initializing a Queue

                    public class Queue extends AbstractDeclarable {
                        // Queue name
                        private final String name;
                        // Whether the queue is durable
                        private final boolean durable;
                        // Whether the queue is exclusive to the declaring connection; an exclusive queue is deleted when that connection closes
                        private final boolean exclusive;
                        // Whether the queue is auto-deleted; it is removed once its last consumer unsubscribes
                        private final boolean autoDelete;
                        // Arguments, e.g. the maximum queue length or the message time-to-live
                        private final java.util.Map<java.lang.String, java.lang.Object> arguments;
                        public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
                            Assert.notNull(name, "'name' cannot be null");
                            this.name = name;
                            this.durable = durable;
                            this.exclusive = exclusive;
                            this.autoDelete = autoDelete;
                            this.arguments = arguments;
                        }
                    }

③ Initializing a Binding

                    public class Binding extends AbstractDeclarable {
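                        // The destination of a binding is either a queue or an exchange
                        public enum DestinationType {
                            QUEUE, EXCHANGE;
                        }
                        // Name of the destination queue or exchange
                        private final String destination;
                        // Name of the source exchange
                        private final String exchange;
                        // Routing key of the binding
                        private final String routingKey;
                        // Additional arguments
                        private final Map<String, Object> arguments;
                        // Whether the destination is a queue or an exchange
                        private final DestinationType destinationType;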

                        public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
                                Map<String, Object> arguments) {
                            this.destination = destination;
                            this.destinationType = destinationType;
                            this.exchange = exchange;
                            this.routingKey = routingKey;
                            this.arguments = arguments;
                        }
                    }

 

3.2. Creating the Connection: CachingConnectionFactory defines the related properties and caches connections

                @Configuration
                @ConditionalOnMissingBean(ConnectionFactory.class)
                protected static class RabbitConnectionFactoryCreator {
                    @Bean
                    public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
                            throws Exception {
                        RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
                        if (config.determineHost() != null) {
                            factory.setHost(config.determineHost());
                        }
                        factory.setPort(config.determinePort());
                        if (config.determineUsername() != null) {
                            factory.setUsername(config.determineUsername());
                        }
                        if (config.determinePassword() != null) {
                            factory.setPassword(config.determinePassword());
                        }
                        if (config.determineVirtualHost() != null) {
                            factory.setVirtualHost(config.determineVirtualHost());
                        }
                        if (config.getRequestedHeartbeat() != null) {
                            factory.setRequestedHeartbeat(config.getRequestedHeartbeat());
                        }
                        RabbitProperties.Ssl ssl = config.getSsl();
                        if (ssl.isEnabled()) {
                            factory.setUseSSL(true);
                            if (ssl.getAlgorithm() != null) {
                                factory.setSslAlgorithm(ssl.getAlgorithm());
                            }
                            factory.setKeyStore(ssl.getKeyStore());
                            factory.setKeyStorePassphrase(ssl.getKeyStorePassword());
                            factory.setTrustStore(ssl.getTrustStore());
                            factory.setTrustStorePassphrase(ssl.getTrustStorePassword());
                        }
                        if (config.getConnectionTimeout() != null) {
                            factory.setConnectionTimeout(config.getConnectionTimeout());
                        }
                        factory.afterPropertiesSet();
                        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                                factory.getObject());
                        connectionFactory.setAddresses(config.determineAddresses());
                        connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
                        connectionFactory.setPublisherReturns(config.isPublisherReturns());
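                        // Number of channels to cache; defaults to 25 when not configured
                        if (config.getCache().getChannel().getSize() != null) {
                            connectionFactory
                                    .setChannelCacheSize(config.getCache().getChannel().getSize());
                        }
                        // Cache mode. There are two: CONNECTION, which caches connections, and CHANNEL, which caches channels and is the default when nothing is configured.
                        // CONNECTION mode caches several connections (the connection cache size is configurable); CHANNEL mode uses a single connection and caches several channels (the channel cache size is configurable)
                        if (config.getCache().getConnection().getMode() != null) {
                            connectionFactory
                                    .setCacheMode(config.getCache().getConnection().getMode());
                        }
                        // Number of connections to cache; defaults to 1
                        if (config.getCache().getConnection().getSize() != null) {
                            connectionFactory.setConnectionCacheSize(
                                    config.getCache().getConnection().getSize());
                        }
                        // Milliseconds to wait for a channel when all cached channels are in use; defaults to 0, in which case a new channel is created instead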
                        if (config.getCache().getChannel().getCheckoutTimeout() != null) {
                            connectionFactory.setChannelCheckoutTimeout(
                                    config.getCache().getChannel().getCheckoutTimeout());
                        }
                        return connectionFactory;
                    }
                }
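The channel-cache behavior described in the comments above can be modeled in a few lines of plain Java. This is a simplified sketch, not the code of CachingConnectionFactory: `ToyChannelCache` and `ToyCachedChannel` are hypothetical names, and the model assumes a checkout timeout of 0, so that a new channel is opened whenever no idle one is cached and the cache size only limits how many idle channels are kept.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of channel caching (hypothetical; assumes checkout timeout 0).
class ToyChannelCache {
    // Stand-in for a broker channel.
    static final class ToyCachedChannel {
        final int id;
        boolean open = true;

        ToyCachedChannel(int id) {
            this.id = id;
        }
    }

    private final int cacheSize;
    private final Deque<ToyCachedChannel> idle = new ArrayDeque<>();
    private int created = 0;

    ToyChannelCache(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    // Reuse an idle channel if one is cached, otherwise open a new one.
    synchronized ToyCachedChannel checkout() {
        ToyCachedChannel channel = idle.pollFirst();
        return channel != null ? channel : new ToyCachedChannel(++created);
    }

    // Keep at most cacheSize idle channels; surplus channels are really closed.
    synchronized void release(ToyCachedChannel channel) {
        if (idle.size() < cacheSize) {
            idle.addFirst(channel);
        } else {
            channel.open = false;
        }
    }

    synchronized int createdCount() {
        return created;
    }

    synchronized int idleCount() {
        return idle.size();
    }
}
```

In this model, with a cache size of 2, checking out three channels at once opens three channels; when all three are released, two stay cached and the third is closed.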

3.3. Creating the RabbitTemplate, which is used to send and receive messages; inject this object wherever we need to send or receive messages ourselves

                @Bean
                @ConditionalOnSingleCandidate(ConnectionFactory.class)
                @ConditionalOnMissingBean(RabbitTemplate.class)
                public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
                    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
                    MessageConverter messageConverter = this.messageConverter.getIfUnique();
                    // Set the message converter; a custom one can be supplied
                    if (messageConverter != null) {
                        rabbitTemplate.setMessageConverter(messageConverter);
                    }
                    rabbitTemplate.setMandatory(determineMandatoryFlag());
                    RabbitProperties.Template templateProperties = this.properties.getTemplate();
                    RabbitProperties.Retry retryProperties = templateProperties.getRetry();
                    // Whether retry is enabled; defaults to false and is configurable (spring-retry)
                    if (retryProperties.isEnabled()) {
                        rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties));
                    }
                    // Receive timeout; defaults to 0
                    if (templateProperties.getReceiveTimeout() != null) {
                        rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout());
                    }
                    // Reply timeout; defaults to 5000
                    if (templateProperties.getReplyTimeout() != null) {
                        rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout());
                    }
                    return rabbitTemplate;
                }

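3.4. Creating the RabbitAdmin

RabbitAdmin collects the Exchange, Binding (with its routing key), and Queue @Bean declarations from the Spring container and then uses the execute method of its rabbitTemplate to carry out the corresponding declare, modify, and delete operations, that is, the basic RabbitMQ management functions such as adding an exchange, removing a binding, or purging the messages in a queue.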

                @Bean
                @ConditionalOnSingleCandidate(ConnectionFactory.class)
                @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
                @ConditionalOnMissingBean(AmqpAdmin.class)
                public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
                    return new RabbitAdmin(connectionFactory);
                }

It implements several interfaces: RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware, InitializingBean
Its afterPropertiesSet method performs some setup once the bean has been loaded; the main work is done by the initialize method it invokes

                public void afterPropertiesSet() {
                    synchronized (this.lifecycleMonitor) {
                        // ... omitted ...
                        initialize();
                        // ... omitted ...
                    }
                }
                public void initialize() {
                    // Return immediately if no applicationContext has been set
                    if (this.applicationContext == null) {
                        this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
                        return;
                    }

                    this.logger.debug("Initializing declarations");
                    // Fetch the Exchange, Queue, and Binding objects we defined from the Spring container
                    Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                            this.applicationContext.getBeansOfType(Exchange.class).values());
                    Collection<Queue> contextQueues = new LinkedList<Queue>(
                            this.applicationContext.getBeansOfType(Queue.class).values());
                    Collection<Binding> contextBindings = new LinkedList<Binding>(
                            this.applicationContext.getBeansOfType(Binding.class).values());

                    @SuppressWarnings("rawtypes")
                    Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false)
                            .values();
                    for (Collection<?> collection : collections) {
                        if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
                            for (Object declarable : collection) {
                                if (declarable instanceof Exchange) {
                                    contextExchanges.add((Exchange) declarable);
                                }
                                else if (declarable instanceof Queue) {
                                    contextQueues.add((Queue) declarable);
                                }
                                else if (declarable instanceof Binding) {
                                    contextBindings.add((Binding) declarable);
                                }
                            }
                        }
                    }
                    // Filter the three kinds of components
                    final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
                    final Collection<Queue> queues = filterDeclarables(contextQueues);
                    final Collection<Binding> bindings = filterDeclarables(contextBindings);
                    // Log an info message for exchanges and queues that are non-durable or auto-delete (and for exclusive queues)
                    for (Exchange exchange : exchanges) {
                        if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
                            this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                                    + exchange.getName()
                                    + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                                    + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                                    + "reopening the connection.");
                        }
                    }

                    for (Queue queue : queues) {
                        if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                            this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                                    + queue.getName()
                                    + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                                    + queue.isExclusive() + ". "
                                    + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                                    + "alive, but all messages will be lost.");
                        }
                    }
                    // Return immediately if there is nothing of any of the three kinds to declare
                    if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
                        this.logger.debug("Nothing to declare");
                        return;
                    }
                    // Use rabbitTemplate to connect to the broker and create them
                    this.rabbitTemplate.execute(new ChannelCallback<Object>() {
                        @Override
                        public Object doInRabbit(Channel channel) throws Exception {
                            declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
                            declareQueues(channel, queues.toArray(new Queue[queues.size()]));
                            declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
                            return null;
                        }
                    });
                    this.logger.debug("Declarations finished");
                }

4. Sending and receiving messages

4.1. The rabbitTemplate.send method

                public void send(final String exchange, final String routingKey,
                        final Message message, final CorrelationData correlationData)
                        throws AmqpException {
                    execute(new ChannelCallback<Object>() {

                        @Override
                        public Object doInRabbit(Channel channel) throws Exception {
                            // doSend calls channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody())
                            doSend(channel, exchange, routingKey, message, RabbitTemplate.this.returnCallback != null
                                    && RabbitTemplate.this.mandatoryExpression.getValue(
                                            RabbitTemplate.this.evaluationContext, message, Boolean.class),
                                    correlationData);
                            return null;
                        }
                    }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
                }
                protected void doSend(Channel channel, String exchange, String routingKey, Message message,
                        // ... omitted ...
                    BasicProperties convertedMessageProperties = this.messagePropertiesConverter
                            .fromMessageProperties(messageProperties, this.encoding);
                    channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());
                    // ... omitted ...
                }

4.2. CachingConnectionFactory.CachedChannelInvocationHandler.invoke(): the dynamic proxy

                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                    String methodName = method.getName();
                    // If methodName is one of several other methods, handle it and return (omitted)
                    try {
                        if (this.target == null || !this.target.isOpen()) {
                            if (this.target instanceof PublisherCallbackChannel) {
                                this.target.close();
                                throw new InvocationTargetException(new AmqpException("PublisherCallbackChannel is closed"));
                            }
                            else if (this.txStarted) {
                                this.txStarted = false;
                                throw new IllegalStateException("Channel closed during transaction");
                            }
                            this.target = null;
                        }
                        synchronized (this.targetMonitor) {
                            if (this.target == null) {
                                this.target = createBareChannel(this.theConnection, this.transactional);
                            }
                            Object result = method.invoke(this.target, args);
                            if (this.transactional) {
                                if (txStarts.contains(methodName)) {
                                    this.txStarted = true;
                                }
                                else if (txEnds.contains(methodName)) {
                                    this.txStarted = false;
                                }
                            }
                            return result;
                        }
                    }
                    // Exception handling (omitted)
                }
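The core idea of the handler above, a JDK dynamic proxy that transparently recreates its target when the underlying channel has been closed, can be reproduced in isolation. The following is a minimal sketch with hypothetical types (`DemoChannel`, `RealDemoChannel`, `ReopeningHandler`); it leaves out transactions, publisher callbacks, and returning channels to the cache, all of which the real handler deals with.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

// Hypothetical minimal channel interface, for illustration only.
interface DemoChannel {
    boolean isOpen();

    void close();

    String basicPublish(String body);
}

// Hypothetical "physical" channel that tags each message with its own id.
final class RealDemoChannel implements DemoChannel {
    private final int id;
    private boolean open = true;

    RealDemoChannel(int id) {
        this.id = id;
    }

    public boolean isOpen() {
        return open;
    }

    public void close() {
        open = false;
    }

    public String basicPublish(String body) {
        return "ch" + id + ":" + body;
    }
}

// Invocation handler that lazily (re)creates the target channel on demand.
final class ReopeningHandler implements InvocationHandler {
    private final Supplier<DemoChannel> factory;
    private DemoChannel target;

    ReopeningHandler(Supplier<DemoChannel> factory) {
        this.factory = factory;
    }

    @Override
    public synchronized Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        // Lifecycle calls are answered directly and never trigger a reopen.
        if (methodName.equals("isOpen")) {
            return target != null && target.isOpen();
        }
        if (methodName.equals("close")) {
            if (target != null) {
                target.close();
            }
            return null;
        }
        // Any other call needs a live target: create one if it is missing or closed.
        if (target == null || !target.isOpen()) {
            target = factory.get();
        }
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    static DemoChannel proxy(Supplier<DemoChannel> factory) {
        return (DemoChannel) Proxy.newProxyInstance(
                DemoChannel.class.getClassLoader(),
                new Class<?>[] {DemoChannel.class},
                new ReopeningHandler(factory));
    }
}
```

Callers only ever hold the proxy, so closing the underlying channel and publishing again simply results in a fresh channel being created behind the same reference.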

 

