對於一個生產者來說,在進行sendmessage的時候,需要知道這個topic應該發給哪個broker。如果沒有路由信息的話,需要取注冊中心,通過GET_ROUTEINTO_BY_TOPIC去注冊中心拿到消息。
介紹下面具體流程之前,還是先介紹注冊額中心里面路由管理者RouteInfoManager:
:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
broker-name:可以有多個broker-id,broker-id為0的就是master,否則是slave
clusterAddrTable:多個broker-name可以放在同一個cluster下面
topicqueueTable:一個topic下面可能有多個broker對應,QueueData里面存放每個broker-name的屬性。所以一個topic下面可能有多個broker-name在貢獻。
public class QueueData implements Comparable<QueueData> { private String brokerName; private int readQueueNums; private int writeQueueNums; private int perm; private int topicSynFlag;
brokerAddrTable:一個broker-name對應的信息,QueueData是真正描述一個broker-name的屬性,BrokerData描述的是這個broker的上下級關系,上級cluster是誰,下級brokerAddrs描述這個broker-name下面所有broker-id對應的ip地址。
public class BrokerData implements Comparable<BrokerData> { private String cluster; private String brokerName; private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
RouteInfoManager主要存儲的信息就是這么多。這些消息都是在一個broker啟動的時候,都會到注冊中心注冊broker,在注冊的時候把RouteInfoManager里面的信息進行填充。同時如果有變化的時候RouteInfoManager里面的數據也會跟着刷新,QueueData和BrokerData的equal方法都被覆蓋了,這里面的屬性任何一個有變化都會被認為有變化,然后被更新。
對於備機來說,注冊完成以后,還可以從注冊中心拿到主機的haServer-addr\haServer-port地址,也就是主機broker的這個地址:
public String getHAServerAddr() { return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort(); }
拿到這個地址以后才能啟動HaService里面的Haclient,上報自己的ack-offset,然后拿到同步數據。
對於一個生產者來說,在進行sendmessage的時候,需要知道這個topic應該發給哪個broker。如果沒有路由信息的話,需要取注冊中心,通過GET_ROUTEINTO_BY_TOPIC去注冊中心拿到消息。
所謂的路由信息其實就是這個數據結構:
public class TopicRouteData extends RemotingSerializable { private String orderTopicConf; private List<QueueData> queueDatas; private List<BrokerData> brokerDatas; private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
具體拿的方法在pickupTopicRouteData里面通過topic可以從RouteInfoManager拿到
拿到以后需要更新發布信息和訂閱信息,其中發布信息就是針對生產者來說的,具體更新就是:(route,就是TopicRouteData)
List<QueueData> qds = route.getQueueDatas(); Collections.sort(qds); for (QueueData qd : qds) { if (PermName.isWriteable(qd.getPerm())) { BrokerData brokerData = null; for (BrokerData bd : route.getBrokerDatas()) { if (bd.getBrokerName().equals(qd.getBrokerName())) { brokerData = bd; break; } } if (null == brokerData) { continue; } if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { continue; } for (int i = 0; i < qd.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); info.getMessageQueueList().add(mq); } }
public MessageQueue(String topic, String brokerName, int queueId) { this.topic = topic; this.brokerName = brokerName; this.queueId = queueId; }
也就是一個topic可能對應多個broker-name,同時每個broker-name也有多個QueueId,這個Queueid個數是getWriteQueueNums決定。
這里的info就是publishInfo,會被塞入this.topicPublishInfoTable.put(topic, info)中保存下來。
重點來了,這個有啥用?
后面生產者在發送消息的時候,需要有一個broker-addr,畢竟要知道一個broker地址,為了拿到這個broker-addr,其實是在這里拿到的:
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
也就是broker-addr是MessageQueue給出來的,這個MessageQueue怎么取得?
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
也就是每次從前面提的info.getMessageQueueList().add(mq)這個里面每次取一個messageQueue(第一次隨機,后面每次加1),這個MessageQueue就是針對同一個broker-name多個QueueId。
在實際發送的時候,拿到一個MessageQueue就會直接發出去。對於一個topic有多個broker-name的情況,這種做法只是給一個broker-name的master發了信息,其他broker-name沒有發送。這個就是topic層面的分片,不同broker分攤相同topic下的不同內容,而同一個broker通過主備完成信息冗余。而QueueId就是broker層面的再次分片。對於多個消費者的情況,消費相同broker-name的時候,可以根據queue-id並發消費。
