NameServer
作用:Producer和Consumer獲取Broker的地址
目的:解耦Broker和Producer、Consumer
原理:使用netty作為通信工具,監聽指定端口,如果是broker注冊,將broker的信息保存在內存中並保存到文件中,producer和consumer獲取broker地址的請求
RocketMQ包含的組件
- NameServer:單點,供Producer和Consumer獲取Broker地址
- Producer:產生並發送消息
- Consumer:接受並消費消息
- Broker:消息暫存,消息轉發
NamesrvController包含的組件
- namesrvConfig:nameServer的配置
- nettyServerConfig:NameServer的netty配置
- remotingServer:NameServer 的netty服務器
- scheduledExecutorService:routeInfoManager和kvCOnfigManager使用的定時線程池
- remotingExecutor:netty使用的線程池
- brokerHosekeppingService:
- kvConfigManager:kv配置管理
- routeInfoManager:包含broker的ip和對應的隊列信息,說明producer可以往哪一個broker發送消息,consumer從哪一個broker pull消息
NameServer啟動
NamesrvStartup.main0
- NettySystemConfig配置
- 解析命令行參數,NettyServerConfig配置
- logback配置
- NamesrvController初始化,initialize
- 注冊shutdown鈎子
- NamesrvController.start
NamesrvController.initialize
- KVConfigManager.load加載原來的key-value文件到內存中
- 初始化NettyRemotingServer
- 注冊requestProcessor,默認為DefaultRequestProcessor,用來處理netty接收到的信息
- 啟動定時線程,每隔10s判斷broker是否依然存活
- 啟動定時線程,每隔10min打印出所有k-v
NamesrvController.start
啟動netty server,使用NettyServerHandler進行read和write,最終調用到DefaultRequestProcessor.processRequest
NameServer處理信息
netty收到的所有消息都是DefaultRequestProcessor.processRequest處理的,根據不同的RequestCode執行不同的操作
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
if (log.isDebugEnabled()) {
log.debug("receive request, {} {} {}",//
request.getCode(), //
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
request);
}
switch (request.getCode()) {
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.REGISTER_BROKER: // 注冊borker信息
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
return this.registerBrokerWithFilterServer(ctx, request);
}
else {
return this.registerBroker(ctx, request);
}
case RequestCode.UNREGISTER_BROKER: // 取消注冊broker
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request); // 根據topic獲取路由信息,在producer發送消息和consumer在pull消息的時候的時候會從nameServer 中獲取
case RequestCode.GET_BROKER_CLUSTER_INFO:
return this.getBrokerClusterInfo(ctx, request);
case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
return this.wipeWritePermOfBroker(ctx, request);
case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
return getAllTopicListFromNameserver(ctx, request);
case RequestCode.DELETE_TOPIC_IN_NAMESRV:
return deleteTopicInNamesrv(ctx, request);
case RequestCode.GET_KVLIST_BY_NAMESPACE:
return this.getKVListByNamespace(ctx, request);
case RequestCode.GET_TOPICS_BY_CLUSTER:
return this.getTopicsByCluster(ctx, request);
case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
return this.getSystemTopicListFromNs(ctx, request);
case RequestCode.GET_UNIT_TOPIC_LIST:
return this.getUnitTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
return this.getHasUnitSubTopicList(ctx, request);
case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
return this.getHasUnitSubUnUnitTopicList(ctx, request);
default:
break;
}
return null;
}
這里主要看注冊broker,依次調用DefaultRequestProcessor.registerBroker->RouteInfoManager.registerBroker;
進行的主要操作是解析request,依次填充以下集合,供producer和consumer獲取
// 某一個topic對應的邏輯隊列
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 每個broker 對應的信息
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 某一個集群下對應的所有broker
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 某一個broker對應的filter
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
