RocketMq producer 發送一條消息所經過的流程


 前言:

  RocketMq producer 在發送一條消息時候,從 producer --nameSrv -- Broker  中間經過了什么樣子的數據交互

開始:

如下是 Producer 發送消息的一個demo例子:

    //1. 初始化 mq producer
        DefaultMQProducer mqProducer =new DefaultMQProducer("iscys-test");
        //2.設置nameServer 地址
        mqProducer.setNamesrvAddr("localhost:9876");
        //3. 開啟mq producer,這一步是必須的,會做一些連接初始化檢測工作
        mqProducer.start();
        //4.創建 Message
        Message msg = new Message("test-topis", "iscys-test".getBytes());
        //5.發送消息,設置回調,消息發送成功會回調函數
        mqProducer.send(msg, new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                //在消息發送成功之后,我們收到broker的響應通知后,會進行回調
                System.out.println("send success");
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("send fail");

            }
        });

構建發送消息:

 public void send(Message msg, SendCallback sendCallback, long timeout)
        throws MQClientException, RemotingException, InterruptedException {
        try {
            //默認異步發送,超時3s
            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
        } catch (MQBrokerException e) {
            throw new MQClientException("unknownn exception", e);
        }
    }

從NameSrv 中獲取topic 配置的相關信息,比如 broker 地址,隊列數 之類的。

 private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);

        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //1.嘗試取獲取從NameSrv 中獲取topic 相關信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                //2.選擇一個消息隊列,默認為4個,在創建新的Topic時候
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        //3.發送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:

主要看一下如上代碼第一步 嘗試獲取Topic 信息  tryToFindTopicPublishInfo:

    

   1. 會先從 topicPublishInfoTable 緩存中獲取topic 配置信息

   2.緩存沒有,就從NameSrv 中拉取。

   3.如果獲取到了,則返回。

   4.NameSrv 沒有得到相關到topic 信息,說明是新到topic ,則就請求獲取TBW102 topic 配置信息,這個肯定能獲取到,封裝使用TBW102的配置。

 

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        //1.從 topicPublishInfoTable 從嘗試從Map中獲取,如果沒有獲取到,請求NameSrv
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            //2.從NameSrv 中拉取topic 信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
       
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            //3.說明獲取到TOPIC 的信息
            return topicPublishInfo;
        } else {
            //4.如果第2步執行后 NameSrv 中沒有topic 信息,獲取默認的TBW102 topic 的信息,這個是肯定能獲取到的
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }

 

請求NameSrv 非默認的topic

 public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        //1.非默認的topic ,默認Topic 為TBW102
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }

執行從NameSrv 獲取topic 請求:

    1. 從NameSrv 中獲取到 TBW102 的topic 信息,這個一般都是有的。

    2. 新的topic 會從NameSrv 中獲取信息,如果不存在,返回false。

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        // 1.如果請求的是默認的Topic 請求會走到這里
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        // 2.新的Topic 會先從NameSrv 中獲取一遍,如果NameSrv 中沒有獲取到,會拋出異常
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    } 

 從NameSrv 獲取到 TopicRouteData 進行如下表的緩存

          brokerAddressTable  --- 從TopicRouteData 得到broker 信息,進行brokerAddressTable 存儲

          TopicPublishInfoTable --TopicRouteData 轉換為topicPushInfo 存儲到 topicPublishInfoTable表中

          topicRouteTable  ---存儲namesrv 得到的 TopicRouteData  值 

 if (topicRouteData != null) {
                        //1.與緩存做比較,查看topic 相關信息是否有改變
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }

                        if (changed) {
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            //2.broker 信息進行brokerAddrTable 表存儲
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                //緩存
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }

                            // Update Pub info
                            {
                                //3.將topicRouteData 轉換為TopicPublishInfo ,用於producer
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                //設置為true 標記這個publishInfo是可用的
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        //更新producer  topicPublishInfoTable表信息
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }
                                //4.consumerTable 維護用戶Consumer
                            // Update sub info
                            {
                                Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                                Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQConsumerInner> entry = it.next();
                                    MQConsumerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                                    }
                                }
                            }
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            //5.topicRouteTable 表存儲原始信息,用於定時任務心跳等 
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;

獲取到topic信息后封裝成 TopicPublishInfo:

public class TopicPublishInfo {
    private boolean orderTopic = false;
    //用來檢測Topic 在Broker 真實存在的,不存在false
    private boolean haveTopicRouterInfo = false;
    //消息隊列的
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    //請求NameSrv 返回的TOPIC 具體信息 
    private TopicRouteData topicRouteData;

 之后就是消息的發送。

 

再看producer 啟動:

  說完消息發送,我們再看producer 的啟動初始化:

     1.檢查配置信息

     2.初始化MQClientInstance

     3.將producer 信息注冊到MQClientInstance維護的producerTable 表中

     4. topicPublishInfoTable 放入默認的topic

     5.啟動MQClientInstance

     6.producer 狀態修改為運行狀態

     7.發送心跳向broker

public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                //1.檢查配置
                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    this.defaultMQProducer.changeInstanceNameToPID();
                }
                //2.初始化 MQClientInstance producer 交互的重要信息組件
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                // 3.producerGroup 注冊到 producerTable 表中 k= producerName v = this
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                //4.topicPublishInfoTable 表中默認放入TBW102
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    //5.啟動工廠
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                //6.狀態改變為啟動狀態
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        //7.發送心跳檢測向所有的broker
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

 

我們主要看  MQClientInstance 創建以及啟動:

public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final Logger log = ClientLogger.getLog();
    //配置信息
    private final ClientConfig clientConfig;
    //創建索引
    private final int instanceIndex;
    //pid
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    //producer 維護的group 表,producer 會注冊這張表,如上代碼啟動的第3步
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    //consumer 維護的group 表
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    //admin 維護的group 表
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    //Netty 配置類
    private final NettyClientConfig nettyClientConfig;
    //Netty 客戶端
    private final MQClientAPIImpl mQClientAPIImpl;
    //admin
    private final MQAdminImpl mQAdminImpl;
    //從NameSrv 拉取到Topic 相關信息的原始值放在這個表里面
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    //broker 地址表
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    //broker version 表
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    //請求處理器
    private final ClientRemotingProcessor clientRemotingProcessor;
    //拉去消息服務
    private final PullMessageService pullMessageService;
    //負載均衡服務
    private final RebalanceService rebalanceService;
    //默認Group 的producer 服務
    private final DefaultMQProducer defaultMQProducer;
    //consumer 狀態管理
    private final ConsumerStatsManager consumerStatsManager;
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
    //MqClientInstance 啟動狀態
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private DatagramSocket datagramSocket;
    private Random random = new Random();

啟動MQClientInstance:

  public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 如果沒有配置nameSrv 默認從java 配置的propertues 參數路由地址獲取信息,阿里就是這么使用的,nameSrv 的獲取就相當於一個web 服務
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    //初始化Netty 客戶端
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    //開啟一些定時任務
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service  默認group的 producer 啟動
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM