新增topic是需要客戶端直接通知broker完成的:
通過createAndUpdateTopicConfig方法 發送給broker以后,在AdminBrokerProcessor里面負責處理這個類型消息:
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
try {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
ctx.writeAndFlush(response);
} catch (Exception e) {
log.error("Failed to produce a proper response", e);
}
TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
return null;
在updateTopicConfig方法:
public void updateTopicConfig(final TopicConfig topicConfig) {
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
} else {
log.info("create new topic [{}]", topicConfig);
}
this.dataVersion.nextVersion();
this.persist();
}
TopicConfig是復寫了equal方法的,所以可以明確知道一個topci是否真的需要改變,如果需要,那么更改版本、持久化。
在后面的registerIncrementBrokerData方法中還要進一步注冊到broker里面。
注冊中心是不會持久化topic的,都是broker負責持久化,啟動的時候注冊到nameserver,除此之外還有定時任務定期注冊到注冊中心。
