RocketMQ Source Code Analysis: The Consumer Side


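Long polling (used by RocketMQ)

Consumer -> Broker: RocketMQ uses long polling for this connection, for the following reasons:

  • The broker does not know how much the consumer can process
  • Pushing messages directly puts heavy load on the broker
  • With a push-style long-lived connection, the consumer may not be able to process the pushed data in time
  • With pull, the consumer keeps the initiative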

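Short polling

The client keeps sending requests to the server, reconnecting every time.

Long polling

The client sends a request to the server. If the server has data, it returns it; if not, it holds the request without closing the connection.

Long-lived connection

Once established, the connection stays open and the server pushes data over it.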
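
The difference between the polling styles above can be illustrated with a small in-process model. This is only a sketch and not RocketMQ code: LongPollingBroker, publish and pull are hypothetical names, and a BlockingQueue stands in for the broker's message store. A pull with a hold time of 0 behaves like short polling, while a positive hold time behaves like long polling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-process "broker" that answers pull requests with long polling. */
class LongPollingBroker {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    /** Producer side: store a message; a held pull request wakes up and returns it. */
    void publish(String message) {
        messages.offer(message);
    }

    /**
     * Consumer side: return a message at once if one is available. Otherwise hold
     * the request for up to holdMillis and return null only if nothing arrives.
     */
    String pull(long holdMillis) {
        try {
            return messages.poll(holdMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        LongPollingBroker broker = new LongPollingBroker();
        // A message arrives 100 ms after the consumer starts waiting.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
                broker.publish("hello");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(broker.pull(2000)); // held until the message arrives
        System.out.println(broker.pull(50));   // nothing arrives within 50 ms
        producer.join();
    }
}
```

The consumer decides when to call pull, so it never receives more than it asked for, which is the point of the "initiative stays with the consumer" argument.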

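The consumer side

The source code is traced through the following five steps.

  1. Create a DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
  2. Set the Namesrv address
consumer.setNamesrvAddr("192.168.88.134:9876");
  3. Subscribe to a topic and apply a filter (how the subscription is used internally becomes clear once pullMessageService starts)
  • The methods of DefaultMQPushConsumer are actually implemented by the DefaultMQPushConsumerImpl class
  • A subscriptionData is built and returned; subExpression is either given explicitly or is *
  • Once the consumer client has started, mQClientFactory sends heartbeats to the brokers
consumer.subscribe("tagTopic", "TAG-A");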

-----------------------------------------------------------------------------------------

public void subscribe(String topic, String subExpression) throws MQClientException {
    this.defaultMQPushConsumerImpl.subscribe(this.withNamespace(topic), subExpression);
}

-----------------------------------------------------------------------------------------

this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

-----------------------------------------------------------------------------------------

if (this.mQClientFactory != null) {
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
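  4. Register a message listener, which is invoked as a callback
consumer.registerMessageListener

By default (clustering mode) a message is consumed by only one consumer in the group, i.e. point-to-point delivery.
Message state changes are maintained by the broker.

ACK (redelivery): the listener returns a consume status, either CONSUME_SUCCESS (consumed successfully) or RECONSUME_LATER (consumption failed, consume again).
  • When RECONSUME_LATER is returned to the broker:
  • RocketMQ sends this batch of messages back to the broker. (The topic is not the original topic but the consumer group's RETRY topic.)
  • After a delay (10 seconds by default, configurable by the application), the messages are delivered again to this ConsumerGroup, possibly to another consumer.
  • If consumption keeps failing for a certain number of retries (16 by default), the message is sent to the dead-letter queue (DLQ). Applications can monitor the DLQ and intervene manually.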
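
The retry rule above can be written down as a small decision function. This is a sketch under stated assumptions and not the broker's code: RetryRouting and nextTopic are hypothetical names, the limit of 16 is the default quoted above, and the "%DLQ%" prefix is assumed here by analogy with the "%RETRY%" prefix that appears in the rebalance code later in this article.

```java
/** Sketch of where a failed message goes next; not RocketMQ's actual code. */
class RetryRouting {
    /** Default retry limit quoted in the text above. */
    static final int MAX_RECONSUME_TIMES = 16;

    /**
     * Returns the topic a message is routed to after the listener answered
     * RECONSUME_LATER. reconsumeTimes is the number of retries already made.
     */
    static String nextTopic(String consumerGroup, int reconsumeTimes) {
        if (reconsumeTimes >= MAX_RECONSUME_TIMES) {
            return "%DLQ%" + consumerGroup;   // retries exhausted: dead-letter queue (assumed prefix)
        }
        return "%RETRY%" + consumerGroup;     // otherwise: the group's retry topic
    }

    public static void main(String[] args) {
        System.out.println(nextTopic("consumer", 0));
        System.out.println(nextTopic("consumer", 16));
    }
}
```

Both topics are per consumer group rather than per original topic, which is why the consumer later has to restore the original topic name on retried messages.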
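  5. Start the consumer client (this also starts the traceDispatcher used for trace dispatching)
consumer.start();
this.defaultMQPushConsumerImpl.start();
  • start() acts according to the current ServiceState:
    CREATE_JUST         just created
    RUNNING             running
    SHUTDOWN_ALREADY    already shut down
    START_FAILED        start failed
  • Check the configuration and copy the subscription list (SubscriptionData)
this.checkConfig();
this.copySubscription();
  • Get the MQClientInstance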
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
  • Register the consumer and start the client
boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
------------------------------------------------------------------------
this.mQClientFactory.start();

Starting consumeMessageService

this.consumeMessageService.start();

MQClientInstance startup sequence

this.mQClientAPIImpl.start();
this.startScheduledTask();
this.pullMessageService.start();
this.rebalanceService.start();
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);

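Starting NettyRemotingClient

  • Starts the Netty remoting client (4 worker threads) and creates an event executor group that is used when adding handlers to the Netty pipeline
  • NettyRemotingClient periodically scans the responseTable (first after 3 s, then every 1 s)
  • The scan iterates over responseTable, a ConcurrentHashMap with an initial capacity of 256
  • Requests that have timed out are removed
this.mQClientAPIImpl.start();
private int clientWorkerThreads = 4;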
				
----------------------------------------------------------------------------------------
pipeline.addFirst(NettyRemotingClient.this.defaultEventExecutorGroup, "sslHandler",
        NettyRemotingClient.this.sslContext.newHandler(ch.alloc()));
NettyRemotingClient.log.info("Prepend SSL handler");
-----------------------------------------------------------------------------------------

// Scan responseTable every second, starting 3 seconds after startup
this.timer.scheduleAtFixedRate(new TimerTask() {
    public void run() {
        try {
            NettyRemotingClient.this.scanResponseTable();
        } catch (Throwable var2) {
            NettyRemotingClient.log.error("scanResponseTable exception", var2);
        }
    }
}, 3000L, 1000L);

// Requests that are still waiting for a response
protected final ConcurrentMap<Integer, ResponseFuture> responseTable = new ConcurrentHashMap(256);

// Inside scanResponseTable(): drop requests whose timeout expired more than 1 second ago
if (rf.getBeginTimestamp() + rf.getTimeoutMillis() + 1000L <= System.currentTimeMillis()) {
    rf.release();
    it.remove();
    rfList.add(rf);
    log.warn("remove timeout request, " + rf);
}
  • If channelEventListener is not null, nettyEventExecutor is started (it runs as a ServiceThread)

  • This ServiceThread is the one in org.apache.rocketmq.remoting.common, i.e. it belongs to the Netty remoting module
    
if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
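
The timeout scan over responseTable described above can be sketched without Netty. The class below is a simplified stand-in, not the NettyRemotingClient code: PendingRequest is a hypothetical replacement for ResponseFuture, and the current time is passed in as a parameter so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Simplified stand-in for the responseTable scan; PendingRequest is a hypothetical type. */
class ResponseTableScan {
    static final class PendingRequest {
        final long beginTimestamp;
        final long timeoutMillis;

        PendingRequest(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    final ConcurrentMap<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>(256);

    /** Removes every request whose deadline plus 1 s of slack has passed; returns their ids. */
    List<Integer> scan(long now) {
        List<Integer> removed = new ArrayList<>();
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, PendingRequest> entry = it.next();
            PendingRequest rf = entry.getValue();
            if (rf.beginTimestamp + rf.timeoutMillis + 1000L <= now) {
                removed.add(entry.getKey());
                it.remove(); // ConcurrentHashMap iterators support removal
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        ResponseTableScan client = new ResponseTableScan();
        client.responseTable.put(1, new PendingRequest(0L, 3000L));  // removable from t = 4000
        client.responseTable.put(2, new PendingRequest(0L, 10000L)); // removable from t = 11000
        System.out.println(client.scan(5000L)); // prints [1]
    }
}
```

Collecting the removed entries first and handling them afterwards mirrors the rfList in the snippet above, which keeps the scan itself short.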

Starting startScheduledTask

  • If no NamesrvAddr is configured, a task is scheduled that fetches the name server address every 120 s (first run after 10 s)
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception var2) {
                    MQClientInstance.this.log.error("ScheduledTask fetchNameServerAddr exception", var2);
                }
            }
        }, 10000L, 120000L, TimeUnit.MILLISECONDS);
    }
    // ... rest of the method omitted
}

Starting pullMessageService and consuming messages (the key part)

  • This ServiceThread is the one in org.apache.rocketmq.common, i.e. it belongs to RocketMQ itself
    
public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", new Object[]{this.getServiceName(), this.started.get(), this.thread});
        if (this.started.compareAndSet(false, true)) {
            this.stopped = false;
            this.thread = new Thread(this, this.getServiceName());
            this.thread.setDaemon(this.isDaemon);
            this.thread.start();
        }
    }
final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue();
  • The run() loop of the pull message service
public void run() {
        this.log.info(this.getServiceName() + " service started");

        while(!this.isStopped()) {
            try {
                PullRequest pullRequest = (PullRequest)this.pullRequestQueue.take();
                
-----------------------------------------------------------------------------------------
                this.pullMessage(pullRequest);
-----------------------------------------------------------------------------------------

            } catch (InterruptedException var2) {
                ;
            } catch (Exception var3) {
                this.log.error("Pull Message Service Run Method exception", var3);
            }
        }

        this.log.info(this.getServiceName() + " service end");
    }
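  • One pull request is taken from the pull queue, LinkedBlockingQueue pullRequestQueue
  • take() reads the AtomicInteger count and acquires the ReentrantLock takeLock
  • lockInterruptibly() acquires the lock interruptibly: if the thread is interrupted while waiting, it stops competing for the lock and throws InterruptedException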
public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
  • dequeue() removes the head of the queue, yielding one PullRequest

  • A PullRequest contains: the consumer group and the messageQueue (which identifies a queue by topic, brokerName and queueId)

  • It also contains the processQueue, which stores the pulled messages in a TreeMap<Long, MessageExt>

  • private String consumerGroup;
    private MessageQueue messageQueue;
    private ProcessQueue processQueue;
    private long nextOffset;
    
private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }
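
Why a TreeMap suits the processQueue mentioned above can be shown with a short sketch: keyed by queue offset, it always hands out the lowest offsets first, whatever order the messages were stored in. ProcessQueueSketch, putMessage and takeMessages are hypothetical names, plain strings stand in for MessageExt, and the locking is reduced to synchronized methods for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of an offset-ordered message buffer; not the real ProcessQueue. */
class ProcessQueueSketch {
    /** Queue offset -> message body (a String stands in for MessageExt). */
    private final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    synchronized void putMessage(long queueOffset, String message) {
        msgTreeMap.put(queueOffset, message);
    }

    /** Removes and returns up to batchSize messages, lowest offsets first. */
    synchronized List<String> takeMessages(int batchSize) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Map.Entry<Long, String> entry = msgTreeMap.pollFirstEntry();
            if (entry == null) {
                break; // buffer is empty
            }
            result.add(entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        ProcessQueueSketch pq = new ProcessQueueSketch();
        pq.putMessage(7L, "c");
        pq.putMessage(5L, "a");
        pq.putMessage(6L, "b");
        System.out.println(pq.takeMessages(2)); // prints [a, b]
        System.out.println(pq.takeMessages(2)); // prints [c]
    }
}
```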
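  • Inside the pull message service, this.pullMessage(pullRequest) is called
  • It looks up the consumer for the request's consumer group and hands the request to it, so that messages can be pulled into the processQueue and consumed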
 private void pullMessage(PullRequest pullRequest) {
 
-----------------------------------------------------------------------------------------
        MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
-----------------------------------------------------------------------------------------

        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl)consumer;
            
-----------------------------------------------------------------------------------------
            impl.pullMessage(pullRequest);
-----------------------------------------------------------------------------------------
        } else {
            this.log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
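  • The pull uses an asynchronous callback, onSuccess(PullResult pullResult)

  • pullResult.getMsgFoundList() returns List<MessageExt> msgFoundList

  • submitConsumeRequest has two implementations, ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService, for multi-threaded (concurrent) and orderly consumption

  • executePullRequestImmediately puts the pullRequest back into pullRequestQueue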

PullCallback pullCallback = new PullCallback() {
    public void onSuccess(PullResult pullResult) {
        switch (pullResult.getPullStatus()) {
        case FOUND:
            // Advance the offset used by the next pull
            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

            // Hand the pulled messages to the consume service
            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);

-----------------------------------------------------------------------------------------
            // Queue the next pull, either after pullInterval or immediately
            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {
                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
            } else {
                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
            }
-----------------------------------------------------------------------------------------
            break;
        // ... other pull statuses omitted
        }
    }
    // ... rest of the callback omitted
};

// The pull itself is sent asynchronously, with the callback above
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, 15000L, 30000L, CommunicationMode.ASYNC, pullCallback);
  • this.pullAPIWrapper.pullKernelImpl sends the pull; the pullCallback then processes the PullResult that comes back
public class PullResult {
    private final PullStatus pullStatus;
    private final long nextBeginOffset;
    private final long minOffset;
    private final long maxOffset;
    private List<MessageExt> msgFoundList;
    // ... constructor and getters omitted
}
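
Putting the pieces above together, the pull cycle is: take a PullRequest from the queue, pull a batch starting at its nextOffset, advance the offset, and put the same request back so the next pull follows. The sketch below models one such iteration on a single thread. It is an illustration only: PullLoopSketch and pullOnce are hypothetical names, a List of strings stands in for one message queue on the broker, and the real pull is asynchronous.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Single-threaded sketch of the pull cycle; all types here are hypothetical stand-ins. */
class PullLoopSketch {
    static final class PullRequest {
        long nextOffset; // where the next pull starts
    }

    final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<>();
    final List<String> brokerLog; // stands in for one message queue on the broker
    final int pullBatchSize;

    PullLoopSketch(List<String> brokerLog, int pullBatchSize) {
        this.brokerLog = brokerLog;
        this.pullBatchSize = pullBatchSize;
        pullRequestQueue.offer(new PullRequest()); // seed request so the cycle can start
    }

    /** One iteration: take a request, pull a batch, advance the offset, re-queue the request. */
    List<String> pullOnce() {
        try {
            PullRequest request = pullRequestQueue.take();
            int from = (int) Math.min(request.nextOffset, brokerLog.size());
            int to = Math.min(from + pullBatchSize, brokerLog.size());
            List<String> found = new ArrayList<>(brokerLog.subList(from, to));
            request.nextOffset = to;       // plays the role of setNextOffset(getNextBeginOffset())
            pullRequestQueue.put(request); // plays the role of executePullRequestImmediately
            return found;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new ArrayList<>();
        }
    }

    public static void main(String[] args) {
        PullLoopSketch loop = new PullLoopSketch(Arrays.asList("a", "b", "c"), 2);
        System.out.println(loop.pullOnce()); // prints [a, b]
        System.out.println(loop.pullOnce()); // prints [c]
        System.out.println(loop.pullOnce()); // prints []
    }
}
```

Because the request is re-queued after every pull, a single PullRequest per message queue is enough to keep pulling for as long as the consumer owns that queue.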
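  • ConsumeMessageConcurrentlyService: the concurrent consume service

  • The run() bodies of concurrent and orderly consumption are largely the same.

  • The main difference is on the producer side. For ordered messages the producer sends to a specific queue: unlike a normal send, the producer's send() call takes an implementation of the MessageQueueSelector interface whose select method picks the queue. Ordered messages are only ordered within one queue (partial ordering), so all messages that must stay in order have to be sent to the same queue.

  • On the consumer side, the minimum and maximum consume thread counts are set to 1, and the MessageListenerOrderly interface is implemented to consume the messages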
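
The queue selection described above comes down to "same key, same queue". The sketch below shows one common way to do that, hashing an ordering key onto the queue list. It does not use the RocketMQ types: OrderedQueueSelection and its select method are hypothetical, and strings stand in for MessageQueue objects.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of the idea behind a queue selector: the same key always maps to the same queue. */
class OrderedQueueSelection {
    /** Picks a queue for the given ordering key, for example an order id. */
    static <Q> Q select(List<Q> queues, Object orderingKey) {
        // floorMod keeps the index non-negative even for negative hash codes
        int index = Math.floorMod(orderingKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        // Every message of order 1001 lands in the same queue, so their order is kept.
        System.out.println(select(queues, 1001)); // prints queue-1
        System.out.println(select(queues, 1001)); // prints queue-1
        System.out.println(select(queues, 1002)); // prints queue-2
    }
}
```

Note that the mapping changes if the number of queues changes, so ordering is only guaranteed while the queue list stays the same.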


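  • If the number of msgs is less than or equal to consumeMessageBatchMaxSize, a single consumeRequest is created and submitted to the thread pool

  • if (msgs.size() <= consumeBatchSize) 
    
  • If it is larger, the messages are split so that each consumeRequest carries at most consumeMessageBatchMaxSize messages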

  • Fields of ConsumeMessageContext, the context object passed to the consume hooks:
    private String consumerGroup;
    private List<MessageExt> msgList;
    private MessageQueue mq;
    private boolean success;
    private String status;
    private Object mqTraceContext;
    private Map<String, String> props;
    private String namespace;
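
The batching rule for concurrent consumption described above (one request if the pulled list fits into consumeMessageBatchMaxSize, several otherwise) amounts to slicing a list. A minimal sketch, with ConsumeBatching and split as hypothetical names and each inner list standing in for one consumeRequest:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of splitting pulled messages into consume requests of bounded size. */
class ConsumeBatching {
    /** Splits msgs into consecutive batches of at most consumeBatchSize (> 0) elements. */
    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += consumeBatchSize) {
            int to = Math.min(from + consumeBatchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(1, 2, 3, 4, 5), 2)); // prints [[1, 2], [3, 4], [5]]
        System.out.println(split(Arrays.asList(1, 2), 2));          // prints [[1, 2]]
    }
}
```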
    
  • The thread pool used for concurrent consumption

this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
  • MessageListenerConcurrently: the listener interface for concurrent consumption
  • public interface MessageListenerConcurrently extends MessageListener {
        ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> var1, ConsumeConcurrentlyContext var2);
    }
    
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(this.messageQueue);
ConsumeConcurrentlyStatus status = null;
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.resetRetryAndNamespace(this.msgs, ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext = new ConsumeMessageContext();
    consumeMessageContext.setNamespace(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getNamespace());
    consumeMessageContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
    consumeMessageContext.setProps(new HashMap());
    consumeMessageContext.setMq(this.messageQueue);
    consumeMessageContext.setMsgList(this.msgs);
    consumeMessageContext.setSuccess(false);
    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
  • ConsumeMessageOrderlyService: the orderly consume service

  • A consumeRequest is created and run in the thread pool

  • ConsumeMessageOrderlyService.ConsumeRequest consumeRequest = new ConsumeMessageOrderlyService.ConsumeRequest(processQueue, messageQueue);
    
  • consumeExecutor, the thread pool for orderly consumption, executes the consumeRequest

  • this.consumeExecutor.submit(consumeRequest);
    
  • this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
    
  • consumeRequest implements Runnable; the steps below walk through its run() method

  • class ConsumeRequest implements Runnable {
        private final ProcessQueue processQueue;
        private final MessageQueue messageQueue;
    
  • Take a batch of messages (a List) from the processQueue

List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
  • resetRetryAndNamespace handles redelivered (retry) messages
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
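  • It iterates over the messages; for a redelivered message, which carries the RETRY_TOPIC property, the message's topic is set to that property's value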

  • String retryTopic = msg.getProperty("RETRY_TOPIC");
    if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
        msg.setTopic(retryTopic);
    }
    
    if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
    
if (!msgs.isEmpty()) {
    ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
    ConsumeOrderlyStatus status = null;
    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setNamespace(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setMq(this.messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        consumeMessageContext.setProps(new HashMap());
        ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
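  • MessageListenerOrderly, the listener interface for orderly consumption, extends the MessageListener interface

  • MessageListenerOrderly is the callback interface passed in when the listener is registered; implement its consumeMessage method to consume messages

  • public interface MessageListenerOrderly extends MessageListener {
        ConsumeOrderlyStatus consumeMessage(List<MessageExt> var1, ConsumeOrderlyContext var2);
    }
    
  • This is the line that matters: once the listener interface is implemented, this call delivers the messages to it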

-----------------------------------------------------------------------------------------
status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(
    Collections.unmodifiableList(msgs), context  );
-----------------------------------------------------------------------------------------
  • executeHookBefore and executeHookAfter are called before and after consumption (only when a hook is registered)
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext.setStatus(status.toString());
    consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
    ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

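Starting rebalanceService

  • Like pullMessageService, it is started as a RocketMQ ServiceThread

  • rebalanceService and pullMessageService are both concrete implementations of the same abstract class

  • public abstract class ServiceThread implements Runnable

  • Wait interval between rebalances (20 s by default, taken from the rocketmq.client.rebalance.waitInterval system property)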

private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
  public void run() {
        this.log.info(this.getServiceName() + " service started");

        while(!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    
        this.log.info(this.getServiceName() + " service end");
    }
  • Perform the rebalance
// First doRebalance(): rebalance every consumer registered in consumerTable
public void doRebalance() {
    Iterator var1 = this.consumerTable.entrySet().iterator();

    while (var1.hasNext()) {
        Entry<String, MQConsumerInner> entry = (Entry)var1.next();
        MQConsumerInner impl = (MQConsumerInner)entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable var5) {
                this.log.error("doRebalance exception", var5);
            }
        }
    }
}

// The consumer's doRebalance(): delegate to rebalanceImpl unless paused
public void doRebalance() {
    if (!this.pause) {
        this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
    }
}
  • Get the subscription list
  • There are two message models: broadcasting and clustering (BROADCASTING, CLUSTERING)
private void rebalanceByTopic(String topic, boolean isOrder) {
    Set mqSet;

-----------------------------------------------------------------------------------------
    // Broadcasting mode
    switch (this.messageModel) {
    case BROADCASTING:
        mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
        if (mqSet != null) {
-----------------------------------------------------------------------------------------

            // Drop stale queues: for this topic, remove the MessageQueues that are not in the set obtained from topicSubscribeInfoTable
            boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);

-----------------------------------------------------------------------------------------
            if (changed) {
                this.messageQueueChanged(topic, mqSet, mqSet);
                log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});
            }
        } else {
            log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
        }
        break;
            
  • In clustering mode:
  • Get the list of consumers in the same consumer group that subscribe to the same topic
// Clustering mode
      case CLUSTERING:
      mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
  List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
            if (null == mqSet && !topic.startsWith("%RETRY%")) {
      log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
            }

            if (null == cidAll) {
     log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
            }

            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList();
                mqAll.addAll(mqSet);
                Collections.sort(mqAll);
                Collections.sort(cidAll);
  AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                List allocateResult = null;
  • allocateMessageQueueStrategy is the strategy for allocating message queues
  • It assigns MessageQueues to the current consumer
                try {
         allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                } catch (Throwable var10) {
                    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), var10);
                    return;
                }

                Set<MessageQueue> allocateResultSet = new HashSet();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }

           boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                if (changed) {
                    log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
            }
        }

    }
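
What strategy.allocate has to do in the clustering branch above can be illustrated with a simple averaging scheme: every consumer sorts the same two lists, then takes its own contiguous slice of the queues. The sketch below is one possible strategy under that assumption and not necessarily the one configured in a given consumer. AverageAllocation is a hypothetical name and strings stand in for MessageQueue objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of an averaging allocation: each consumer gets a contiguous slice of the sorted queues. */
class AverageAllocation {
    /** Returns the queues assigned to currentCid; mqAll and cidAll must be sorted the same way everywhere. */
    static <Q> List<Q> allocate(String currentCid, List<Q> mqAll, List<String> cidAll) {
        List<Q> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }
        int base = mqAll.size() / cidAll.size();  // queues every consumer gets
        int extra = mqAll.size() % cidAll.size(); // the first 'extra' consumers get one more
        int count = base + (index < extra ? 1 : 0);
        int start = index * base + Math.min(index, extra);
        for (int i = 0; i < count; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        List<String> consumers = Arrays.asList("c0", "c1", "c2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
    }
}
```

Each consumer computes only its own slice and no coordination happens between them, which is why the rebalance code sorts both mqAll and cidAll first: all consumers must see the lists in the same order for the slices not to overlap.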

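DefaultMQProducerImpl (the consumer side passes false by default, so it is not started)

  • If true is passed, the producer configuration (class DefaultMQProducer extends ClientConfig) is used to create the producer instance
  • The producer client is registered in producerTable, the table of producers (in essence a ConcurrentMap)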
  this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, this.rpcHook);
  boolean registerOK = this.mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);


ConcurrentMap<String, MQProducerInner> producerTable;

