ActiveMQ producer同步/異步發送消息


http://activemq.apache.org/async-sends.html

producer發送消息有同步和異步兩種模式,可以通過代碼配置:

((ActiveMQConnection)connection).setUseAsyncSend(true);

producer默認是異步發送消息。在沒有開啟事務的情況下,producer發送持久化消息是同步的,調用send會阻塞直到broker把消息保存到磁盤並返回確認。

消息設置為持久:

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

消息設置為非持久:

MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

producer發送消息的調用棧如下:

// ActiveMQSession
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, 
                    Message message, int deliveryMode, int priority, long timeToLive,
                    MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
    // 省略其他代碼
    // 消息的持久類型和和連接模式是或的:所以只要connection配置為異步,就走異步發送
    if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() 
            && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
        this.connection.asyncSendPacket(msg);
        if (producerWindow != null) {
            int size = msg.getSize();
            producerWindow.increaseUsage(size);
        }
    } else { // 同步發送
        if (sendTimeout > 0 && onComplete==null) {
            this.connection.syncSendPacket(msg,sendTimeout);
        }else {
            this.connection.syncSendPacket(msg, onComplete);
        }
    }
}

 

producer發送同步消息的調用棧:

// org.apache.activemq.transport.ResponseCorrelator
public Object request(Object command) throws IOException {
    FutureResponse response = asyncRequest(command, null);
    return response.getResult();
}

public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
    Command command = (Command) o;
    command.setCommandId(sequenceGenerator.getNextSequenceId());
    // 需要回復
    command.setResponseRequired(true);
    FutureResponse future = new FutureResponse(responseCallback);
    IOException priorError = null;
    synchronized (requestMap) {
        priorError = this.error;
        if (priorError == null) {
            requestMap.put(new Integer(command.getCommandId()), future);
        }
    }

    if (priorError != null) {
        future.set(new ExceptionResponse(priorError));
        throw priorError;
    }

    next.oneway(command);
    return future;
}

 

producer發送異步消息的調用棧:

//org.apache.activemq.transport.ResponseCorrelator
public void oneway(Object o) throws IOException {
    Command command = (Command)o;
    command.setCommandId(sequenceGenerator.getNextSequenceId());
    // 不需要回復
    command.setResponseRequired(false);
    next.oneway(command);
}

 

在不考慮事務的情況下:

producer發送持久化消息是同步發送,發送是阻塞的,直到收到確認。同步發送肯定是有流量控制的。

producer默認是異步發送,異步發送不會等待broker的確認, 所以就需要考慮流量控制了:

ActiveMQConnectionFactory.setProducerWindowSize(int producerWindowSize)

ProducerWindowSize的含義:producer每發送一個消息,統計一下發送的字節數,當字節數達到ProducerWindowSize值時,需要等待broker的確認,才能繼續發送。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM