ActiveMQ消息的消費原理


消費端消費消息:

  在 初識ActiveMQ 中我提到過,兩種方法可以接收消息,一種是使用同步阻塞的ActiveMQMessageConsumer#receive方法。另一種是使用消息監聽器MessageListener。這里需要注意的是,在同一個session下,這兩者不能同時工作,也就是說不能針對不同消息采用不同的接收方式。否則會拋出異常。至於為什么這么做,最大的原因還是在事務性會話中,兩種消費模式的事務不好管控。

  先通過ActiveMQMessageConsumer#receive 方法來對消息的接受一探究竟:

public Message receive() throws JMSException {
        checkClosed();
        //檢查receive和MessageListener是否同時配置在當前的會話中,有則拋出異常
        checkMessageListener();
        //如果PrefetchSizeSize為0並且unconsumerMessage為空,則發起pull命令
        sendPullCommand(0);
        MessageDispatch md = dequeue(-1);//出列,獲取消息
        if (md == null) {
            return null;
        }
        beforeMessageIsConsumed(md);
        //發送ack給到broker
        afterMessageIsConsumed(md, false);
        //獲取消息並返回
        return createActiveMQMessage(md);
    }

  下面簡單的說一下以上幾個核心方法中做了什么不為人知的事:

  sendPullCommand(0) :發送pull命令從broker上獲取消息,前提是prefetchSize=0並且unconsumedMessages為空。unconsumedMessage表示未消費的消息,這里面預讀取的消息大小為prefetchSize的值

protected void sendPullCommand(long timeout) throws JMSException {
        clearDeliveredList();
        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
            MessagePull messagePull = new MessagePull();
            messagePull.configure(info);
            messagePull.setTimeout(timeout);
            //向服務端異步發送messagePull指令
            session.asyncSendPacket(messagePull);
        }
    }

  這里發送異步消息跟消息生產的原理是一樣的。通過包裝鏈去調用 Sokect 發送請求。

  clearDeliveredList():

  在上面的sendPullCommand方法中,會先調用clearDeliveredList方法,主要用來清理已經分發的消息鏈表deliveredMessages,存儲分發給消費者但還為應答的消息鏈表

    Ø 如果session是事務的,則會遍歷deliveredMessage中的消息放入到previouslyDeliveredMessage中來做重發
    Ø 如果session是非事務的,根據ACK的模式來選擇不同的應答操作

  這是個同步的過程:

    private void clearDeliveredList() {
        if (clearDeliveredList) {//判斷是否清楚
            synchronized (deliveredMessages) {//采用雙重檢查鎖
                if (clearDeliveredList) {
                    if (!deliveredMessages.isEmpty()) {
                        if (session.isTransacted()) {//是事務消息
                            if (previouslyDeliveredMessages == null) {
                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId, Boolean>(session.getTransactionContext().getTransactionId());
                            }
                            for (MessageDispatch delivered : deliveredMessages) {
                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
                            }
                            LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
                                      getConsumerId(), previouslyDeliveredMessages.transactionId, deliveredMessages.size());
                        } else {
                            if (session.isClientAcknowledge()) {
                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                                // allow redelivery
                                if (!this.info.isBrowser()) {
                                    for (MessageDispatch md: deliveredMessages) {
                                        this.session.connection.rollbackDuplicate(this, md.getMessage());
                                    }
                                }
                            }
                            LOG.debug("{} clearing delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                            deliveredMessages.clear();
                            pendingAck = null;
                        }
                    }
                    clearDeliveredList = false;
                }
            }
        }
    }

  dequeue(-1) :從unconsumedMessage中取出一個消息,在創建一個消費者時,就會為這個消費者創建一個未消費的消息通道,這個通道分為兩種,一種是簡單優先級隊列分發通道SimplePriorityMessageDispatchChannel ;另一種是先進先出的分發通道FifoMessageDispatchChannel.至於為什么要存在這樣一個消息分發通道,大家可以想象一下,如果消費者每次去消費完一個消息以后再去broker拿一個消息,效率是比較低的。所以通過這樣的設計可以允許session能夠一次性將多條消息分發給一個消費者。默認情況下對於queue來說,prefetchSize的值是1000

private MessageDispatch dequeue(long timeout) throws JMSException {
        try {
            long deadline = 0;
            if (timeout > 0) {
                deadline = System.currentTimeMillis() + timeout;
            }
            while (true) {//protected final MessageDispatchChannel unconsumedMessages;
                MessageDispatch md = unconsumedMessages.dequeue(timeout);

            ...........
    }

  beforeMessageIsConsumed(md):這里面主要是做消息消費之前的一些准備工作,如果ACK類型不是DUPS_OK_ACKNOWLEDGE或者隊列模式(簡單來說就是除了Topic和DupAck這兩種情況),所有的消息先放到deliveredMessages鏈表的開頭。並且如果當前是事務類型的會話,則判斷transactedIndividualAck,如果為true,表示單條消息直接返回ack。

  否則,調用ackLater,批量應答, client端在消費消息后暫且不發送ACK,而是把它緩存下來(pendingACK),等到這些消息的條數達到一定閥值時,只需要通過一個ACK指令把它們全部確認;這比對每條消息都逐個確認,在性能上要提高很多。

private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(session.getNextDeliveryId());
        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!isAutoAcknowledgeBatch()) {
            synchronized(deliveredMessages) {
                deliveredMessages.addFirst(md);
            }
            if (session.getTransacted()) {
                if (transactedIndividualAck) {
                    immediateIndividualTransactedAck(md);
                } else {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
        }
    } 

  afterMessageIsConsumed:這個方法的主要作用是執行應答操作,這里面做以下幾個操作
    Ø 如果消息過期,則返回消息過期的ack
    Ø 如果是事務類型的會話,則不做任何處理
    Ø 如果是AUTOACK或者(DUPS_OK_ACK且是隊列),並且是優化ack操作,則走批量確認ack
    Ø 如果是DUPS_OK_ACK,則走ackLater邏輯
    Ø 如果是CLIENT_ACK,則執行ackLater

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
            stats.getExpiredMessageCount().increment();
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;

                                // AMQ-3956 evaluate both expired and normal msgs as
                                // otherwise consumer may get stalled
                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                    // AMQ-3956 - as further optimization send
                                    // ack for expired msgs when there are any.
                                    // This resets the deliveredCounter to 0 so that
                                    // we won't sent standard acks with every msg just
                                    // because the deliveredCounter just below
                                    // 0.5 * prefetch as used in ackLater()
                                    if (pendingAck != null && deliveredCounter > 0) {
                                        session.sendAck(pendingAck);
                                        pendingAck = null;
                                        deliveredCounter = 0;
                                    }
                                }
                            } else {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack!=null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
            else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }

  其實在以上消息的接收過程中,我們僅僅能看到這個消息從一個本地變量中出隊,並沒有對遠程消息中心發送通訊獲取,那么這個消息時什么時候過來的呢?也就是消息出隊中  unconsumedMessages 這個東東時什么時候初始化的呢 ?那么接下去我們應該去通過創建連接的時候去看看了,具體連接的時候都做了什么呢:connectionFactory.createConnection()

protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        }
        ActiveMQConnection connection = null;
        try {// 果然發現了這個東東的初始化
            Transport transport = createTransport();
            // 創建連接
            connection = createActiveMQConnection(transport, factoryStats);
            // 設置用戶密碼
            connection.setUserName(userName);
            connection.setPassword(password);
            // 對連接做包裝
            configureConnection(connection);
            // 啟動一個后台傳輸線程
            transport.start();
            // 設置客戶端消費的id
            if (clientID != null) {
                connection.setDefaultClientID(clientID);
            }
 
            return connection;
        } ......
   }

  創建連接的過程就是創建除了一個帶有鏈路包裝的TcpTransport 並且創建連接,最后啟動一個傳輸線程,而這里的 transport.start()  調用的應該是TcpTransport 里面的方法,然而這個類中並沒有 start,而是在父類
ServiceSupport.start()中:

public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            boolean success = false;
            stopped.set(false);
            try {
                preStart();//一些初始化
                doStart();
                success = true;
            } finally {
                started.set(success);
            }
            for(ServiceListener l:this.serviceListeners) {
                l.started(this);
            }
        }
    }

  doStart 方法前做了一系列的初始化,然后調用 TcpTransport的doStart() 方法:

protected void doStart() throws Exception {
        connect();
        stoppedLatch.set(new CountDownLatch(1));
        super.doStart();
    }

  繼而構建一個連接 設置一個 CountDownLatch 門閂 ,調用父類 TransportThreadSupport 的方法,新建了一個精靈線程並且啟動:

protected void doStart() throws Exception {
        runner = new Thread(null, this, "ActiveMQ Transport: " + toString(), stackSize);
        runner.setDaemon(daemon);
        runner.start();
    }

  調用TransportThreadSupport.doStart(). 創建了一個線程,傳入的是 this,調用子類的 run 方法,也就是 TcpTransport.run().

public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread=Thread.currentThread();
        try {
            while (!isStopped()) {
                doRun();
            }
        } catch (IOException e) {
            stoppedLatch.get().countDown();
            onException(e);
        } catch (Throwable e){
            stoppedLatch.get().countDown();
            IOException ioe=new IOException("Unexpected error occurred: " + e);
            ioe.initCause(e);
            onException(ioe);
        }finally {
            stoppedLatch.get().countDown();
        }
    }

  run 方法主要是從 socket 中讀取數據包,只要 TcpTransport 沒有停止,它就會不斷去調用 doRun:這里面,通過 wireFormat 對數據進行格式化,可以認為這是一個反序列化過程。wireFormat 默認實現是 OpenWireFormat,activeMQ 自定義的跨語言的wire 協議

protected void doRun() throws IOException {
        try {//通過 readCommand 去讀取數據
            Object command = readCommand();
            //消費消息
            doConsume(command);
        } catch (SocketTimeoutException e) {
        } catch (InterruptedIOException e) {
        }
    }
protected Object readCommand() throws IOException {
        return wireFormat.unmarshal(dataIn);
}

  這里的讀取流的部分就是從Socket里面讀取,而這個連接的 輸入/輸出流的初始化在  TcpTransport

protected void initializeStreams() throws Exception {
        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize) {
            @Override
            public int read() throws IOException {
                receiveCounter++;
                return super.read();
            }
            @Override
            public int read(byte[] b, int off, int len) throws IOException {
                receiveCounter++;
                return super.read(b, off, len);
            }
            @Override
            public long skip(long n) throws IOException {
                receiveCounter++;
                return super.skip(n);
            }
            @Override
            protected void fill() throws IOException {
                receiveCounter++;
                super.fill();
            }
        };
        //Unread the initBuffer that was used for protocol detection if it exists
        //so the stream can start over
        if (initBuffer != null) {
            buffIn.unread(initBuffer.buffer.array());
        }
        this.dataIn = new DataInputStream(buffIn);
        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
        this.dataOut = new DataOutputStream(outputStream);
        this.buffOut = outputStream;

    }

  doConsume:流程走到了消費消息: 

public void doConsume(Object command) {
        if (command != null) {//表示已經拿到了消息
            if (transportListener != null) {
                transportListener.onCommand(command);
            } else {
                LOG.error("No transportListener available to process inbound command: " + command);
            }
        }
    }

  TransportSupport 類中唯一的成員變量是 TransportListener transportListener;,這也意味着一個 Transport 支持類綁定一個傳送監聽器類,傳送監聽器接口 TransportListener 最重要的方法就是 void onCommand(Object command);,它用來處理命令。那么這個 transportListener 是在那里初始化的呢?可以思考一下 既然是TransportSupport 唯一的成員變量,而我們鎖創建的TcpTransport 是他的子類,那么是不是在創建該transport的時候亦或是在對他進行包裝處理的時候做了初始化呢? 我們會在流程中看到在新建 ActiveMQConnectionFactory 的時候有一行關鍵的代碼:

connection = createActiveMQConnection(transport, factoryStats);

  在這個方法里面追溯下去:會進入 ActiveMQConnection 的構造方法

protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;

        // Configure a single threaded executor who's core thread can timeout if
        // idle
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
                //thread.setDaemon(true);
                return thread;
            }
        });
        // asyncConnectionThread.allowCoreThreadTimeOut(true);
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

        this.transport.setTransportListener(this);

        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }

  從以上代碼我們發現  this.transport.setTransportListener(this); 那么這個this是什么呢  ? 正是ActiveMQConnection ,看了一眼該類,發現這個類實現了 TransportListener ,本身就是一個TransportListener。所以上面 transportListener.onCommand(command); 就是 ActiveMQConnection.onCommand(command)。除了和 Transport相互綁定,還對線程池執行器 executor 進行了初始化。這哥執行器是后來要進行消息處理的。

  這里面會針對不同的消息做分發,在ActiveMQMessageConsumer#receive方法中鎖dequeue所返回的對象是MessageDispatch 。假設這里傳入的 command 是MessageDispatch,那么這個 command 的 visit 方法就會調用processMessageDispatch 方法。剪切出其中的代碼片段:

public Response processMessageDispatch(MessageDispatch md) throws Exception {
       // 等待 Transport 中斷處理完成
       waitForTransportInterruptionProcessingToComplete();
       // 這里通過消費者 ID 來獲取消費者對象
//(ActiveMQMessageConsumer 實現了 ActiveMQDispatcher 接口),所以
//MessageDispatch 包含了消息應該被分配到那個消費者的映射信息
       ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
       if (dispatcher != null) {
       // Copy in case a embedded broker is dispatching via
       // vm://
       // md.getMessage() == null to signal end of queue
       // browse.
       Message msg = md.getMessage();
       if (msg != null) {
       msg = msg.copy();
       msg.setReadOnlyBody(true);
       msg.setReadOnlyProperties(true);
       msg.setRedeliveryCounter(md.getRedeliveryCounter());
       msg.setConnection(ActiveMQConnection.this);
       msg.setMemoryUsage(null);
       md.setMessage(msg);
       }
       // 調用會話ActiveMQSession 自己的 dispatch 方法來處理這條消息
       dispatcher.dispatch(md);
       } else {
           LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
       }
       return null;
}

  其中 ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());這行代碼的 dispatchers 是在 通過session.createConsumer(destination); 的時候通過 ActiveMQMessageConsumer 的構造方法中有一行代碼 :this.session.addConsumer(this); 將 this傳入,即 ActiveMQMessageConsumer 對象。而這個 addConsumer 方法:

protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
        this.consumers.add(consumer);
        if (consumer.isDurableSubscriber()) {
            stats.onCreateDurableSubscriber();
        }
        this.connection.addDispatcher(consumer.getConsumerId(), this);
    }

  可以發現這里的初始化了:this.connection.addDispatcher(consumer.getConsumerId(), this); 這里的this 即 ActiveMQSession。所以回到 ActiveMQConnection#onCommand方法內 processMessageDispatch 這個方法最后調用了 dispatcher.dispatch(md); 這個方法的核心功能就是處理消息的分發。:

public void dispatch(MessageDispatch messageDispatch) {
        try {
            executor.execute(messageDispatch);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            connection.onClientInternalException(e);
        }
    }

  這里離我們真正要找的進行消息入隊的結果很近了,進入executor.execute(messageDispatch);這個方法:

void execute(MessageDispatch message) throws InterruptedException {

       ...........
//如果會話不是異步分發並且沒有使用 sessionpool 分發,則調用 dispatch 發送消息
        if (!session.isSessionAsyncDispatch() && !dispatchedBySessionPool) {
            dispatch(message);
        } else {//將消息直接放到隊列里
            messageQueue.enqueue(message);
            wakeup();
        }
    }

  這里最后終於發現了入隊,判斷是否異步分發,不是的話走dispatch(message) 否則進入異步分發。默認是采用異步消息分發。所以,直接調用 messageQueue.enqueue,把消息放到隊列中,並且調用 wakeup 方法:

public void wakeup() {
        if (!dispatchedBySessionPool) {//進一步驗證
           // //判斷 session 是否為異步分發
            if (session.isSessionAsyncDispatch()) {
                try {
                    TaskRunner taskRunner = this.taskRunner;
                    if (taskRunner == null) {
                        synchronized (this) {
                            if (this.taskRunner == null) {
                                if (!isRunning()) {
                                    // stop has been called
                                    return;
                                }
//通過 TaskRunnerFactory 創建了一個任務運行類 taskRunner,這里把自己作為一個 task 傳入到 createTaskRunner 中,
//說明當前的類一定是實現了 Task 接口的. 簡單來說,就是通過線程池去執行一個任務,完成異步調度
//這里由於executor != null 所以這個task的類型是PooledTaskRunner
                                this.taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this,
                                        "ActiveMQ Session: " + session.getSessionId());
                            }
                            taskRunner = this.taskRunner;
                        }
                    }
                    taskRunner.wakeup();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {// 異步分發
                while (iterate()) {
                }
            }
        }
    }

  所以,對於異步分發的方式,會調用 ActiveMQSessionExecutor 中的 iterate方法,我們來看看這個方法的代碼  iterate ():這個方法里面做兩個事

  Ø 把消費者監聽的所有消息轉存到待消費隊列中
  Ø 如果 messageQueue 還存在遺留消息,同樣把消息分發(調度)出去

public boolean iterate() {
        // Deliver any messages queued on the consumer to their listeners.
// 將消費者上排隊的任何消息傳遞給它們的偵聽器。 for (ActiveMQMessageConsumer consumer : this.session.consumers) { if (consumer.iterate()) { return true; } } // No messages left queued on the listeners.. so now dispatch messages // queued on the session
// 偵聽器上沒有留下排隊等待的消息。現在分派消息 MessageDispatch message = messageQueue.dequeueNoWait(); if (message == null) { return false; } else {// 分發(調度)消息 dispatch(message); return !messageQueue.isEmpty(); } }

  dispatch(message);消息確認分發。通過ActiveMQSessionExecutor的dispatch 方法,轉到了 ActiveMQMessageConsumer 消費者類的  dispatch 方法:

public void dispatch(MessageDispatch md) {
        MessageListener listener = this.messageListener.get();
        try {
            clearMessagesInProgress();
            clearDeliveredList();
            synchronized (unconsumedMessages.getMutex()) {
                if (!unconsumedMessages.isClosed()) {// 判斷消息是否為重發消息
                    if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                        if (listener != null && unconsumedMessages.isRunning()) {
                          //我這邊通過consumer.receive()處理消息,所以這里listener為空,走下面
                        } else {
                            if (!unconsumedMessages.isRunning()) {
                                // delayed redelivery, ensure it can be re delivered
                                session.connection.rollbackDuplicate(this, md.getMessage());
                            }

                            if (md.getMessage() == null) {
                                // End of browse or pull request timeout.
                                unconsumedMessages.enqueue(md);
                            } else {
                                if (!consumeExpiredMessage(md)) {
                                    unconsumedMessages.enqueue(md);
                                    if (availableListener != null) {
                                        availableListener.onMessageAvailable(this);
                                    }
      .........
}

  最終會走入 unconsumedMessages.enqueue(md);添加消息。這里需要注意的是enqueue 方法:由於消費者可能處於阻塞狀態,這里做了入隊后回釋放鎖,也就是接觸阻塞。

public void enqueue(MessageDispatch message) {
        synchronized (mutex) {
            list.addLast(message);
            mutex.notify();
        }
    }

  到這里為止,消息如何接受以及他的處理方式的流程,我們已經搞清楚了。其實在這個消息消費的流程中,已經在建立連接,創建消費者的時候就已經初始化好了消息隊列了。結合上面的過程來看看整個消費流程的流程圖

 消費端的 PrefetchSize:

  在消息發布的時候我們曾經研究過 producerWindowSize 。主要用來約束在異步發送時producer端允許積壓的(尚未ACK)的消息的大小,且只對異步發送有意義。對於客戶端,也是類似存在這么一個屬性來約束客戶端的消息處理。activemq 的 consumer 端也有窗口機制,通過 prefetchSize 就可以設置窗口大小。不同的類型的隊列,prefetchSize 的默認值也是不一樣的.

  Ø 持久化隊列和非持久化隊列的默認值為 1000

  Ø 持久化 topic 默認值為 100

  Ø 非持久化隊列的默認值為 Short.MAX_VALUE-1

 測試方法是在MQ上生產1000條消息,先后啟動comsumer1,comsumer2 兩個消費者並且循環調用1000次消費,我們會發現 comsumer2 拿不到消息,這個時候我們可以通過debug進入comsumer1 的ActiveMQConnect會發現里面有個屬性的size=1000.其實就是這個prefetchSize,翻譯過來是預取大小,消費端會根據prefetchSize 的大小批量獲取數據。意思是在創建連接的時候會取獲取1000條消息預加載到緩存中等待處理,這樣子導致comsumer2去獲取消息的時候 broker上已經空了。

prefetchSize 的設置方法:

  在 createQueue 中添加 consumer.prefetchSize,就可以看到效果

Destination destination=session.createQueue("myQueue?consumer.prefetchSize=10");

  既然有批量加載,那么一定有批量確認,這樣才算是徹底的優化,這就涉及到 optimizeAcknowledge

  ActiveMQ 提供了 optimizeAcknowledge 來優化確認,它表示是否開啟“優化ACK”,只有在為 true 的情況下,prefetchSize 以及optimizeAcknowledgeTimeout 參數才會有意義優化確認一方面可以減輕 client 負擔(不需要頻繁的確認消息)、減少通信開銷,另一方面由於延遲了確認(默認 ack 了 0.65*prefetchSize 個消息才確認),這個在源碼中有體現。在ActiveMQMessageConsumer#receive方法內的處理消息后的 afterMessageIsConsumed 方法內有一個判斷:

if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); ackCounter = 0; session.sendAck(ack);//滿足條件則發送批量應答ACK
             optimizeAckTimestamp = System.currentTimeMillis(); } // AMQ-3956 - as further optimization send // ack for expired msgs when there are any. // This resets the deliveredCounter to 0 so that // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 * prefetch as used in ackLater()
       if (pendingAck != null && deliveredCounter > 0) { session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } }

  broker 再次發送消息時又可以批量發送如果只是開啟了 prefetchSize,每條消息都去確認的話,broker 在收到確認后也只是發送一條消息,並不是批量發布,當然也可以通過設置 DUPS_OK_ACK來手動延遲確認, 我們需要在 brokerUrl 指定 optimizeACK 選項

ConnectionFactory connectionFactory= new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=10000");

  Ø 注意,如果 optimizeAcknowledge 為 true,那么 prefetchSize 必須大於 0. 當 prefetchSize=0 的時候,表示 consumer 通過 PULL 方式從 broker 獲取消息.

  optimizeAcknowledge 和 prefetchSize 的作用,兩者協同工作,通過批量獲取消息、並延遲批量確認,來達到一個高效的消息消費模型。它比僅減少了客戶端在獲取消息時的阻塞次數,還能減少每次獲取消息時的網絡通信開銷

  Ø 需要注意的是,如果消費端的消費速度比較高,通過這兩者組合是能大大提升 consumer 的性能。如果 consumer 的消費性能本身就比較慢,設置比較大的 prefetchSize 反而不能有效的達到提升消費性能的目的。因為過大的prefetchSize 不利於 consumer 端消息的負載均衡。因為通常情況下,我們都會部署多個 consumer 節點來提升消費端的消費性能。這個優化方案還會存在另外一個潛在風險,當消息被消費之后還沒有來得及確認時,client 端發生故障,那么這些消息就有可能會被重新發送給其他consumer,那么這種風險就需要 client 端能夠容忍“重復”消息。

 消息的確認過程:

  消息確認有四種 ACK_MODE,分別是:

    1. AUTO_ACKNOWLEDGE = 1 自動確認

    2.CLIENT_ACKNOWLEDGE = 2 客戶端手動確認

    3.DUPS_OK_ACKNOWLEDGE = 3 自動批量確認

    4.SESSION_TRANSACTED = 0 事務提交並確認

   ACK_MODE 的選擇影響着消息消費流程的走向。雖然 Client 端指定了 ACK 模式,但是在 Client 與 broker 在交換 ACK 指令的時候,還需要告知 ACK_TYPE,ACK_TYPE 表示此確認指令的類型,不同的ACK_TYPE 將傳遞着消息的狀態,broker 可以根據不同的 ACK_TYPE 對消息進行不同的操作。

ACK_TYPE應答類型:

  DELIVERED_ACK_TYPE = 0  消息"已接收",但尚未處理結束

  STANDARD_ACK_TYPE = 2  "標准"類型,通常表示為消息"處理成功",broker 端可以刪除消息了

  POSION_ACK_TYPE = 1  消息"錯誤",通常表示"拋棄"此消息,比如消息重發多次后,都無法正確處理時,消息將會被刪除或者 DLQ(死信隊列),在消息處理的時候,dispatch方法內會判斷該消息是否為重發消息

if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage())) {
                        if (listener != null && unconsumedMessages.isRunning()) {
                        // 這段為非重發消息,走else
                    } else {
                        // deal with duplicate delivery
                        ConsumerId consumerWithPendingTransaction;
                        if (redeliveryExpectedInCurrentTransaction(md, true)) {
                            LOG.debug("{} tracking transacted redelivery {}", getConsumerId(), md.getMessage());
                            if (transactedIndividualAck) {
                                immediateIndividualTransactedAck(md);
                            } else {
                                session.sendAck(new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1));
                            }
                        } else if ((consumerWithPendingTransaction = redeliveryPendingInCompetingTransaction(md)) != null) {
                            LOG.warn("{} delivering duplicate {}, pending transaction completion on {} will rollback", getConsumerId(), md.getMessage(), consumerWithPendingTransaction);
                            session.getConnection().rollbackDuplicate(this, md.getMessage());
                            dispatch(md);
                        } else {// 走POSION_ACK_TYPE 添加Active_DLQ 死信隊列
                            LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
                        }
                    }

  REDELIVERED_ACK_TYPE = 3  消息需"重發",比如 consumer 處理消息時拋出了異常,broker 稍后會重新發送此消息

  INDIVIDUAL_ACK_TYPE = 4  表示只確認"單條消息",無論在任何 ACK_MODE 下

  UNMATCHED_ACK_TYPE = 5 在 Topic 中,如果一條消息在轉發給“訂閱者”時,發現此消息不符合 Selector 過濾條件,那么此消息將 不會轉發給訂閱者,消息將會被存儲引擎刪除(相當於在 Broker 上確            認了消息)。

  Client 端在不同的 ACK 模式時,將意味着在不同的時機發送 ACK 指令,每個 ACK Command 中會包含 ACK_TYPE,那么 broker 端就可以根據 ACK_TYPE 來決定此消息的后續操作。在 afterMessageIsConsumed 消息接收處理后會根據條件來設置 ACK_TYPE.

消息的重發機制原理:

  在正常情況下,有幾中情況會導致消息重新發送

  Ø 在事務性會話中,沒有調用 session.commit 確認消息宕機或者調用session.rollback 方法回滾消息

  Ø 在非事務性會話中,ACK 模式為 CLIENT_ACKNOWLEDGE (客戶端手動應答)的情況下,沒有調用 session.commit或者調用了 recover 方法;

  一個消息被 redelivedred 超過默認的最大重發次數(默認 6 次)時,消費端會給 broker 發送一個”poison ack”表示這個消息有毒,告訴 broker 不要再發了。這個時候 broker 會把這個消息放到 DLQ(死信隊列)。設置方法如下:

ActiveMQConnectionFactory connectionFactory1 = (ActiveMQConnectionFactory) connectionFactory;
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(2);
connectionFactory1.setRedeliveryPolicy(redeliveryPolicy);

死信隊列:

  ActiveMQ 中默認的死信隊列是 ActiveMQ.DLQ,如果沒有特別的配置,有毒的消息都會被發送到這個隊列。默認情況下,如果持久消息過期以后,也會被送到 DLQ 中。

   只要在處理消息的時候拋出一個異常就可以演示,會看到控制台對於失敗消息會重發6次,登陸ActiveMQ控制台會看到一個 ActiveMQ.DLQ。在創建隊列的時候可以直接指定從ActiveMQ.DLQ去消費消息。

  死信隊列配置策略:

  缺省所有隊列的死信消息都被發送到同一個缺省死信隊列,不便於管理,可以通過 individualDeadLetterStrategy 或 sharedDeadLetterStrategy 策略來進行修改。在activemq.xml上

<destinationPolicy>
      <policyMap>
         <policyEntries>
            <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html
                    -->
               <pendingMessageLimitStrategy>
                 <constantPendingMessageLimitStrategy limit="1000"/>
               </pendingMessageLimitStrategy>
            </policyEntry>

      // “>”表示對所有隊列生效,如果需要設置指定隊列,則直接寫隊列名稱
          <policyEntry queue=">">
             <deadLetterStrategy>
           //queuePrefix:設置死信隊列前綴
           //useQueueForQueueMessage 設置隊列保存到死信。
              <individualDeadLetterStrategy queuePrefix="DLQ."useQueueForQueueMessages="true"/>
             </deadLetterStrategy>
          </policyEntry>
         </policyEntries>
       </policyMap>
</destinationPolicy>

  自動丟棄過期消息

<deadLetterStrategy>
    <sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>

ActiveMQ VirtualTopic

  ActiveMQ支持的虛擬Destinations分為有兩種,分別是

  • 虛擬主題(Virtual Topics)
  • 組合 Destinations(CompositeDestinations)

  這兩種虛擬Destinations可以看做對簡單的topic和queue用法的補充,基於它們可以實現一些簡單有用的EIP功能,虛擬主題類似於1對多的分支功能+消費端的cluster+failover,組合Destinations類似於簡單的destinations直接的路由功能。

組合隊列(Composite Destinations):

  當你想把同一個消息一次發送到多個消息隊列,那么可以在客戶端使用組合隊列。

// send to 3 queues as one logical operation
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C"); producer.send(queue, someMessage);

  當然,也可以混合使用隊列和主題,只需要使用前綴:queue:// 或 topic://

// send to queues and topic one logical operation
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A"); producer.send(queue, someMessage);

虛擬主題(Virtual Topics):

  ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當於一個持久化的queue的客戶端,它會收取所有消息。這種情況下存在兩個問題:

  1. 同一應用內consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔消息處理功能。因為每個都會獲取所有消息。queue模式可以解決這個問題,broker端又不能將消息發送到多個應用端。所以,既要發布訂閱,又要讓消費者分組,這個功能jms規范本身是沒有的。
  2. 同一應用內consumer端failover的問題:由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理消息了,系統的健壯性不高。

  為了解決這兩個問題,ActiveMQ中實現了虛擬Topic的功能。使用起來非常簡單。對於消息發布者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。對於消息接收端來說,是個隊列,不同應用里使用不同的前綴作為隊列的名稱,即可表明自己的身份即可實現消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。可以在同一個應用里使用多個consumer消費此queue,則可以實現上面兩個功能。又因為不同應用使用的queue名稱不同(前綴不同),所以不同的應用中都可以接收到全部的消息。每個客戶端相當於一個持久訂閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。

  默認虛擬主題的前綴是 :VirtualTopic.*  。自定義消費虛擬地址默認格式:Consumer.*.VirtualTopic.> 。自定義消費虛擬地址可以改,比如下面的配置就把它修改了。xml配置示例如下:

<broker xmlns="http://activemq.apache.org/schema/core">     <destinationInterceptors>         <virtualDestinationInterceptor>             <virtualDestinations>                 <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/><!-- 修改的Consumer的開頭格式-->             </virtualDestinations>         </virtualDestinationInterceptor>     </destinationInterceptors>
</broker>

  那么生產者發送的時候的代碼如下:

Destination destination = session.createTopic("VirtualTopic.helloTopic");

  生產者是這樣的:

Destination destination = session.createQueue("VirtualTopicConsumers.A.VirtualTopic.helloTopic");
Destination destination = session.createQueue("VirtualTopicConsumers.B.VirtualTopic.helloTopic");

ActiveMQ 靜態網絡配置:broker網絡連接(broker的高性能方案):

  修改 activeMQ 服務器的 activeMQ.xml, 增加如下配置,這個配置只能實現單向連接,實現雙向連接需要各個節點都配置如下配置。

<networkConnectors>
    <networkConnector uri="static://(tcp://192.168.254.135:61616,tcp://192.168.254.136:61616)"/>
</networkConnectors>

  兩個 Brokers 通過一個 static 的協議來進行網絡連接。一個 Consumer 連接到BrokerB 的一個地址上,當 Producer 在 BrokerA 上以相同的地址發送消息,此時消息會被轉移到 BrokerB 上,也就是說 BrokerA 會轉發消息到BrokerB 上。

   在activeMQ中,進行了靜態網絡橋接的兩台節點而言,當 Producer 在 BrokerA 上以相同的地址發送10條消息。一個 Consumer 連接到BrokerB去消費消息,當消費了一半的時候出現異常了,那么剩下來未處理的消息會被存放到 BrokerB 的待處理消息隊列中,此時要通過BrokerA再去消費是消費不到的,萬一此刻BrokerB 掛了,那么哪些沒有消費的消息將會丟失。mq給我們提供了一個有效的消息回流機制。

<policyEntry queue=">" enableAudit="false">
    <networkBridgeFilterFactory>
         <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
    </networkBridgeFilterFactory>
</policyEntry>

ActiveMQ 的優缺點:

  ActiveMQ 采用消息推送方式,所以最適合的場景是默認消息都可在短時間內被消費。數據量越大,查找和消費消息就越慢,消息積壓程度與消息速度成反比。

  缺點:

  • 吞吐量低。由於 ActiveMQ 需要建立索引,導致吞吐量下降。這是無法克服的缺點,只要使用完全符合 JMS 規范的消息中間件,就要接受這個級別的TPS。
  • 無分片功能。這是一個功能缺失,JMS 並沒有規定消息中間件的集群、分片機制。而由於 ActiveMQ 是偉企業級開發設計的消息中間件,初衷並不是為了處理海量消息和高並發請求。如果一台服務器不能承受更多消息,則需要橫向拆分。ActiveMQ 官方不提供分片機制,需要自己實現。

  適用場景:

  對 TPS 要求比較低的系統,可以使用 ActiveMQ 來實現,一方面比較簡單,能夠快速上手開發,另一方面可控性也比較好,還有比較好的監控機制和界面

  不適用的場景:

  消息量巨大的場景。ActiveMQ 不支持消息自動分片機制,如果消息量巨大,導致一台服務器不能處理全部消息,就需要自己開發消息分片功能。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM