rocketmq--消息的產生(普通消息)


與消息發送緊密相關的幾行代碼:
1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
2. producer.start();
3. Message msg = new Message(...)
4. SendResult sendResult = producer.send(msg);
5. producer.shutdown();
 
那這幾行代碼執行時,背后都做了什么?
 
一. 首先是DefaultMQProducer.start 
@Override
public void start() throws MQClientException {
this.defaultMQProducerImpl.start();
}
調用了默認生成消息的實現類 -- DefaultMQProducerImpl
調用defaultMQProducerImpl.start()方法,DefaultMQProducerImpl.start()會初始化得到MQClientInstance實例對象,MQClientInstance實例對象調用它自己的start方法會 ,啟動一些服務,如拉去消息服務PullMessageService.Start()、啟動負載平衡服務RebalanceService.Start(),比如網絡通信服務MQClientAPIImpl.Start(), 另外,還會執行與生產消息相關的信息,如注冊produceGroup、new一個TopicPublishInfo對象並以默認TopicKey為鍵值,構成鍵值對存入DefaultMQProducerImpl的topicPublishInfoTable中。efaultMQProducerImpl.start()后,獲取的MQClientInstance實例對象會調用sendHeartbeatToAllBroker()方法,不斷向broker發送心跳包,yin'b可以使用下面一幅圖大致描述DefaultMQProducerImpl.start()過程:
 

 

 
上圖中的三個部分中涉及的內容:
 
1.1 初始化MQClientInstance
 
一個客戶端只能產生一個MQClientInstance實例對象,產生方式使用了工廠模式與單例模式。MQClientInstance.start()方法啟動一些服務,源碼如下:
public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
 
1.2 注冊producer
該過程會將這個當前producer對象注冊到MQClientInstance實例對象的的producerTable中。 一個jvm(一個客戶端)中一個producerGroup只能有一個實例,MQClientInstance操作producerTable大概有如下幾個方法:
  • -- selectProducer
  • -- updateTopicRouteInfoFromNameServer
  • -- prepareHeartbeatData
  • -- isNeedUpdateTopicRouteInfo
  • -- shutdown
注:
根據不同的clientId,MQClientManager將給出不同的MQClientInstance;
根據不同的group,MQClientInstance將給出不同的MQProducer和MQConsumer
 
1.3 向路由信息表中添加路由
topicPublishInfoTable定義:
public class DefaultMQProducerImpl implements MQProducerInner {
    private final Logger log = ClientLogger.getLog();
    private final Random random = new Random();
    private final DefaultMQProducer defaultMQProducer;
    private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
它是一個以topic為key的Map型數據結構,DefaultMQProducerImpl.start()時會默認創建一個key=MixAll.DEFAULT_TOPIC的TopicPublishInfo存放到topicPublishInfoTable中。
 
1.4 發送心跳包
MQClientInstance向broker發送心跳包時,調用sendHeartbeatToAllBroker( ),以及從MQClientInstance實例對象的brokerAddrTable中拿到所有broker地址,向這些broker發送心跳包。
sendHeartbeatToAllBroker會涉及到prepareHeartbeatData()方法,該方法會生成heartbeatData數據,發送心跳包時,heartbeatData作為心跳包的body。與producer相關的部分代碼如下:
// Producer
for (Map.Entry<String/* group */, MQProducerInner> entry : this.producerTable.entrySet()) {
MQProducerInner impl = entry.getValue();
if (impl != null) {
ProducerData producerData = new ProducerData();
producerData.setGroupName(entry.getKey());
 
heartbeatData.getProducerDataSet().add(producerData);
}
 
 
二、. SendResult sendResult = producer.send(msg)
 
首先會調用DefaultMQProducer.send(msg) ,繼而調用sendDefaultImpl:
 public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
 
 
sendDefaultImpl做了啥?
2.1. 獲取topicPublishInfo
根據msg的topic從topicPublishInfoTable獲取對應的topicPublishInfo,如果沒有則更新路由信息,從nameserver端拉取最新路由信息。從nameserver端拉取最新路由信息大致為:首先getTopicRouteInfoFromNameServer,然后topicRouteData2TopicPublishInfo。

 
 
2.2 選擇消息發送的隊列
普通消息:默認方式下,selectOneMessageQueue從topicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發送消息,默認采用長輪詢的方式選擇隊列 。它的機制如下:正常情況下,順序選擇queue進行發送;如果某一個節點發生了超時,則下次選擇queue時,跳過相同的broker。不同的隊列選擇策略形成了生產消息的幾種模式,如順序消息,事務消息。
 
順序消息:將一組需要有序消費的消息發往同一個broker的同一個隊列上即可實現順序消息,假設相同訂單號的支付,退款需要放到同一個隊列,那么就可以在send的時候,自己實現MessageQueueSelector,根據參數arg字段來選擇queue。
    private SendResult sendSelectImpl(
        Message msg,
        MessageQueueSelector selector,
        Object arg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback, final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { 。。。}
 
 事務消息:只有在消息發送成功,並且本地操作執行成功時,才發送提交事務消息,做事務提交,消息發送失敗,直接發送回滾消息,進行回滾,具體如何實現后面會單獨成文分析。
 
 
2.3 封裝消息體通信包,發送數據包
首先,根據獲取的MessageQueue中的getBrokerName,調用findBrokerAddressInPublish得到該消息存放對應的broker地址,如果沒有找到則跟新路由信息,重新獲取地址 :
brokerAddrTable.get(brokerName).get(MixAll.MASTER_ID)
可知獲取的broker均為master(id=0)
 
然后, 將與該消息相關信息打包成RemotingCommand數據包,其RequestCode.SEND_MESSAGE
根據獲取的broke地址,將數據包到對應的broker,默認是發送超時時間為3s。
 
封裝消息請求包的包頭:
 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);

發送消息包(普通消息默認為同步方式):

SendResult sendResult = null;
switch (communicationMode) {
    case SYNC:
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
        brokerAddr,
        mq.getBrokerName(),
        msg,
        requestHeader,
       timeout,
        communicationMode,
        context,
        this);
      break;

 處理來自broker端的響應數據包:

 private SendResult sendMessageSync(
        final String addr,
        final String brokerName,
        final Message msg,
        final long timeoutMillis,
        final RemotingCommand request
    ) throws RemotingException, MQBrokerException, InterruptedException {
        RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
        assert response != null;
        return this.processSendResponse(brokerName, msg, response);
    }

 

broker端處理request數據包后會將消息存儲到commitLog,具體過程后續分析。

 (完)
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM