RocketMQ消息發送者


RocketMQ消息發送者

DefaultMQProducer

消息發送者啟動

/**
 * Starts this producer by delegating to the wrapped {@code defaultMQProducerImpl}.
 *
 * @throws MQClientException propagated from {@code defaultMQProducerImpl.start()}
 */
public void start() throws MQClientException {
    // (1) Delegate to the internal defaultMQProducerImpl#start method.
    this.defaultMQProducerImpl.start();
}

①調用內部的defaultMQProducerImpl#start方法

/**
 * Starts the producer implementation; equivalent to calling {@code start(true)}.
 *
 * @throws MQClientException propagated from {@code start(boolean)}
 */
public void start() throws MQClientException {
    this.start(true);
}
/**
 * Starts the producer implementation, driven by the current {@code serviceState}.
 *
 * <p>In the {@code CREATE_JUST} state the method validates the configuration, obtains an
 * {@code MQClientInstance} from {@code MQClientManager}, registers this producer under its
 * group name, optionally starts the client instance and finally sets the state to
 * {@code RUNNING}. In the {@code RUNNING}, {@code START_FAILED} and {@code SHUTDOWN_ALREADY}
 * states it throws. After the switch, a heartbeat is sent through
 * {@code sendHeartbeatToAllBrokerWithLock()}.
 *
 * @param startFactory when {@code true}, {@code mQClientFactory.start()} is invoked as well
 * @throws MQClientException if the producer group is already registered on the client
 *         instance, if the service state is not {@code CREATE_JUST}, or if one of the
 *         invoked steps throws it
 */
public void start(final boolean startFactory) throws MQClientException {
    // The initial state is ServiceState.CREATE_JUST.
    switch (this.serviceState) {
        case CREATE_JUST:
            // Mark the start as failed up front; the state only becomes RUNNING at the end of this branch.
            this.serviceState = ServiceState.START_FAILED;
            // (1) Validate the producer group name.
            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                // (2) For any group other than CLIENT_INNER_PRODUCER_GROUP, let the producer
                // switch its instance name to the process id.
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // (3) Get or create the MQClientInstance. Per the original notes, producers and
            // consumers inside the same JVM share a single MQClientInstance.
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // (4) Register this producer with the MQClientInstance under its group name.
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // Registration was refused: roll the state back to CREATE_JUST before reporting it.
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
            }

            // Seed the publish-info table with an empty entry for the create-topic key.
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                // (5) Start the MQClientInstance.
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
            // (6) Update the state to RUNNING.
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            // Any state other than CREATE_JUST means start() was already attempted.
            throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
        default:
            break;
    }
    // (7) Send a heartbeat so the brokers receive the producer and consumer information.
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}

①檢查生產組名是否合法

②生產組名不是CLIENT_INNER_PRODUCER的話 將instanceName設置成pid

③創建MQClientInstance 同一個JVM里的生產者和消費者公用一個MQClientInstance

④注冊生產者

⑤啟動MQClientInstance

⑥更新狀態為RUNNING

⑦發送心跳信息 將生產者和消費者信息發送到broker端

①檢查生產組名是否合法

/**
 * Validates the producer group configured on {@code defaultMQProducer}.
 *
 * <p>The group is first passed to {@code Validators.checkGroup}, then rejected when it is
 * {@code null} or equal to {@code MixAll.DEFAULT_PRODUCER_GROUP}.
 *
 * @throws MQClientException if any of the checks fails
 */
private void checkConfig() throws MQClientException {
    final String producerGroup = this.defaultMQProducer.getProducerGroup();

    // Generic group-name validation.
    Validators.checkGroup(producerGroup);

    // The producer group must be present.
    if (producerGroup == null) {
        throw new MQClientException("producerGroup is null", null);
    }

    // The producer group must not be the default group name.
    final boolean isDefaultGroup = producerGroup.equals(MixAll.DEFAULT_PRODUCER_GROUP);
    if (isDefaultGroup) {
        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
                null);
    }
}
/**
 * Validates a group name.
 *
 * <p>The checks run in this order: the name must not be blank, it must match
 * {@code PATTERN}, and its length must not exceed {@code CHARACTER_MAX_LENGTH}.
 *
 * @param group the group name to validate
 * @throws MQClientException if one of the checks fails
 */
public static void checkGroup(String group) throws MQClientException {
    // A blank group name is rejected outright.
    if (UtilAll.isBlank(group)) {
        throw new MQClientException("the specified group is blank", null);
    }

    // The name has to match the allowed-character pattern.
    final boolean matchesPattern = regularExpressionMatcher(group, PATTERN);
    if (!matchesPattern) {
        final String message = String.format(
            "the specified group[%s] contains illegal characters, allowing only %s", group,
            VALID_PATTERN_STR);
        throw new MQClientException(message, null);
    }

    // The name must not be longer than the configured maximum.
    final boolean tooLong = group.length() > CHARACTER_MAX_LENGTH;
    if (tooLong) {
        throw new MQClientException("the specified group is longer than group max length 255.", null);
    }
}

主要做了以下幾點生產者組名檢查:

  • group不能為空
  • group要符合正則 "^[%|a-zA-Z0-9_-]+$"
  • group不能超過255個字
  • group不能是默認的組名“DEFAULT_PRODUCER”

②生產組名不是CLIENT_INNER_PRODUCER的話 將instanceName設置成pid

defaultMQProducer#changeInstanceNameToPID

/**
 * Replaces the instance name with the current process id, but only while the instance
 * name still equals {@code "DEFAULT"}.
 */
public void changeInstanceNameToPID() {
    // Per the original notes, instanceName is initialised from the system property
    // rocketmq.client.name and falls back to "DEFAULT" (the field declaration is not shown here).
    final boolean stillDefault = this.instanceName.equals("DEFAULT");
    if (!stillDefault) {
        return;
    }

    // Use the process id reported by UtilAll.getPid() as the instance name.
    this.instanceName = String.valueOf(UtilAll.getPid());
}

UtilAll#getPid() 用來獲取當前jvm進程的pid。這說明在同一個jvm進程裡啟動多個producer時,如果沒有通過系統變量rocketmq.client.name指定instanceName,它們的instanceName都會是同一個pid。

/**
 * Returns the process id parsed from the runtime MXBean name.
 *
 * @return the integer in front of the first {@code '@'} of the runtime name, or
 *         {@code -1} when that prefix cannot be extracted or parsed
 */
public static int getPid() {
    // The runtime name typically looks like "pid@hostname".
    final String runtimeName = ManagementFactory.getRuntimeMXBean().getName();

    int pid;
    try {
        final int atIndex = runtimeName.indexOf('@');
        final String pidText = runtimeName.substring(0, atIndex);
        pid = Integer.parseInt(pidText);
    } catch (Exception ignored) {
        // Any failure (no '@', non-numeric prefix, ...) is reported as -1.
        pid = -1;
    }
    return pid;
}

③創建MQClientInstance 同一個JVM里的生產者和消費者公用一個MQClientInstance

/**
 * Returns the {@code MQClientInstance} stored in {@code factoryTable} for the client id
 * built from {@code clientConfig}, creating and storing a new one when none exists yet.
 *
 * @param clientConfig configuration used to build the client id and cloned into a new instance
 * @param rpcHook hook handed to a newly created instance
 * @return the instance stored in {@code factoryTable} for the client id
 */
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    // Per the original notes the id has the form ip@instanceName@unitName, so clients in one
    // JVM that share those values resolve to the same instance (buildMQClientId is not shown here).
    final String clientId = clientConfig.buildMQClientId();

    // Reuse the instance already stored under this client id, if any.
    final MQClientInstance existing = this.factoryTable.get(clientId);
    if (existing != null) {
        return existing;
    }

    // Nothing stored yet: build a candidate and try to publish it.
    final MQClientInstance candidate =
        new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    final MQClientInstance previous = this.factoryTable.putIfAbsent(clientId, candidate);
    if (previous == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return candidate;
    }

    // An instance was stored for this id in the meantime; prefer that one.
    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return previous;
}

④MQClientInstance注冊生產者

// Call site excerpted from start(boolean): registers this producer under its group name.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
/**
 * Registers a producer under its group name in {@code producerTable}.
 *
 * @param group the producer group name used as the table key
 * @param producer the producer to register
 * @return {@code true} if the producer was stored; {@code false} if either argument is
 *         {@code null} or the group already has an entry
 */
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    final boolean missingArgument = group == null || producer == null;
    if (missingArgument) {
        return false;
    }

    // producerTable keeps one producer per group, so a second registration for the same
    // group on this client instance is refused.
    final MQProducerInner alreadyRegistered = this.producerTable.putIfAbsent(group, producer);
    final boolean registered = alreadyRegistered == null;
    if (!registered) {
        log.warn("the producer group[{}] exist already.", group);
    }
    return registered;
}

⑤啟動MQClientInstance

MQClientInstance的啟動邏輯相當多,這裡只做宏觀上的流程講解,後續會單獨用一個篇章來解析其啟動細節。

/**
 * Starts this client instance inside a {@code synchronized (this)} block, driven by
 * {@code serviceState}.
 *
 * <p>In the {@code CREATE_JUST} state it fetches the name server address when none is
 * configured, then starts {@code mQClientAPIImpl}, the scheduled tasks, the pull message
 * service, the rebalance service and the internal producer, and finally sets the state to
 * {@code RUNNING}. In the {@code RUNNING} and {@code SHUTDOWN_ALREADY} states it does
 * nothing; in the {@code START_FAILED} state it throws.
 *
 * @throws MQClientException if the state is {@code START_FAILED}, or if one of the
 *         invoked steps throws it
 */
public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Mark the start as failed up front; the state only becomes RUNNING at the end of this branch.
                this.serviceState = ServiceState.START_FAILED;
                // If no name server address is configured, look it up.
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start the request-response channel (remote communication service).
                this.mQClientAPIImpl.start();
                // Start the various scheduled tasks.
                this.startScheduledTask();
                // Start the pull message service.
                this.pullMessageService.start();
                // Start the consumer rebalance service.
                this.rebalanceService.start();
                // Start the internal defaultMQProducer (the original comment labels this "push service").
                // NOTE(review): false is presumably passed so the producer does not start this
                // client instance again — confirm against DefaultMQProducerImpl#start(boolean).
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                // A previous start attempt did not reach RUNNING.
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}

⑥更新狀態為RUNNING

⑦發送心跳信息 將生產者和消費者信息發送到broker端

// Call site excerpted from the end of start(boolean): triggers the heartbeat to the brokers.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

/**
 * Sends the heartbeat to all brokers and uploads the filter class source, but only when
 * {@code lockHeartbeat} can be acquired without waiting.
 *
 * <p>If the lock is not available the call only logs a warning. Exceptions raised while
 * sending are logged and not propagated.
 */
public void sendHeartbeatToAllBrokerWithLock() {
    final boolean acquired = this.lockHeartbeat.tryLock();
    if (!acquired) {
        log.warn("lock heartBeat, but failed.");
        return;
    }

    try {
        // Send the heartbeat carrying producer and consumer information to the brokers.
        this.sendHeartbeatToAllBroker();
        this.uploadFilterClassSource();
    } catch (final Exception heartbeatFailure) {
        log.error("sendHeartbeatToAllBroker exception", heartbeatFailure);
    } finally {
        this.lockHeartbeat.unlock();
    }
}
  • MQClientInstance#sendHeartbeatToAllBroker
/**
 * Sends the current heartbeat data to the broker addresses held in {@code brokerAddrTable}.
 *
 * <p>Nothing is sent when the heartbeat contains neither producers nor consumers, or when
 * the broker address table is empty. When there are no consumers, only addresses whose
 * broker id equals {@code MixAll.MASTER_ID} are contacted. The version returned by each
 * successful send is recorded in {@code brokerVersionTable}. A failure for one address is
 * logged together with its cause and does not stop the remaining addresses.
 */
private void sendHeartbeatToAllBroker() {
    // (1) Build the heartbeat payload, which carries producer and consumer information.
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }

    if (this.brokerAddrTable.isEmpty()) {
        return;
    }

    // Counts heartbeat rounds so the success log below is only written on every 20th round.
    final long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    for (Entry<String/*brokerName*/, HashMap<Long/*brokerId*/, String/*brokerAddr*/>> entry : this.brokerAddrTable.entrySet()) {
        final String brokerName = entry.getKey();
        final HashMap<Long, String> oneTable = entry.getValue();
        if (oneTable == null) {
            continue;
        }

        for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
            final Long id = entry1.getKey();
            final String addr = entry1.getValue();
            if (addr == null) {
                continue;
            }
            // Without consumers, skip every broker whose id is not MASTER_ID.
            if (consumerEmpty && id != MixAll.MASTER_ID) {
                continue;
            }

            try {
                // 3000 is presumably a timeout in milliseconds — confirm against sendHearbeat.
                int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                if (!this.brokerVersionTable.containsKey(brokerName)) {
                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                }
                this.brokerVersionTable.get(brokerName).put(addr, version);
                if (times % 20 == 0) {
                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                    log.info(heartbeatData.toString());
                }
            } catch (Exception e) {
                // Best effort: keep going with the other brokers, but pass the exception as the
                // trailing argument so the failure cause is logged instead of being dropped.
                if (this.isBrokerInNameServer(addr)) {
                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                } else {
                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                        id, addr, e);
                }
            }
        }
    }
}

MQClientInstance#prepareHeartbeatData 生成心跳數據 包含生產者和消費者信息

/**
 * Builds the heartbeat payload from this client's id, its registered consumers and its
 * registered producers.
 *
 * @return a new {@code HeartbeatData} with one {@code ConsumerData} per non-null entry of
 *         {@code consumerTable} and one {@code ProducerData} per non-null entry of
 *         {@code producerTable}
 */
private HeartbeatData prepareHeartbeatData() {
    final HeartbeatData heartbeat = new HeartbeatData();

    // Client id (per the original notes: ip@instanceName@unitName).
    heartbeat.setClientID(this.clientId);

    // Consumer information.
    for (final MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        final ConsumerData consumerInfo = new ConsumerData();
        consumerInfo.setGroupName(consumer.groupName());
        consumerInfo.setConsumeType(consumer.consumeType());
        consumerInfo.setMessageModel(consumer.messageModel());
        consumerInfo.setConsumeFromWhere(consumer.consumeFromWhere());
        consumerInfo.getSubscriptionDataSet().addAll(consumer.subscriptions());
        consumerInfo.setUnitMode(consumer.isUnitMode());

        heartbeat.getConsumerDataSet().add(consumerInfo);
    }

    // Producer information: only the group name (the table key) is reported.
    for (final Map.Entry<String/* group */, MQProducerInner> registration : this.producerTable.entrySet()) {
        if (registration.getValue() == null) {
            continue;
        }
        final ProducerData producerInfo = new ProducerData();
        producerInfo.setGroupName(registration.getKey());

        heartbeat.getProducerDataSet().add(producerInfo);
    }

    return heartbeat;
}

  1. 組名允許的字符集:%|a-zA-Z0-9_-(即正則 ^[%|a-zA-Z0-9_-]+$)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM