http://activemq.apache.org/async-sends.html
producer發送消息有同步和異步兩種模式,可以通過代碼配置:
((ActiveMQConnection)connection).setUseAsyncSend(true);
producer默認是異步發送消息。在沒有開啟事務的情況下,producer發送持久化消息是同步的,調用send會阻塞直到broker把消息保存到磁盤並返回確認。
消息設置為持久:
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
消息設置為非持久:
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer發送消息的調用棧如下:
// ActiveMQSession protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { // 省略其他代碼 // 消息的持久類型和和連接模式是或的:所以只要connection配置為異步,就走異步發送 if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) { this.connection.asyncSendPacket(msg); if (producerWindow != null) { int size = msg.getSize(); producerWindow.increaseUsage(size); } } else { // 同步發送 if (sendTimeout > 0 && onComplete==null) { this.connection.syncSendPacket(msg,sendTimeout); }else { this.connection.syncSendPacket(msg, onComplete); } } }
producer發送同步消息的調用棧:
// org.apache.activemq.transport.ResponseCorrelator public Object request(Object command) throws IOException { FutureResponse response = asyncRequest(command, null); return response.getResult(); } public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException { Command command = (Command) o; command.setCommandId(sequenceGenerator.getNextSequenceId()); // 需要回復 command.setResponseRequired(true); FutureResponse future = new FutureResponse(responseCallback); IOException priorError = null; synchronized (requestMap) { priorError = this.error; if (priorError == null) { requestMap.put(new Integer(command.getCommandId()), future); } } if (priorError != null) { future.set(new ExceptionResponse(priorError)); throw priorError; } next.oneway(command); return future; }
producer發送異步消息的調用棧:
//org.apache.activemq.transport.ResponseCorrelator public void oneway(Object o) throws IOException { Command command = (Command)o; command.setCommandId(sequenceGenerator.getNextSequenceId()); // 不需要回復 command.setResponseRequired(false); next.oneway(command); }
在不考慮事務的情況下:
producer發送持久化消息是同步發送,發送是阻塞的,直到收到確認。同步發送肯定是有流量控制的。
producer默認是異步發送,異步發送不會等待broker的確認, 所以就需要考慮流量控制了:
ActiveMQConnectionFactory.setProducerWindowSize(int producerWindowSize)
ProducerWindowSize的含義:producer每發送一個消息,統計一下發送的字節數,當字節數達到ProducerWindowSize值時,需要等待broker的確認,才能繼續發送。