rocketmq的message里面的路由信息是啥


 

 

對於一個生產者來說,在進行sendmessage的時候,需要知道這個topic應該發給哪個broker。如果沒有路由信息的話,需要取注冊中心,通過GET_ROUTEINTO_BY_TOPIC去注冊中心拿到消息。

介紹下面具體流程之前,還是先介紹注冊額中心里面路由管理者RouteInfoManager:

 

  private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
 
        

broker-name:可以有多個broker-id,broker-id為0的就是master,否則是slave

clusterAddrTable:多個broker-name可以放在同一個cluster下面

topicqueueTable:一個topic下面可能有多個broker對應,QueueData里面存放每個broker-name的屬性。所以一個topic下面可能有多個broker-name在貢獻。

 

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSynFlag;

 
        

brokerAddrTable:一個broker-name對應的信息,QueueData是真正描述一個broker-name的屬性,BrokerData描述的是這個broker的上下級關系,上級cluster是誰,下級brokerAddrs描述這個broker-name下面所有broker-id對應的ip地址。

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

 

RouteInfoManager主要存儲的信息就是這么多。這些消息都是在一個broker啟動的時候,都會到注冊中心注冊broker,在注冊的時候把RouteInfoManager里面的信息進行填充。同時如果有變化的時候RouteInfoManager里面的數據也會跟着刷新,QueueData和BrokerData的equal方法都被覆蓋了,這里面的屬性任何一個有變化都會被認為有變化,然后被更新。

對於備機來說,注冊完成以后,還可以從注冊中心拿到主機的haServer-addr\haServer-port地址,也就是主機broker的這個地址:

 

    public String getHAServerAddr() {
        return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
    }

 

拿到這個地址以后才能啟動HaService里面的Haclient,上報自己的ack-offset,然后拿到同步數據。

 

對於一個生產者來說,在進行sendmessage的時候,需要知道這個topic應該發給哪個broker。如果沒有路由信息的話,需要取注冊中心,通過GET_ROUTEINTO_BY_TOPIC去注冊中心拿到消息。

所謂的路由信息其實就是這個數據結構:

 

public class TopicRouteData extends RemotingSerializable {
    private String orderTopicConf;
    private List<QueueData> queueDatas;
    private List<BrokerData> brokerDatas;
    private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

具體拿的方法在pickupTopicRouteData里面通過topic可以從RouteInfoManager拿到

拿到以后需要更新發布信息和訂閱信息,其中發布信息就是針對生產者來說的,具體更新就是:(route,就是TopicRouteData)

 

 

   List<QueueData> qds = route.getQueueDatas();
            Collections.sort(qds);
            for (QueueData qd : qds) {
                if (PermName.isWriteable(qd.getPerm())) {
                    BrokerData brokerData = null;
                    for (BrokerData bd : route.getBrokerDatas()) {
                        if (bd.getBrokerName().equals(qd.getBrokerName())) {
                            brokerData = bd;
                            break;
                        }
                    }

                    if (null == brokerData) {
                        continue;
                    }

                    if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                        continue;
                    }

                    for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                        info.getMessageQueueList().add(mq);
                    }
                }

 

 public MessageQueue(String topic, String brokerName, int queueId) {
        this.topic = topic;
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

 

也就是一個topic可能對應多個broker-name,同時每個broker-name也有多個QueueId,這個Queueid個數是getWriteQueueNums決定。

這里的info就是publishInfo,會被塞入this.topicPublishInfoTable.put(topic, info)中保存下來。

 

 

重點來了,這個有啥用?

后面生產者在發送消息的時候,需要有一個broker-addr,畢竟要知道一個broker地址,為了拿到這個broker-addr,其實是在這里拿到的:

 

private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

 

也就是broker-addr是MessageQueue給出來的,這個MessageQueue怎么取得?

public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

也就是每次從前面提的info.getMessageQueueList().add(mq)這個里面每次取一個messageQueue(第一次隨機,后面每次加1),這個MessageQueue就是針對同一個broker-name多個QueueId。

在實際發送的時候,拿到一個MessageQueue就會直接發出去。對於一個topic有多個broker-name的情況,這種做法只是給一個broker-name的master發了信息,其他broker-name沒有發送。這個就是topic層面的分片,不同broker分攤相同topic下的不同內容,而同一個broker通過主備完成信息冗余。而QueueId就是broker層面的再次分片。對於多個消費者的情況,消費相同broker-name的時候,可以根據queue-id並發消費。

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM