broker在接收到producer發送來的Message后(其實接收client發來的命令並不屬於broker的職責,broker真正要做的是將處理這些命令,比如將消息路由置對應的destination,而接收client命令的任務是由TransportServer完成的),就需要持久化、抓發消息了。
1 //org.apache.activemq.broker.region.Queue的send方法 2 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception { 3 final ConnectionContext context = producerExchange.getConnectionContext(); 4 // There is delay between the client sending it and it arriving at the 5 // destination.. it may have expired. 6 message.setRegionDestination(this); 7 ProducerState state = producerExchange.getProducerState(); 8 if (state == null) { 9 LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange); 10 throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state"); 11 } 12 final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo(); 13 //ProducerAck有一個重要字段就是size,表示message的size, 14 //意在告訴producer,broker已經收下了size大小的message(還有一個producerId,因為一個connection可能有多個producer), 15 //這時producer的window的剩余空間就會變大,producer就可以發送更多的message。 16 //ProducerAck的作用就在於釋放producer的window空間。 17 final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 18 && !context.isInRecoveryMode(); 19 //檢查message是否過期,可以通過Message.setExpiration(或setJMSExpiration)設置絕對時間, 20 //也可以通過producer.setTimeToLive設置相對時間,setTimeToLive會在send前被轉換成Expiration(now + timeToLive) 21 if (message.isExpired()) { 22 // message not stored - or added to stats yet - so chuck here 23 broker.getRoot().messageExpired(context, message, null); 24 if (sendProducerAck) { 25 //如果message過期,直接發送ProducerAck至producer 26 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 27 context.getConnection().dispatchAsync(ack); 28 } 29 return; 30 } 31 //broker內存使用量達到設置上限 32 if (memoryUsage.isFull()) { 33 //...... 34 } 35 //發送message並不是發送message至consumer,只是broker接收該消息 36 doMessageSend(producerExchange, message); 37 //回復ProducerAck 38 if (sendProducerAck) { 39 //一個連接可能有多個producer,所以要producerId, 40 //傳回messageSize是為了釋放該message所占的window空間(window只是一個數字) 41 ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize()); 42 context.getConnection().dispatchAsync(ack); 43 } 44 }
doMessageSend主要的任務是持久化消息、添加消息置cursor。
1 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException, 2 Exception { 3 final ConnectionContext context = producerExchange.getConnectionContext(); 4 ListenableFuture<Object> result = null; 5 6 producerExchange.incrementSend(); 7 do { 8 //檢查broker存儲空間使用量 9 checkUsage(context, producerExchange, message); 10 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId()); 11 12 //message持久化 13 if (store != null && message.isPersistent()) { 14 message.getMessageId().setFutureOrSequenceLong(null); 15 try { 16 if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) { 17 result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());//異步 18 result.addListener(new PendingMarshalUsageTracker(message)); 19 } else { 20 store.addMessage(context, message);//同步 21 } 22 } catch (Exception e) { 23 // we may have a store in inconsistent state, so reset the cursor 24 // before restarting normal broker operations 25 resetNeeded = true; 26 throw e; 27 } 28 } 29 30 //Clear the unmarshalled state if the message is marshalled 31 //Persistent messages will always be marshalled but non-persistent may not be 32 //Specially non-persistent messages over the VM transport won't be 33 if (isReduceMemoryFootprint() && message.isMarshalled()) { 34 message.clearUnMarshalledState(); 35 } 36 37 //添加至cursor,cursor可以有message的緩存。 38 //cursor是store的游標,可以讀取store中下一個message。 39 //存入cursor后會有一個wakeup操作,wakeup會引發Queue的iterate方法的執行, 40 //iterate會page in messages,同時會將這些messages發送(輪詢發送)置consumer。 41 //page in的數量是consumers的prefetchSize、maxPageSize、總共消息數的最小值,如果此時沒有consumer則不會page in。 42 if(tryOrderedCursorAdd(message, context)) { 43 break; 44 } 45 } while (started.get()); 46 47 if (result != null && message.isResponseRequired() && !result.isCancelled()) { 48 try { 49 result.get(); 50 } catch (CancellationException e) { 51 // ignore - the task has been cancelled if the message 52 // has already been deleted 53 } 54 } 55 }
producer流量控制
window是activemq發送異步消息時進行流量控制的一種手段,org.apache.activemq.ActiveMQMessageProducer如果設置了window,發送消息會減小window,broker確認消息(ProducerAck)會增大window(不超過設定值)。send前會先檢查window大小(window只是一個數值,並沒有存儲能力),如果剩余空間足夠容納即將發送的消息,則可以發送該消息,如果空間不足,則會阻塞send方法。
如果Producer不設置window,並且是異步發送,producer就會不停的發送(無視broker的響應,broker也不會對無window的producer發出ProducerAck),當broker內存達到閾值時,broker就會阻塞broker端與該connection對應的線程,直至有空間來存放新message。由於線程阻塞了,也就不能繼續讀取tcp數據了,tcp緩存滿后對端也就發送不了數據了,這是依靠tcp本身的流量控制實現的。
如果Producer設置了window,只要window空間足夠,producer就可以發送message,如果此時broker的使用內存已達閾值,broker並不會阻塞線程,而是將message存儲等后續操作放入隊列等待執行,當producer端的window達到閾值時,producer的send就會阻塞,這樣就達到了流量控制的目的。這里無法模擬tcp那樣的流量控制(Min(接受窗口, 發送窗口)),因為broker中的destination可以有多個producer同時發送,無法很好的確定發送窗口大小。
消息持久化
Cursor是持久化系統的游標,其內部會持有一個鏈表(batchList)作為消息緩存,在向consumer發送消息時,會先通過page in將持久化消息恢復到cursor的鏈表緩存中,然后通過cursor.next挨個發送消息。由於從磁盤恢復消息時,batchList可能已經緩存了數據,所以會在持久化系統中記錄一個相對位置,該位置就指向了batchList的下一個數據。
producer發來的數據先會持久化(同步或異步),然后才會放入cursor中,放入cursor后就可以向consumer轉發消息了,消息轉發后會在cursor清除(沒有ack就不會清除持久化數據,redelivery只是client自己重發給自己,然后發送一個redelivered命令給broker,如果redelivery數達到閾值broker會清除該message對應的message並放入DLQ),如果cursor緩存(batchList)滿了,就需要在持久化系統中做偏移量標記。
同步持久化的處理相對簡單,持久化、cursor、轉發(同步或異步)、回復producer一條線。異步持久化時,轉發消息、回復producer可能發生在真正持久化前,這種異步操作極大提高了cpu利用率。但是異步持久化也是有風險的,試想如果回復producer在持久化之前完成,此時broker掛掉,導致broker持久化失敗,而producer已經得到確認回復,認為消息轉發成功,而由於broker並沒有持久化消息,重啟broker后,消息也不會恢復,consumer就永遠也得不到該消息了。
StoreQueueCursor
參考:https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.4/html/Using_Persistent_Messages/files/FuseMBPersistCursorsTypes.html#FuseMBPersistCursorsStore