RocketMQ源碼 — 二、 NameServer


NameServer

作用:Producer和Consumer獲取Broker的地址
目的:解耦Broker和Producer、Consumer
原理:使用netty作為通信工具,監聽指定端口,如果是broker注冊,將broker的信息保存在內存中並保存到文件中,producer和consumer獲取broker地址的請求

RocketMQ包含的組件

  • NameServer:單點,供Producer和Consumer獲取Broker地址
  • Producer:產生並發送消息
  • Consumer:接受並消費消息
  • Broker:消息暫存,消息轉發

NamesrvController包含的組件

  • namesrvConfig:nameServer的配置
  • nettyServerConfig:NameServer的netty配置
  • remotingServer:NameServer 的netty服務器
  • scheduledExecutorService:routeInfoManager和kvCOnfigManager使用的定時線程池
  • remotingExecutor:netty使用的線程池
  • brokerHosekeppingService:
  • kvConfigManager:kv配置管理
  • routeInfoManager:包含broker的ip和對應的隊列信息,說明producer可以往哪一個broker發送消息,consumer從哪一個broker pull消息

NameServer啟動

NamesrvStartup.main0

  1. NettySystemConfig配置
  2. 解析命令行參數,NettyServerConfig配置
  3. logback配置
  4. NamesrvController初始化,initialize
  5. 注冊shutdown鈎子
  6. NamesrvController.start

NamesrvController.initialize

  1. KVConfigManager.load加載原來的key-value文件到內存中
  2. 初始化NettyRemotingServer
  3. 注冊requestProcessor,默認為DefaultRequestProcessor,用來處理netty接收到的信息
  4. 啟動定時線程,每隔10s判斷broker是否依然存活
  5. 啟動定時線程,每隔10min打印出所有k-v

NamesrvController.start

啟動netty server,使用NettyServerHandler進行read和write,最終調用到DefaultRequestProcessor.processRequest

NameServer處理信息

netty收到的所有消息都是DefaultRequestProcessor.processRequest處理的,根據不同的RequestCode執行不同的操作

public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    if (log.isDebugEnabled()) {
        log.debug("receive request, {} {} {}",//
                request.getCode(), //
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()), //
                request);
    }

    switch (request.getCode()) {
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:	// 注冊borker信息
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            }
            else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:		// 取消注冊broker
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);		// 根據topic獲取路由信息,在producer發送消息和consumer在pull消息的時候的時候會從nameServer 中獲取
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        default:
            break;
    }
    return null;
}

這里主要看注冊broker,依次調用DefaultRequestProcessor.registerBroker->RouteInfoManager.registerBroker;
進行的主要操作是解析request,依次填充以下集合,供producer和consumer獲取

// 某一個topic對應的邏輯隊列
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// 每個broker 對應的信息
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 某一個集群下對應的所有broker
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 某一個broker對應的filter
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM