持久化消息和非持久化消息的發送策略:消息同步發送和異步發送
ActiveMQ支持同步、異步兩種發送模式將消息發送到broker上。同步發送過程中,發送者發送一條消息會阻塞直到broker反饋一個確認消息,表示消息已經被broker處理。這個機制提供了消息的安全性保障,但是由於是阻塞的操作,會影響到客戶端消息發送的性能。異步發送的過程中,發送者不需要等待broker提供反饋,所以性能相對較高。但是可能會出現消息丟失的情況。所以使用異步發送的前提是在某些情況下允許出現數據丟失的情況。
默認情況下,非持久化消息是異步發送的,持久化消息並且是在非事務模式下是同步發送的。但是在開啟事務的情況下,消息都是異步發送。由於異步發送的效率會比同步發送性能更高。所以在發送持久化消息的時候,盡量去開啟事務會話。除了持久化消息和非持久化消息的同步和異步特性以外,我們還可以通過以下幾種方式來設置異步發送:
1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.useAsyncSend=true");
2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3.((ActiveMQConnection)connection).setUseAsyncSend(true);
消息發送的源碼:
以producer.send為入口 進入的是ActiveMQSession 實現:
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
checkClosed(); //檢查session的狀態,如果session關閉則拋異常
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
//檢查destination的類型,如果符合要求,就轉變為ActiveMQDestination
ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
//如果發送窗口大小不為空,則判斷發送窗口的大小決定是否阻塞
if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//發送消息到broker的topic
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
stats.onMessage();
}
ActiveMQSession的send方法,this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete):
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
//互斥鎖,如果一個session的多個producer發送消息到這里,會保證消息發送的有序性
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
doStartTransaction();//告訴broker開始一個新事務,只有事務型會話中才會開啟
TransactionId txid = transactionContext.getTransactionId();//從事務上下文中獲取事務id
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode); //在JMS協議頭中設置是否持久化標識
long expiration = 0L;//計算消息過期時間
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);//設置消息過期時間
message.setJMSPriority(priority);//設置消息的優先級
message.setJMSRedelivered(false);;//設置消息為非重發
// transform to our own message format here
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
msg.setDestination(destination);
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
if (msg != message) {//如果消息是經過轉化的,則更新原來的消息id和目的地
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
msg.setBrokerPath(null);
msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
msg.onSend();//把消息屬性和消息體都設置為只讀,防止被修改
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
//如果onComplete沒有設置(這里傳進來就是null),且發送超時時間小於0,且消息不需要反饋,且連接器不是同步發送模式,且消息非持久化或者連接器是異步發送模式
//或者存在事務id的情況下,走異步發送,否則走同步發送
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
int size = msg.getSize();//異步發送的情況下,需要設置producerWindow的大小
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout);//帶超時時間的同步發送
}else {
this.connection.syncSendPacket(msg, onComplete);//帶回調的同步發送
}
}
}
}
我們從上面的代碼可以看到,在執行發送操作之前需要把消息做一個轉化,並且將我們設置的一些屬性注入導指定的屬性中,我們先來看看異步發送,會發現異步發送的時候涉及到producerWindowSize的大小:
ProducerWindowSize的含義
producer每發送一個消息,統計一下發送的字節數,當字節數達到ProducerWindowSize值時,需要等待broker的確認,才能繼續發送。
主要用來約束在異步發送時producer端允許積壓的(尚未ACK)的消息的大小,且只對異步發送有意義。每次發送消息之后,都將會導致memoryUsage大小增加(+message.size),當broker返回producerAck時,memoryUsage尺寸減少(producerAck.size,此size表示先前發送消息的大小)。
可以通過如下2種方式設置:
Ø 在brokerUrl中設置: "tcp://localhost:61616?jms.producerWindowSize=1048576",這種設置將會對所有的producer生效。
Ø 在destinationUri中設置: "myQueue?producer.windowSize=1048576",此參數只會對使用此Destination實例的producer生效,將會覆蓋brokerUrl中的producerWindowSize值。
注意:此值越大,意味着消耗Client端的內存就越大。
接下去我們進入異步發送流程,看看消息是怎么異步發送的this.connection.asyncSendPacket(msg):
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
這里的 Command 其實就是之前一步所轉化的message ,並且經過一系列的屬性注入。因為ActiveMQMessage 繼承了 baseCommand ,該類實現了 Command 。所以可以轉化,然后我們發現 oneway 方法又很多的實現,都是基於 transport ,那么我們就需要來看看這個 transport 是什么。這里我們把代碼往前翻並沒有發現他的初始化,按照我們以往的思路,這里就會在初始化連接的時候進行初始化該對象:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.254.135:61616");
Connection connection= connectionFactory.createConnection();
這里進入 ActiveMQConnectionFactory 的 createConnection方法會來到:
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
if (brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
}
ActiveMQConnection connection = null;
try {// 果然發現了這個東東的初始化
Transport transport = createTransport();
// 創建連接
connection = createActiveMQConnection(transport, factoryStats);
// 設置用戶密碼
connection.setUserName(userName);
connection.setPassword(password);
// 對連接做包裝
configureConnection(connection);
// 啟動一個后台傳輸線程
transport.start();
// 設置客戶端消費的id
if (clientID != null) {
connection.setDefaultClientID(clientID);
}
return connection;
} ......
}
這里我們發現了 Transport transport = createTransport(); 這就是他的初始化:我們可以發現
protected Transport createTransport() throws JMSException {
try {
URI connectBrokerUL = brokerURL;
String scheme = brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
}
if (scheme.equals("auto")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
return TransportFactory.connect(connectBrokerUL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
}
這里有點類似於基於URL驅動的意思,這里進來先是構建一個 URI ,根據URL去創建一個連接TransportFactory.connect,會發現默認使用的是tcp的協議。這里由於我們在創建連接的時候就已經指定了tcp所以這里的判斷都沒用,直接進入創建連接TransportFactory.connect(connectBrokerUL):
public static Transport connect(URI location) throws Exception {
TransportFactory tf = findTransportFactory(location);
return tf.doConnect(location);
}
這里做連接需要創建一個 tf 對象。這就要看看findTransportFactory(location) :
public static TransportFactory findTransportFactory(URI location) throws IOException {
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + location + "]");
}
TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
// Try to load if from a META-INF property.
try {
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
}
}
return tf;
}
不難理解以上的 代碼是根據 scheme通過TRANSPORT_FACTORYS 這個map 來創建的 TransportFactory ,如果獲取不到,就會通過TRANSPORT_FACTORY_FINDER 去獲取一個實例。TRANSPORT_FACTORY_FINDER 這個FINDER是什么東西呢? 我們看看他的初始化:
private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
我們通過源碼中指定路徑以下的東西:

這有點類似於 java 中SPI規范的意思。我們可以看看 tcp 其中的內容:
class=org.apache.activemq.transport.tcp.TcpTransportFactory
這里是鍵值對的方式,上述獲取實例的代碼中其實就是獲取一個 TcpTransportFactory 實例,那么我們就知道tf.doConnect(location) 是哪個實現類做的,就是TcpTransportFactory,但是我們點開一看並未發現 TcpTransportFactory實現,這就說明該類使用的是父類里面的方法,這里就是TransportFactory 類:
public Transport doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
if( !options.containsKey("wireFormat.host") ) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = createWireFormat(options);
//創建一個Transport 這里才是我們要找的真相
Transport transport = createTransport(location, wf);
//配置configure,這個里面是對Transport做鏈路包裝,思想類似於dubbo的cluster
Transport rc = configure(transport, wf, options);
//remove auto
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
我們進入 createTransport(location, wf) 方法,這里是使用Tcp子類的實現。會發現里面創建了一個 Sokect 連接 ,這就是准備后來進行發送的Sokect。然后這里返回的 Transport 就是 TcpTransport .接下去就是對這個 transport 進行包裝 configure(transport, wf, options):
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
//組裝一個復合的transport,這里會包裝兩層,一個是IactivityMonitor.另一個是WireFormatNegotiator
transport = compositeConfigure(transport, wf, options);
//再做一層包裝,MutexTransport
transport = new MutexTransport(transport);
//包裝ResponseCorrelator
transport = new ResponseCorrelator(transport);
return transport;
}
到目前為止,這個transport實際上就是一個調用鏈了,他的鏈結構為ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport()))每一層包裝表示什么意思呢?
ResponseCorrelator 用於實現異步請求。
MutexTransport 實現寫鎖,表示同一時間只允許發送一個請求
WireFormatNegotiator 實現了客戶端連接broker的時候先發送數據解析相關的協議信息,比如解析版本號,是否使用緩存等
InactivityMonitor 用於實現連接成功成功后的心跳檢查機制,客戶端每10s發送一次心跳信息。服務端每30s讀取一次心跳信息。
通過這層層的分析,我們回到 ActiveMQConnection 發送消息的doAsyncSendPacket 方法:
private void doAsyncSendPacket(Command command) throws JMSException {
try {
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
這里的 oneway(command)方法會先后經歷上述調用鏈的處理最后調用到 TcpTransport 的oneway(command) ,我們一步一步來看看都做了些什么:
ResponseCorrelator.oneway(command):里面就設置了兩個屬性
public void oneway(Object o) throws IOException {
Command command = (Command)o; //對前面的對象做一個強轉,組裝一些信息
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(false);
next.oneway(command);
}
MutexTransport.oneway(command):
public void oneway(Object command) throws IOException {
writeLock.lock();// 通過 ReentrantLock做加鎖
try {
next.oneway(command);
} finally {
writeLock.unlock();
}
}
WireFormatNegotiator.oneway(command):這個里面調用了父類的 oneway ,父類是 TransportFilter 類
public void oneway(Object command) throws IOException {
boolean wasInterrupted = Thread.interrupted();
try {
if (readyCountDownLatch.getCount() > 0 && !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS)) {
throw new IOException("Wire format negotiation timeout: peer did not send his wire format.");
}
} catch (InterruptedException e) {
InterruptedIOException interruptedIOException = new InterruptedIOException("Interrupted waiting for wire format negotiation");
interruptedIOException.initCause(e);
try {
onException(interruptedIOException);
} finally {
Thread.currentThread().interrupt();
wasInterrupted = false;
}
throw interruptedIOException;
} finally {
if (wasInterrupted) {
Thread.currentThread().interrupt();
}
}
super.oneway(command); //里面沒做什么事情進入下一個調用鏈
}
從WireFormatNegotiator的父類TransportFilter進入下一個調用鏈應該調用的是InactivityMonitor.oneway(command),可是並未發現又該類實現,所以這里進入InactivityMonitor 的父類AbstractInactivityMonitor:
public void oneway(Object o) throws IOException {
// To prevent the inactivity monitor from sending a message while we
// are performing a send we take a read lock. The inactivity monitor
// sends its Heart-beat commands under a write lock. This means that
// the MutexTransport is still responsible for synchronizing sends
sendLock.readLock().lock();//獲取發送讀鎖 鎖定
inSend.set(true);//設置屬性
try {
doOnewaySend(o);//通過這個邏輯進入下一個調用鏈
} finally {
commandSent.set(true);
inSend.set(false);
sendLock.readLock().unlock();
}
}
在doOnewaySend 里面的next.oneway(command) 方法最終調用 TcpTransport 的實現:
public void oneway(Object command) throws IOException {
checkStarted();
//進行格式化內容 通過Sokct 發送
wireFormat.marshal(command, dataOut);
// 流的刷新
dataOut.flush();
}
最后通過Sokect進行數據的傳輸。這樣子異步發送的流程就結束了。下面來走一下同步的流程:通過this.connection.syncSendPacket() 進入同步發送流程。
public Response syncSendPacket(Command command, int timeout) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
try {// 進行發送,阻塞獲取結果
Response response = (Response)(timeout > 0
? this.transport.request(command, timeout)
: this.transport.request(command));
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse)response;
if (er.getException() instanceof JMSException) {
throw (JMSException)er.getException();
}
。。。。。。。。。
return response;
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
}
這里的 transport 跟異步發送過程中的transport時一樣的,即 ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport())) 一個調用鏈,進入ResponseCorrelator 的實現:
public Object request(Object command, int timeout) throws IOException {
FutureResponse response = asyncRequest(command, null);
return response.getResult(timeout);
}
從這個方法我們可以得到的信息時,在發送的時候采用的是 asyncRequest 方法,意思是異步請求,但是在下行采用 response.getResult(timeout) 去同步阻塞的方式去獲取結果:
public Response getResult(int timeout) throws IOException {
final boolean wasInterrupted = Thread.interrupted();
try {
Response result = responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
.........
}
這里會從 ArrayBlockingQueue 去 阻塞的處理請求。其實這里的同步發送實質上采用的不阻塞發送,阻塞的去等待broker 的反饋結果。
最后整理一下這個發送流程圖

延遲和定時消息投遞(Delay and Schedule Message Delivery):
有時候我們不希望消息馬上被broker投遞出去,而是想要消息60秒以后發給消費者,或者我們想讓消息沒隔一定時間投遞一次,一共投遞指定的次數。。。類似這種需求,ActiveMQ提供了一種broker端消息定時調度機制。我們只需要把幾個描述消息定時調度方式的參數作為屬性添加到消息,broker端的調度器就會按照我們想要的行為去處理消息。當然需要在xml中配置schedulerSupport屬性為true(broker的屬性)即:<broker schedulerSupport="true">
使用延遲消息必須遵守如下配置屬性:屬性名稱 類型 描述
- AMQ_SCHEDULED_DELAY (long) 消息延遲時間單位:毫秒
- AMQ_SCHEDULED_PERIOD( long) 消息發送周期單位時間:毫秒。如 5秒一次 配置 AMQ_SCHEDULED_PERIOD = 5*1000
- AMQ_SCHEDULED_REPEAT (int) 消息重復發送次數
- AMQ_SCHEDULED_CRON (string) 使用Cron 表達式 設置定時發送
延遲60秒發送消息
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long time = 60 * 1000; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time); producer.send(message);
開始延遲30秒發送,重復發送10次,每次之間間隔10秒
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); long delay = 30 * 1000; long period = 10 * 1000; int repeat = 9; message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat); producer.send(message);
使用Cron 表示式定時發送消息
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); producer.send(message);
Cron 的優先級大於消息延遲,只要設置了Cron 表達式會優先執行Cron規則,如下:消息定時發送10次,每個小時執行,延遲1秒之后發送。
MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage("test msg"); message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *"); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000); message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000); message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9); producer.send(message);
