Broker 概述、啟動和停止流程
Broker 是 RocketMQ 體系中核心組件之一,存儲是 Broker 的核心功能之一,決定整個 RocketMQ 體系的吞吐性能、可靠性和可用性。
一、Broker 概述
1.1 什么是 Broker
Broker 是 RocketMQ 中核心的模塊之一,主要負責處理各種 TCP 請求(計算)和存儲消息(存儲),在各個組件中的角色:
Broker 分為 Master 和 Slave。Master 主要提供服務,Slave 在 Master 宕機后提供消費服務。
1.2 Broker 存儲目錄結構
commitlog:這是一個目錄,其中包含具體的 commitlog 文件。文件名長度為 20 個字符,文件名由該文件保存消息的最大物理機 offset 值在高危補全 0 組成。每個文件大小一般是 1 GB,可以通過 mapedFileSizeCommitLog 進行配置。
consumequeue:這是一個目錄,包含該 Broker 上所有的 Topic 對應的消費隊列文件信息。消費隊列文件的格式為 "consumequeue/Topic 名字/queue id/具體消費隊列文件"。每個消費隊列其實是 commitlog 的一個索引,提供給消費者做拉取消息、更新位點使用。
Index:這是一個目錄,全部的文件都是按照消息 key 創建的 Hash 索引。文件名是用創建的時間戳命名的。
Config:這是一個目錄,保存了當前 Broker 中全部的 Topic、訂閱關系和消費進度。這些數據 Broker 會定時從內存持久化到磁盤,以便宕機后恢復。
abort:Broker 是否異常關閉的標志。正常關閉時候該文件被刪除,異常關閉時則不會。當 Broker 重新啟動時,根據是否異常宕機決定是否需要重新構建 Index 索引等操作。
checkpoint:Broker 最近一次正常運行時的狀態,比如最后一次正常刷盤的時間、最后一次正確索引的時間。
1.3 Broker 啟動和停止流程
啟動命令分為兩個腳本:bin/mqbroker 和 bin/runbroker.sh。mqbroker 准備了 RocketMQ 啟動本身的環境數據,比如 ROCKETMQ_HOME 環境變量。runbroker.sh 主要設置了 JVM 啟動參數,比如 JAVA_HOME、Xms、Xmx。
os.sh 是 RocketMQ 開發人員認為適合的系統調優參數。sh os.sh執行即可設置。啟動 bin/mqbroker 的時候,需要按照自己環境的內存大小,在 bin/runbroker.sh 設置合適的內存大小。
BrokerStartup.java 類主要負責為真正的啟動過程做准備,解析腳本傳過來的參數,初始化 Broker 配置,創建 BrokerController 實例等工作。
BrokerController.java 類是 Broker 的掌控者,它管理和控制 Broker 的各個模塊,包含通信模塊、存儲模塊、索引模塊、定時任務等。在 BrokerController 全部模塊初始化並啟動成功后,將在日志中輸出 info信息"boot success"。
第一步:初始化啟動環境。由 bin/mqbroker 和 bin/runbroker.sh 兩個腳本來完成的。bin/mqbroker 腳本主要用於設置 RocketMQ 根目錄環境變量,調用 bin/runbroker.sh 進入 RocketMQ 的啟動入口,代碼路徑:rocketmq/bin/mqbroker,代碼如下:
1 # Unless required by applicable law or agreed to in writing, software 2 # distributed under the License is distributed on an "AS IS" BASIS, 3 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 4 # See the License for the specific language governing permissions and 5 # limitations under the License. 6 7 if [ -z "$ROCKETMQ_HOME" ] ; then 8 ## resolve links - $0 may be a link to maven's home 9 PRG="$0" 10 11 # need this for relative symlinks 12 while [ -h "$PRG" ] ; do 13 ls=`ls -ld "$PRG"` 14 link=`expr "$ls" : '.*-> \(.*\)$'` 15 if expr "$link" : '/.*' > /dev/null; then 16 PRG="$link" 17 else 18 PRG="`dirname "$PRG"`/$link" 19 fi 20 done 21 22 saveddir=`pwd` 23 24 ROCKETMQ_HOME=`dirname "$PRG"`/.. 25 26 # make it fully qualified 27 ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd` 28 29 cd "$saveddir" 30 fi 31 32 export ROCKETMQ_HOME 33 echo ${ROCKETMQ_HOME} 34 sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@ 35
bin/runbroker.sh 腳本的主要功能是檢測 JDK 的環境配置和 JVM 的參數配置。JDK 的環境配置的檢查邏輯代碼路徑:bin/runbroker.sh,具體的實現代碼如下:
1 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java 2 [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java 3 [ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!" 4 5 export JAVA_HOME 6 export JAVA="$JAVA_HOME/bin/java" 7 export BASE_DIR=$(dirname $0)/.. 8 export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
注意 bin/runbroker.sh 中 JVM 的參數設置,通常,-Xms、-Xmx、-Xmn 和 -XX:MaxDirectMemorySize 這 4 個參數會隨着 RocketMQ 服務器的物理機內存大小的變化而進行相應的改變。
第二步:初始化 BrokerController。
該初始化主要包含 RocketMQ 啟動命令行參數解析、Broker 各個模塊配置參數解析、Broker 各個模塊初始化、進程關機 Hook 初始化等過程。
RocketMQ 啟動命令行參數解析。
brokerConfig、nettyServerConfig、nettyClientConfig、messageStoreConfig 這些基本配置對象初始化完畢后,還有后續代碼依據各種啟動條件重新調整部分參數。
在各個配置對象初始化完畢后,程序會調用 BrokerController.initialize()方法對 Broker 的各個模塊進行初始化。
首先,加載 Broker 基礎數據配置和存儲層服務,代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerController.java,具體代碼如下:
1 public boolean initialize() throws CloneNotSupportedException { 2 boolean result = this.topicConfigManager.load(); 3 4 result = result && this.consumerOffsetManager.load(); 5 result = result && this.subscriptionGroupManager.load(); 6 result = result && this.consumerFilterManager.load(); 7 8 if (result) { 9 try { 10 this.messageStore = 11 new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, 12 this.brokerConfig); 13 if (messageStoreConfig.isEnableDLegerCommitLog()) { 14 DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); 15 ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); 16 } 17 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); 18 //load plugin 19 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); 20 this.messageStore = MessageStoreFactory.build(context, this.messageStore); 21 this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); 22 } catch (IOException e) { 23 result = false; 24 log.error("Failed to initialize", e); 25 } 26 } 27 28 result = result && this.messageStore.load(); #加載 Broker 基礎數據配置,包含 Broker 中的 Topic、消費位點、訂閱關系、消費過濾(無實際數據需要加載)
之后,初始化存儲層服務對象 messageStore 和 Broker 監控統計對象 brokerStats。
然后,Broker 會初始化通信層服務和一系列的定時任務。通信層服務主要初始化正常通信通道、VIP通信通道和通信線程池。由於代碼太多,並且大多數邏輯類似,所以這里以 VIP 通道為例,講解通信層服務初始化;以消費進度定時持久化為例,講解定時任務初始化。
在 Broker 中 VIP 通道通信端口 10909,與正常通信端口 10911 相差2,10909 和 10911 這兩個端口有什么關系呢?VIP 通道又如何初始化的呢?
1 if (result) { 2 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); 3 = (NettyServerConfig) this.nettyServerConfig.clone(); 4 fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); 5 this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); 6 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( 7 this.brokerConfig.getSendMessageThreadPoolNums(), 8 this.brokerConfig.getSendMessageThreadPoolNums(), 9 1000 * 60, 10 TimeUnit.MILLISECONDS, 11 this.sendThreadPoolQueue, 12 new ThreadFactoryImpl("SendMessageThread_"));
fastConfig 就是 VIP 通信層的配置,其配置對象 "克隆" 自正常通信的配置對象,唯獨通信端口是 nettyServerConfig.getListenPort() - 2,也就是 10911 - 2。利用 fastConfig 初始化 fastRemotingServer 的結果也就是我們常用的 VIP 通道。
RocketMQ 的通信層實現本質上是基於 Netty 的,那么通信層又是如何處理客戶端發送的 Netty 請求的呢?
通信層對象初始化完成后,會調用 this.registerProcessor() 方法,這里將正常的通信層對象和 VIP 通道的通信層對象與各個請求處理器進行關聯,比如將發送消息的請求交給接收消息的請求處理器進行處理。
在 VIP 通信層初始化有了基本料及哦吼,開始消費進度定時持久化。
Broker 在接收消費者上報的消費進度后,會定期持久化到物理機文件中,當消費者因為重新發布或者宕機而重啟時,能從消費進度中得知恢復,不至於重復消費。
第三步:啟動 RocketMQ的各個組件。
組件啟動代碼路徑:D:\rocketmq-master\broker\src\main\java\org\apache\rocketmq\broker\BrokerController.java 中 start() 方法,由於啟動過程非常復雜,這里按照代碼執行順序講解各個模塊功能。
this.messageStore:存儲層服務,比如 CommitLog、ConsumeQueue 存儲管理。
this.remotingServer:普通通道請求處理服務。一般的請求都是在這里被處理的。
this.fastRemotingServer:VIP 通道請求處理服務。如果普通通道比較忙,那么可以使用 VIP 通道,一般作為客戶端降級使用。
this.brokerOuterAPI:Broker 訪問對外接口的封裝對象。
this.pullRequestHoldService:Pull 長輪詢服務。
this.clientHousekeepingService:清理心跳超時的生產者、消費者、過濾服務器。
this.filterServerManager:過濾服務器管理。
this.brokerStatsManager:Broker 監控數據統計管理。
this.brokerFastFailure:Broker 快速失敗處理。
Broker 啟動進程后,關閉 Broker 進程其實是啟動過程的逆過程,這里不再贅述。
Broker 關閉只是調用 BrokerStartup.java 中注冊 JVM Hook 的 BrokerController.shutdown()方法,該方法再調用各個模塊關閉方法,最后關閉整個進程。Broker 進程關閉處理完成后,日志輸出 info 信息 "Shutdown hook over"