消費端消費消息的原理
我們通過上一節課的講解,知道有兩種方法可以接收消息,一種是使用同步阻塞的MessageConsumer#receive方
法。另一種是使用消息監聽器MessageListener。這里需要注意的是,在同一個session下,這兩者不能同時工
作,也就是說不能針對不同消息采用不同的接收方式。否則會拋出異常。
至於為什么這么做,最大的原因還是在事務性會話中,兩種消費模式的事務不好管控
消費端消費消息源碼分析
ActiveMQMessageConsumer.receive消費端同步接收消息的源碼入口
public Message receive() throws JMSException { checkClosed(); checkMessageListener(); //檢查receive和MessageListener是否同時配置在當前的會話中 sendPullCommand(0); //如果PrefetchSizeSize為0並且unconsumerMessage為空,則發起pull命令 MessageDispatch md = dequeue(-1); //從unconsumerMessage出隊列獲取消息 if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); //發送ack給到broker return createActiveMQMessage(md);//獲取消息並返回 }
unconsumedMessages 數據的獲取過程
那我們來看看 ActiveMQConnectionFactory. createConnection 里面做了什么事情。
1. 動態創建一個傳輸協議
2. 創建一個連接
3. 通過 transport.start()
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try { Transport transport = createTransport(); connection = createActiveMQConnection(transport, factoryStats); connection.setUserName(userName); connection.setPassword(password); configureConnection(connection); transport.start(); if (clientID != null) { connection.setDefaultClientID(clientID); } return connection; }
transport.start()
我們前面在分析消息發送的時候,已經知道 transport 是一個鏈式的調用,是一個多層包裝的對象。
ResponseCorrelator(MutexTransport(WireFormatNegotiator(InactivityMonitor(TcpTransport())))最終調用 TcpTransport.start()方法,然而這個類中並沒有 start,而是在父類ServiceSupport.start()中。
public void start() throws Exception { if (started.compareAndSet(false, true)) { boolean success = false; stopped.set(false); try { preStart(); doStart(); success = true; } finally { started.set(success); } for (ServiceListener l : this.serviceListeners) { l.started(this); } } }
這塊代碼看起來就比較熟悉了,我們之前看過的中間件的源碼,通信層都是獨立來實現及解耦的。而 ActiveMQ 也是一樣,提供了 Transport 接口和TransportSupport 類。這個接口的主要作用是為了讓客戶端有消息被異步發
送、同步發送和被消費的能力。接下來沿着 doStart()往下看,又調用TcpTransport.doStart() ,接着通過 super.doStart(),調用TransportThreadSupport.doStart(). 創建了一個線程,傳入的是 this,調用
子類的 run 方法,也就是 TcpTransport.run()
TcpTransport.run
run 方法主要是從 socket 中讀取數據包,只要 TcpTransport 沒有停止,它就會不斷去調用 doRun
public void run() { LOG.trace("TCP consumer thread for " + this + " starting"); this.runnerThread = Thread.currentThread(); try { while (!isStopped()) { doRun(); } } catch (IOException e) { stoppedLatch.get().countDown(); onException(e); } catch (Throwable e) { stoppedLatch.get().countDown(); IOException ioe = new IOException("Unexpected error occurred: " + e); ioe.initCause(e); onException(ioe); } finally { stoppedLatch.get().countDown(); } }
TcpTransport.doRun
doRun 中,通過 readCommand 去讀取數據
protected void doRun() throws IOException { try { Object command = readCommand(); doConsume(command); } catch (SocketTimeoutException e) { } catch (InterruptedIOException e) { } }
TcpTransport.readCommand
這里面,通過 wireFormat 對數據進行格式化,可以認為這是一個反序列化過程。wireFormat 默認實現是 OpenWireFormat,activeMQ 自定義的跨語言的wire 協議
protected Object readCommand() throws IOException {
return wireFormat.unmarshal(dataIn);
}
public void doConsume(Object command) { if (command != null) { if (transportListener != null) { transportListener.onCommand(command); } else { LOG.error("No transportListener available to process inbound command: " + command); } } }
TransportSupport 類中唯一的成員變量是 TransportListener transportListener;,這也意味着一個 Transport 支持類綁定一個傳送監聽器類,傳送監聽器接口 TransportListener 最重要的方法就是 void
onCommand(Object command);,它用來處理命令,這個 transportListener 是在哪里賦值的呢?再回到 ActiveMQConnection 的構造方法中。->246 行傳遞了 ActiveMQConnection 自己本身,(ActiveMQConnection 是
TransportListener 接口的實現類之一)於是,消息就這樣從傳送層到達了我們的連接層上。
分析到這,我們差不多明白了傳輸層的主要工作是獲得數據並且把數據轉換為對象,再把對象對象傳給 ActiveMQConnection
TransportSupport.doConsume
TransportSupport 類中最重要的方法是 doConsume,它的作用就是用來“消費消息”
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { this.transport = transport; 綁定傳輸對象 this.clientIdGenerator = clientIdGenerator; this.factoryStats = factoryStats; // Configure a single threaded executor who's core thread can timeout if // idle executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796 //thread.setDaemon(true); return thread; } }); // asyncConnectionThread.allowCoreThreadTimeOut(true); String uniqueId = connectionIdGenerator.generateId(); this.info = new ConnectionInfo(new ConnectionId(uniqueId)); this.info.setManageable(true); this.info.setFaultTolerant(transport.isFaultTolerant()); this.connectionSessionId = new SessionId(info.getConnectionId(),-1); this.transport.setTransportListener(this); //當 transport 綁定為自己 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); this.factoryStats.addConnection(this); this.timeCreated = System.currentTimeMillis(); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); }
從構造函數可以看出,創建 ActiveMQConnection 對象時,除了和 Transport相互綁定,還對線程池執行器 executor 進行了初始化。下面我們看看該類的核心方法
onCommand
Ø 這里面會針對不同的消息做分發,比如傳入的 command 是MessageDispatch,那么這個 command 的 visit 方法就會調用processMessageDispatch 方法
public void onCommand(final Object o) { final Command command = (Command)o; if (!closed.get() && command != null) { try { command.visit(new CommandVisitorAdapter() { @Override public Response processMessageDispatch(MessageDispatchmd) throws Exception { // 等待 Transport 中斷處理完成 waitForTransportInterruptionProcessingToComplete() ; // 這里通過消費者 ID 來獲取消費者對象 //(ActiveMQMessageConsumer 實現了 ActiveMQDispatcher 接口),所以MessageDispatch 包含了消息應該被分配到那個消費者的映射信息 //在創建 MessageConsumer 的時候,調用 ActiveMQMessageConsumer 的第 282行,調用 ActiveMQSession 的 1798 行將當前的消費者綁定到 dispatchers 所以這里拿到的是 ActiveMQSession ActiveMQDispatcher dispatcher =dispatchers.get(md.getConsumerId()); if (dispatcher != null) { // Copy in case a embedded broker is dispatching via // vm:// // md.getMessage() == null to signal end of queue // browse. Message msg = md.getMessage(); if (msg != null) { msg = msg.copy(); msg.setReadOnlyBody(true); msg.setReadOnlyProperties(true); msg.setRedeliveryCounter(md.getRedeliveryC ounter()); msg.setConnection(ActiveMQConnection.this) ; msg.setMemoryUsage(null); md.setMessage(msg); } dispatcher.dispatch(md); // 調用會話ActiveMQSession 自己的 dispatch 方法來處理這條消息 } else { LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers); } return null; } //如果傳入的是 ProducerAck,則調用的是下面這個方法,這里我們僅僅關注 MessageDispatch 就行了 @Override public Response processProducerAck(ProducerAck pa) throws Exception { if (pa != null && pa.getProducerId() != null) { ActiveMQMessageProducer producer = producers.get(pa.getProducerId()); if (producer != null) { producer.onProducerAck(pa); } } return null; } //…后續的方法就不分析了
在現在這個場景中,我們只關注 processMessageDispatch 方法,在這個方法中,只是簡單的去調用 ActiveMQSession 的 dispatch 方法來處理消息,
Ø tips: command.visit, 這里使用了適配器模式,如果 command 是一個MessageDispatch,那么它就會調用 processMessageDispatch 方法,其他方法他不會關心,代碼如下:MessageDispatch.visit
@Override public Response visit(CommandVisitor visitor) throws Exception { return visitor.processMessageDispatch(this); }
ActiveMQSession.dispatch(md)
executor 這個對象其實是一個成員對象 ActiveMQSessionExecutor,專門負責來處理消息分發
@Override public void dispatch(MessageDispatch messageDispatch) { try { executor.execute(messageDispatch); } catch (InterruptedException e) { Thread.currentThread().interrupt(); connection.onClientInternalException(e); } }
ActiveMQSessionExecutor.execute
Ø 這個方法的核心功能就是處理消息的分發。
void execute(MessageDispatch message) throws InterruptedException { if (!startedOrWarnedThatNotStarted) { ActiveMQConnection connection = session.connection; long aboutUnstartedConnectionTimeout = connection.getWarnAboutUnstartedConnectionTimeout(); if (connection.isStarted() || aboutUnstartedConnectionTimeout< 0L) { startedOrWarnedThatNotStarted = true; } else { long elapsedTime = System.currentTimeMillis() - connection.getTimeCreated(); // lets only warn when a significant amount of time has passed // just in case its normal operation if (elapsedTime > aboutUnstartedConnectionTimeout) { LOG.warn("Received a message on a connection which is not yet started. Have you forgotten to call Connection.start()? Connection: " + connection+ " Received: " + message); startedOrWarnedThatNotStarted = true; } } } //如果會話不是異步分發並且沒有使用 sessionpool 分發,則調用 dispatch 發送消息(這里不展開) if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) { dispatch(message); } else { //將消息直接放到隊列里 messageQueue.enqueue(message); wakeup(); } }
默認是采用異步消息分發。所以,直接調用 messageQueue.enqueue,把消息放到隊列中,並且調用 wakeup 方法
異步分發的流程
public void wakeup() { if (!dispatchedBySessionPool) { //進一步驗證 if (session.isSessionAsyncDispatch()) { //判斷 session 是否為異 步分發 try { TaskRunner taskRunner = this.taskRunner; if (taskRunner == null) { synchronized (this) { if (this.taskRunner == null) { if (!isRunning()) { // stop has been called return; } //通過 TaskRunnerFactory 創建了一個任務運行類 taskRunner,這里把自己作為一 個 task 傳入到 createTaskRunner 中,說明當前 //的類一定是實現了 Task 接口的. 簡單來說,就是通過線程池去執行一個任務,完 成異步調度,簡單吧 this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: " + session.getSessionId()); } taskRunner = this.taskRunner; } } taskRunner.wakeup(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } else { while (iterate()) { //同步分發 } } } }
所以,對於異步分發的方式,會調用 ActiveMQSessionExecutor 中的 iterate方法,我們來看看這個方法的代碼
iterate
這個方法里面做兩個事Ø 把消費者監聽的所有消息轉存到待消費隊列中
Ø 如果 messageQueue 還存在遺留消息,同樣把消息分發出
public boolean iterate() { // Deliver any messages queued on the consumer to their listeners. for (ActiveMQMessageConsumer consumer : this.session.consumers) { if (consumer.iterate()) { return true; } } // No messages left queued on the listeners.. so now dispatch messages // queued on the session MessageDispatch message = messageQueue.dequeueNoWait(); if (message == null) { return false; } else { dispatch(message); return !messageQueue.isEmpty(); } }
ActiveMQMessageConsumer.iterate
public boolean iterate() { MessageListener listener = this.messageListener.get(); if (listener != null) { MessageDispatch md = unconsumedMessages.dequeueNoWait(); if (md != null) { dispatch(md); return true; } } return false; }
同步分發的流程
同步分發的流程,直接調用 ActiveMQSessionExcutor 中的 dispatch 方法,代碼如下
public class JMSQueueProducer { void dispatch(MessageDispatch message) { // TODO - we should use a Map for this indexed by consumerId for (ActiveMQMessageConsumer consumer : this.session.consumers) { ConsumerId consumerId = message.getConsumerId(); if (consumerId.equals(consumer.getConsumerId())) { consumer.dispatch(message); break; } } }
ActiveMQMessageConsumer.dispatch
調用 ActiveMQMessageConsumer.dispatch 方法,把消息轉存到unconsumedMessages 消息隊列中。
public void dispatch(MessageDispatch md) { MessageListener listener = this.messageListener.get(); try { clearMessagesInProgress(); clearDeliveredList(); synchronized (unconsumedMessages.getMutex()) { if (!unconsumedMessages.isClosed()) { if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) { if (listener != null && unconsumedMessages.isRunning()) { if (redeliveryExceeded(md)) { posionAck(md, "listener dispatch[" +md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy); return; } ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md); try { boolean expired = isConsumerExpiryCheckEnabled() && message.isExpired(); if (!expired) { listener.onMessage(message); } afterMessageIsConsumed(md, expired); catch (RuntimeException e) { LOG.error("{} Exception while processing message: {}", getConsumerId(), md.getMessage().getMessageId(), e); md.setRollbackCause(e); if (isAutoAcknowledgeBatch() || isAutoAcknowledgeEach() || session.isIndividualAcknowledge()) { // schedual redelivery and possible dlq processing rollback(); } else { // Transacted or Client ack: Deliver the next message. afterMessageIsConsumed(md, false); } } } else { if (!unconsumedMessages.isRunning()) { // delayed redelivery, ensure it can be re delivered session.connection.rollbackDuplicate(this,md.getMessage()); } if (md.getMessage() == null) { // End of browse or pull request timeout. unconsumedMessages.enqueue(md); } else { if (!consumeExpiredMessage(md)) {unconsumedMessages.enqueue(md); if (availableListener != null) { availableListener.onMessageAvailable(this); } } else { beforeMessageIsConsumed(md); afterMessageIsConsumed(md, true); // Pull consumer needs to check if pull timed out and send // a new pull command if not. if (info.getCurrentPrefetchSize() == 0) { unconsumedMessages.enqueue(null); } } } } } else { // deal with duplicate delivery ConsumerId consumerWithPendingTransaction; if (redeliveryExpectedInCurrentTransaction(md,true)) { LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage()); if (transactedIndividualAck) { immediateIndividualTransactedAck(md); } else { session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1)); } } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) { LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(),md.getMessage(), consumerWithPendingTransaction);session.getConnection().rollbackDuplicate(this, md.getMessage()); dispatch(md); } else { LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId()); } } } } if (++dispatchedCount % 1000 == 0) { dispatchedCount = 0; Thread.yield(); } } catch (Exception e) { session.connection.onClientInternalException(e); } }
Ø 到這里為止,消息如何接受以及他的處理方式的流程,我們已經搞清楚了,
希望對大家理解 activeMQ 的核心機制有一定的幫助
消費端的 PrefetchSize
還記得我們在分析消費端的源碼的時候,所講到的 prefetchsize 嗎?這個prefetchsize 是做什么的?我們接下來去研究一下
原理剖析
activemq 的 consumer 端也有窗口機制,通過 prefetchSize 就可以設置窗口大小。不同的類型的隊列,prefetchSize 的默認值也是不一樣的
持久化隊列和非持久化隊列的默認值為 1000
Ø 持久化 topic 默認值為 100
Ø 非持久化隊列的默認值為 Short.MAX_VALUE-1
通過上面的例子,我們基本上應該知道 prefetchSize 的作用了,消費端會根據prefetchSize 的大小批量獲取數據,比如默認值是 1000,那么消費端會預先加載 1000 條數據到本地的內存中
prefetchSize 的設置方法
在 createQueue 中添加 consumer.prefetchSize,就可以看到效果
Destination destination=session.createQueue("myQueue?consumer.prefetchSize=10");
既然有批量加載,那么一定有批量確認,這樣才算是徹底的優化
optimizeAcknowledge
ActiveMQ 提供了 optimizeAcknowledge 來優化確認,它表示是否開啟“優化ACK”,只有在為 true 的情況下,prefetchSize 以及optimizeAcknowledgeTimeout 參數才會有意義優化確認一方面可以減輕 client 負擔(不需要頻繁的確認消息)、減少通信開
銷,另一方面由於延遲了確認(默認 ack 了 0.65*prefetchSize 個消息才確認),broker 再次發送消息時又可以批量發送如果只是開啟了 prefetchSize,每條消息都去確認的話,broker 在收到確認后也只是發送一條消息,並不是批量發布,當然也可以通過設置 DUPS_OK_ACK來手動延遲確認, 我們需要在 brokerUrl 指定 optimizeACK 選項
ConnectionFactory connectionFactory= new ActiveMQConnectionFactory
("tcp://192.168.11.153:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=10000");
Ø 注意,如果 optimizeAcknowledge 為 true,那么 prefetchSize 必須大於 0.
當 prefetchSize=0 的時候,表示 consumer 通過 PULL 方式從 broker 獲取消息
總結
到目前為止,我們知道了 optimizeAcknowledge 和 prefetchSize 的作用,兩者協同工作,通過批量獲取消息、並延遲批量確認,來達到一個高效的消息消費模型。它比僅減少了客戶端在獲取消息時的阻塞次數,還能減少每次獲取消息時的網絡通信開銷
Ø 需要注意的是,如果消費端的消費速度比較高,通過這兩者組合是能大大提升 consumer 的性能。如果 consumer 的消費性能本身就比較慢,設置比較大的 prefetchSize 反而不能有效的達到提升消費性能的目的。因為過大的
prefetchSize 不利於 consumer 端消息的負載均衡。因為通常情況下,我們都會部署多個 consumer 節點來提升消費端的消費性能。這個優化方案還會存在另外一個潛在風險,當消息被消費之后還沒有來得及確
認時,client 端發生故障,那么這些消息就有可能會被重新發送給其他consumer,那么這種風險就需要 client 端能夠容忍“重復”消息。
消息的確認過程
ACK_MODE
通過前面的源碼分析,基本上已經知道了消息的消費過程,以及消息的批量獲取和批量確認,那么接下來再了解下消息的確認過程從第一節課的學習過程中,我們知道,消息確認有四種 ACK_MODE,分別是
AUTO_ACKNOWLEDGE = 1 自動確認
CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
SESSION_TRANSACTED = 0 事務提交並確認
雖然 Client 端指定了 ACK 模式,但是在 Client 與 broker 在交換 ACK 指令的時候,還需要告知 ACK_TYPE,ACK_TYPE 表示此確認指令的類型,不同的ACK_TYPE 將傳遞着消息的狀態,broker 可以根據不同的 ACK_TYPE 對消息進行不同的操作。
ACK_TYPE
DELIVERED_ACK_TYPE = 0 消息"已接收",但尚未處理結束STANDARD_ACK_TYPE = 2 "標准"類型,通常表示為消息"處理成功",broker 端可以刪除消息了
POSION_ACK_TYPE = 1 消息"錯誤",通常表示"拋棄"此消息,比如消息重發多次后,都無法正確處理時,消息將會被刪除或者 DLQ(死信隊列)REDELIVERED_ACK_TYPE = 3 消息需"重發",比如 consumer 處理消息時拋出
了異常,broker 稍后會重新發送此消息INDIVIDUAL_ACK_TYPE = 4 表示只確認"單條消息",無論在任何 ACK_MODE 下UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一條消息在轉發給“訂閱者”時,發現此消息不符合 Selector 過濾條件,那么此消息將 不會轉發給訂閱
者,消息將會被存儲引擎刪除(相當於在 Broker 上確認了消息)。Client 端在不同的 ACK 模式時,將意味着在不同的時機發送 ACK 指令,每個 ACK Command 中會包含 ACK_TYPE,那么 broker 端就可以根據 ACK_TYPE 來決定此消息的后續操作
消息的重發機制原理
消息重發的情況
在正常情況下,有幾中情況會導致消息重新發送
Ø 在事務性會話中,沒有調用 session.commit 確認消息或者調用session.rollback 方法回滾消息
Ø 在非事務性會話中,ACK 模式為 CLIENT_ACKNOWLEDGE 的情況下,沒有調用 acknowledge 或者調用了 recover 方法;一個消息被 redelivedred 超過默認的最大重發次數(默認 6 次)時,消費端會給 broker 發送一個”poison ack”(ActiveMQMessageConsumer#dispatch:1460 行),表示這個消息有毒,告訴 broker 不要再發了。這個時候 broker 會把這個消息放到 DLQ(死信隊列)。
死信隊列
ActiveMQ 中默認的死信隊列是 ActiveMQ.DLQ,如果沒有特別的配置,有毒的消息都會被發送到這個隊列。默認情況下,如果持久消息過期以后,也會被送到 DLQ 中。
死信隊列配置策略
缺省所有隊列的死信消息都被發送到同一個缺省死信隊列,不便於管理,可以通過 individualDeadLetterStrategy 或 sharedDeadLetterStrategy 策略來進行修改
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic=">"> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="1000" /> </pendingMessageLimitStrategy> </policyEntry> // “>”表示對所有隊列生效,如果需要設置指定隊列,則直接寫隊 列名稱 <policyEntry queue=">"> <deadLetterStrategy> //queuePrefix:設置死信隊列前綴 //useQueueForQueueMessage 設置隊列保存到死信。 <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" /> </deadLetterStrategy> </policyEntry> </policyEntries> </policyMap> </destinationPolicy>
自動丟棄過期消息
<deadLetterStrategy> <sharedDeadLetterStrategy processExpired="false" /> </deadLetterStrategy>
死信隊列的再次消費
當定位到消息不能消費的原因后,就可以在解決掉這個問題之后,再次消費死信隊列中的消息。因為死信隊列仍然是一個隊列
ActiveMQ 靜態網絡配置
配置說明
修改 activeMQ 服務器的 activeMQ.xml, 增加如下配置
<networkConnectors> <networkConnector uri="static://(tcp://192.168.11.153:61616,tcp://192.168.11.154:61616)" /> </networkConnectors>
兩個 Brokers 通過一個 static 的協議來進行網絡連接。一個 Consumer 連接到BrokerB 的一個地址上,當 Producer 在 BrokerA 上以相同的地址發送消息是,此時消息會被轉移到 BrokerB 上,也就是說 BrokerA 會轉發消息到BrokerB 上
消息回流
從 5.6 版本開始,在 destinationPolicy 上新增了一個選項replayWhenNoConsumers 屬性,這個屬性可以用來解決當 broker1 上有需要轉發的消息但是沒有消費者時,把消息回流到它原始的 broker。同時把
enableAudit 設置為 false,為了防止消息回流后被當作重復消息而不被分發通過如下配置,在 activeMQ.xml 中。 分別在兩台服務器都配置。即可完成消息回流處理
<policyEntry queue=">" enableAudit="false"> <networkBridgeFilterFactory> <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true" /> </networkBridgeFilterFactory> </policyEntry>
動態網絡連接
ActiveMQ 使用 Multicast 協議將一個 Service 和其他的 Broker 的 Service 連接起來。Multicast 能夠自動的發現其他 broker,從而替代了使用 static 功能列表 brokers。用 multicast 協議可以在網絡中頻繁
multicast://ipadaddress:port?transportOptions
基於 zookeeper+levelDB 的 HA 集群搭建
activeMQ5.9 以后推出的基於 zookeeper 的 master/slave 主從實現。雖然ActiveMQ 不建議使用 LevelDB 作為存儲,主要原因是,社區的主要精力都幾種在 kahadb 的維護上,包括 bug 修復等。所以並沒有對 LevelDB 做太多的關
注,所以他在是不做為推薦商用。但實際上在很多公司,仍然采用了LevelDB+zookeeper 的高可用集群方案。而實際推薦的方案,仍然是基於KahaDB 的文件共享以及 Jdbc 的方式來實現。
配置
在三台機器上安裝 activemq,通過三個實例組成集群。
修改配置
directory:表示 LevelDB 所在的主工作目錄
replicas:表示總的節點數。比如我們的及群眾有 3 個節點,且最多允許一個節點出現故障,那么這個值可以設置為 2,也可以設置為 3. 因為計算公式為(replicas/2)+1. 如果我們設置為 4, 就表示不允許 3 個節點的任何一個節點出錯。
bind:當當前的節點為 master 時,它會根據綁定好的地址和端口來進行主從復制協議
zkAddress:zk 的地址
hostname:本機 IP
sync:在認為消息被消費完成前,同步信息所存儲的策略。
local_mem/local_disk
ActiveMQ 的優缺點
ActiveMQ 采用消息推送方式,所以最適合的場景是默認消息都可在短時間內被消費。數據量越大,查找和消費消息就越慢,消息積壓程度與消息速度成反比。
缺點
1.吞吐量低。由於 ActiveMQ 需要建立索引,導致吞吐量下降。這是無法克服的缺點,只要使用完全符合 JMS 規范的消息中間件,就要接受這個級別的TPS。
2.無分片功能。這是一個功能缺失,JMS 並沒有規定消息中間件的集群、分片機制。而由於 ActiveMQ 是偉企業級開發設計的消息中間件,初衷並不是為了處理海量消息和高並發請求。如果一台服務器不能承受更多消息,則需要橫向
拆分。ActiveMQ 官方不提供分片機制,需要自己實現。
適用場景
1. 對 TPS 要求比較低的系統,可以使用 ActiveMQ 來實現,一方面比較簡單,能夠快速上手開發,另一方面可控性也比較好,還有比較好的監控機制和界面
不適用的場景
1.消息量巨大的場景。ActiveMQ 不支持消息自動分片機制,如果消息量巨大,導致一台服務器不能處理全部消息,就需要自己開發消息分片功能。