這節介紹RocketMQ客戶端的啟動流程,即Consumer和Producer的啟動流程。
1. 客戶端demo
首先先看下客戶端的demo
Producer:
public class SyncProducer {
public static void main (String[] args) throws Exception {
// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer ("GroupTest");
// 設置NameServer的地址
producer.setNamesrvAddr ("localhost:9876");
// 啟動Producer實例
producer.start ();
for (int i = 0; i < 100; i++) {
// 創建消息,並指定Topic,Tag和消息體
Message msg = new Message ("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes (RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 發送消息到一個Broker
SendResult sendResult = producer.send (msg);
// 通過sendResult返回消息是否成功送達
System.out.printf ("%s%n", sendResult);
}
// 如果不再發送消息,關閉Producer實例。
producer.shutdown ();
}
}
Consumer:
public class Consumer {
public static void main (String[] args) throws InterruptedException, MQClientException {
// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("GroupTest");
// 設置NameServer的地址
consumer.setNamesrvAddr ("localhost:9876");
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
consumer.subscribe ("TopicTest", "*");
// 注冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener (new MessageListenerConcurrently () {
@Override
public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf ("%s Receive New Messages: %s %n", Thread.currentThread ().getName (), msgs);
// 標記該消息已經被成功消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者實例
consumer.start ();
System.out.printf ("Consumer Started.%n");
}
}
Producer和Consumer的啟動類似,在初始化然后進行必要設置(主要是客戶端所屬的Group和NameServer地址)后,執行start方法啟動后台監聽服務,事實上Producer和Consumer都是調用同一個類MQClientInstance的start方法,下圖為繼承關系:
DefaultMQproducer和DefaultMQPushConsumer都繼承自ClientConfig,顧名思義ClientConfig表示客戶端的配置,包括NameServer地址、客戶端地址、客戶端實例名等。由於Producer和Consumer都需要同Broker和NameServer交互,所以配置上有很多相同,這兩個將主要功能的實現都委托給了對應的Impl(DefaultMQProducerImpl和DefaultMQPushConsumerImpl)。Impl內部調用了MQClientInstance來完成客戶端同遠程交互的主要功能,而Producer和Consumer則封裝自己相關的行為,MQClientInstance內部又委托忒了MQClientAPIImpl。
2. Producer的啟動
DefaultMQProducer的啟動如下:
DefaultMQProducer將start委托給了DefaultMQProducerImpl來完成,主要過程為:
- DefaultMQProducerImpl先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
- 調用MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每個客戶端實例都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關系,key為clientId(格式為ip@instName,instName為pid),value為MQClientInstance實例。當key不存在時則會初始化一個實例,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。
- 調用MQClientInstance的registerProducer方法,注冊當前客戶端自身。實現上是客戶端放入client實例緩存中,定時器定時上報,后面會說。
- 調用MQClientInstance的start方法,啟動客戶端的后台任務,該方法是重點,后面會介紹。
- 標記客戶端當前狀態為RUNNING
- 調用MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向所有Broker上報心跳
3. Consumer的啟動
DefaultMQPushConsumer的啟動如下:
DefaultMQPushConsumer同樣將start委托給了DefaultMQPushConsumerImpl來完成,流程上也相似。但相比DefaultMQProducer多了很多其他組件來輔助消費過程,如rebalance、offset管理等,主要過程為:
- DefaultMQPushConsumerImpl先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
- 同步設置RebalanceImpl的topic(Map</*topic*/String,/*sub expression*/String>)信息
- 同DefaultMQProducer一致,調用MQClientManager的getAndCreateMQClientInstance方法獲取MQClientInstance,每個客戶端實例都會對應一個MQClientInstance,並由MQClientManager管理。MQClientManager內部使用一個Map維護各客戶端的關系,key為clientId(格式為ip@instName),value為MQClientInstance實例。當key不存在時則會初始化一個實例,在初始化時連帶初始化MQClientAPIImpl、NettyRemoteClient等。這里需要說明的是,RocketMQ中Consumer的消費模式分為CLUSTERING和BROADCASTING,即集群消費和廣播消費。區別在於集群消費時,一條消息只會被一個實例消費,即各實例會平分所有的消息;而廣播消費時所有實例都會收到同一條消息。體現在clientId的是,集群模式下instName為pid,而廣播模式instName為DEFAULT。
- 設置RebalanceImpl屬性,包括所在Group、消費模式、消息分配策略(平均分配q的策略)
- 初始化PlullAPIWrapper,設置消息過濾器鈎子列表
- 初始化OffsetStore,設置offset的存儲模式,廣播模式使用本地存儲;集群模式使用遠程存儲
- 初始化ConsumeMessageService,根據監聽器類型設定消息消費模式(順序消費/並行消費),pull模式需要自己指定offset,push不需要設定。
- 啟動ConsumeMessageService
- 同DefaultMQProducer一致,調用MQClientInstance的registerProducer方法,注冊當前客戶端自身。實現上是客戶端放入client實例緩存中,定時器定時上報,后面會說。
- 調用MQClientInstance的start方法,啟動客戶端的后台任務,該方法是重點,后面會介紹。
- 標記客戶端當前狀態為RUNNING
- 判斷監聽信息是否發生改變,從namesrv更新topic的路由信息
- 調用MQClientInstance的checkClientInBroker方法,確認該實例已經在broker注冊成功,否則拋異常
- 調用MQClientInstance的sendHeartbeatToAllBrokerWithLock方法,向所有Broker上報心跳
- 調用MQClientInstance的rebalanceImmediately方法,觸發一次rebalance
DefaultMQPushConsumer為推模式,RocketMQ還提供了拉模式來消費消息,實現類為DefaultMQPullConsumer,啟動過程類似,推模式是用拉模式來實現的,重點實現都在MQClientInstace中。
4. MQClientInstance
MQClientInstance為一個門戶類,組合了各功能,如下,包括Rebalance、消費數據統計、生產消息、消費消息等,這些都有對應的實現。
上面說過,Producer和Consumer在啟動的時候,都會在內部先初始化一個MQClientInstance對象,然后調用其start方法啟動對應的后台程序,如下:
MQClientInstance的start方法除了調用自身進行准備工作外,也調用了其他組件的start方法開始它們的准備工作,主要流程為:
- 先標記客戶端當前狀態為START_FAILED(初始狀態為CREATE_JUST)
- 若沒有指定nameserver地址,則調用MQClientAPIImpl同步獲取一次(通過設置的Http endpoint同步)
- 調用MQClientAPIImpl的start方法,主要是初始化Netty客戶端,啟動netty client初始化任務,連接的建立發生在第一次請求時
- 開啟MQClientInstance的定時任務,包括:
- 如果沒有指定nameserver地址,每兩分鍾從配置的endpoint處同步nameserver地址
- 定時從namesrv同步topic路由信息
- 定時清除下線的broker信息;發送心跳
- 定時持久化消費者消費的offset信息
- 每1分鍾調整線程池的大小
- 調用PullMessageService的start方法,啟動拉取消息線程
- 調用RebalanceService的start方法,啟動rebalance線程
- 調用內部Producer(CLIENT_INNER_PRODUCER)的start方法
- 標記客戶端當前狀態為RUNNING
下面詳細介紹下各個過程。
4.2. MQClientAPIImpl.fetchNameServerAddr
該方法用於更新NameServer地址,該方法會從http://xxx:port/rocketmq/yyy
,默認8080端口(如果xxx中沒有:,即不帶端口時)中獲取NameServer地址(xxx為域名,由系統配置項rocketmq.namesrv.domain控制,默認為jmenv.tbsite.net;yyy為訪問路徑,由系統配置項rocketmq.namesrv.domain.subgroup控制,默認為nsaddr)。該地址要求返回結果為一個ip列表,以;隔開,如果獲取回來的地址跟現有的地址不一致則會更新緩存的NameServer地址列表。解析出來的地址列表用於根據NettyRemotingClient內部持有的變量:
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
4.3. MQClientAPIImpl.start
該方法在內部調用了NettyRemotingClient的start方法,用於初始化Netty客戶端。NettyRemotingClient是基於Netty實現的tcp協議客戶端,主要流程為:
- 初始化客戶端bootstrap連接池
- 設置處理鏈:編碼、解碼、空閑處理、連接管理(服務端)、請求分發
- 每3秒清除超時的請求(netty主線程不處理邏輯)
- 啟動客戶端的事件處理器,處理IDLE、CLOSE、CONNECT、EXCEPTION事件
關於NettyRemotingClient后面會專門進行講解,這里只介紹在客戶端啟動時其做了哪些動作。
4.4.2. MQClientInstance.updateTopicRouteInfoFromNameServer
該方法用於根據客戶端實例關注的所有topic的路由信息,包括客戶端監聽的topic以及producer生產的topic。首先會遍歷從MQClientInstance內部的consumerTable和consumerTable的客戶端實例,拿到所有的topic信息,然后挨個更新topic的路由。
同步topic路由時,會通過NettyRemotingClient選擇一個NameServer獲取topic路由信息,然后判斷topic信息是否發生了更改,主要比較topic所對應的Queue和Broker是否發生了更改。若路由信息發生了更改則會同步topic所在的broker地址列表,即內部的brokerAddrTable屬性;接着同步produer關注的topic路由信息,即producerTable屬性;接着同步consumer訂閱的topic路由信息,即consumerTable屬性;最后更新本地topic信息,即topicRouteTable屬性。
4.4.3. MQClientInstance.sendHeartbeatToAllBrokerWithLock
該方法會遍歷MQClient所持有的各個producer和consumer,將客戶端信息構造為HeartbeatData對象,然后調用MQClientAPIImpl的sendHearbeat方法,向所有的broker上報心跳數據。心跳內容包括:
- Consumer:所有Consumer的Group、消費類型、消息模式、消費起始offset、訂閱消息的篩選類型等
- Producer:所有Producer的group
4.4.4. MQClientInstance.persistAllConsumerOffset
該方法會遍歷consumerTable里的所有MQConsumer對象,獲取每個隊列處理的MessageQueue,然后調用OffsetStore持久化所有的MessageQueue。OffsetStore后面會專門進行講解。
4.4.5. MQClientInstance.adjustThreadPool
該方法主要是動態調整DefaultMQPushConsumerImpl(推模式)客戶端消費線程池的大小。前面說過推模式是通過包裝拉模式來實現的,內部都依賴PullAPIWrapper。實現上推模式多了一個ConsumeMessageService定時使用拉模式消費消息,該實現需要一個線程池,adjustThreadPool就是動態調整該線程池的大小。關於客戶端消費消息的過程,后面也會專門進行講解。
4.5. PullMessageService.start
PullMessageService用於封裝拉模式以實現推模式。它會循環從內部的LinkedBlockingQueue<PullRequest>中拿出PullRequest對象(消費q消息封裝的對象),選取一個可用的客戶端實例DefaultMQPushConsumerImpl,調用其pullMessage方法.該方法會判斷消費進度,決定是立即消費還是延遲消費,如果是延遲消費則再放回LinkedBlockingQueue中等待消費;如果是直接消費,則調用PullMessageService(拉模式)的executePullRequestImmediately消費消息.
PullMessageService的基礎關系如下:
PullMessageService.start內部主要是啟動線程,該線程會循環執行執行任務,具體實現會在后續介紹消息消費的時候提及。
4.6. RebalanceService.start
該方法用於啟動rebalance任務。RebalanceService同PullMessageService相同,都繼承自ServiceThread類,,並實現了run方法。RebalanceService在run方法中等待一定時間(默認20S,可以通過rocketmq.client.rebalance.waitInterval配置具體時間)后會調用MQClientInstance.doRebalance執行具體的動作。具體實現會在后續介紹rebalance實現的時候提及。
4.7. DefaultMQPushConsumerImpl.start
在上面2.
時有提及該流程,這里的DefaultMQPushConsumerImpl對象是Group為CLIENT_INNER_PRODUCER
的內部對象。
客戶端的啟動過程就如上面介紹,下面附上該部分當時源碼閱讀過程做的筆記簡圖,該圖描述了客戶端啟動過程的大致過程:
更多原創內容請搜索微信公眾號:啊駝(doubaotaizi)