RocketMQ消息发送者
DefaultMQProducer
消息发送者启动
/**
 * Starts this producer by delegating to the wrapped {@code defaultMQProducerImpl}.
 *
 * @throws MQClientException propagated from {@code defaultMQProducerImpl.start()}
 */
public void start() throws MQClientException {
// (1) Delegate to the internal defaultMQProducerImpl#start method.
this.defaultMQProducerImpl.start();
}
①调用内部的defaultMQProducerImpl#start方法
/**
 * Starts this producer implementation, passing {@code true} as {@code startFactory}
 * to {@code start(boolean)}.
 *
 * @throws MQClientException propagated from {@code start(boolean)}
 */
public void start() throws MQClientException {
this.start(true);
}
/**
 * Starts this producer implementation according to the current {@code serviceState}.
 *
 * <p>From {@code CREATE_JUST} it validates the configuration, obtains an
 * {@code MQClientInstance} from {@code MQClientManager}, registers this producer on it,
 * optionally starts that instance, and moves to {@code RUNNING}. From {@code RUNNING},
 * {@code START_FAILED} or {@code SHUTDOWN_ALREADY} it throws. Whenever the switch
 * completes without throwing, a heartbeat is sent through {@code mQClientFactory}.
 *
 * @param startFactory whether to also call {@code mQClientFactory.start()}
 * @throws MQClientException if the producer group is already registered on the client
 *         instance, if the current state is RUNNING, START_FAILED or SHUTDOWN_ALREADY,
 *         or if one of the invoked steps (e.g. {@code checkConfig()}) throws it
 */
public void start(final boolean startFactory) throws MQClientException {
// Per the original note, a new producer begins in ServiceState.CREATE_JUST.
switch (this.serviceState) {
case CREATE_JUST:
// Mark the state as failed up front: if any step below throws, the state
// stays START_FAILED (except for the registration failure handled below).
this.serviceState = ServiceState.START_FAILED;
// (1) Check that the producer group name is valid.
this.checkConfig();
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
// (2) The group is not CLIENT_INNER_PRODUCER_GROUP: let the producer switch its instanceName to the PID.
this.defaultMQProducer.changeInstanceNameToPID();
}
// (3) Get or create the MQClientInstance. Per the original note, producers and
// consumers in the same JVM share one MQClientInstance.
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// (4) Register this producer on the MQClientInstance under its group name.
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
// Registration was rejected: roll the state back to CREATE_JUST before throwing.
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// Seed the publish-info table with an empty TopicPublishInfo under the create-topic key.
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
if (startFactory) {
// (5) Start the MQClientInstance.
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
// (6) Update the state to RUNNING.
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
// Any state other than CREATE_JUST means start() must not proceed.
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
// (7) Send a heartbeat to all brokers via the client instance; the heartbeat
// carries the producer and consumer information of that instance.
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
①检查生产组名是否合法
②生产组名不是CLIENT_INNER_PRODUCER的话 将instanceName设置成pid
③创建MQClientInstance 同一个JVM里的生产者和消费者共用一个MQClientInstance
④注册生产者
⑤启动MQClientInstance
⑥更新状态为RUNNING
⑦发送心跳信息 将生产者和消费者信息发送到broker端
①检查生产组名是否合法
/**
 * Validates the producer group configured on {@code defaultMQProducer}.
 *
 * <p>The group is first passed to {@code Validators.checkGroup}; afterwards it must be
 * non-null and must differ from {@code MixAll.DEFAULT_PRODUCER_GROUP}.
 *
 * @throws MQClientException if any of the checks fails
 */
private void checkConfig() throws MQClientException {
    // Generic group-name validation comes first.
    Validators.checkGroup(this.defaultMQProducer.getProducerGroup());

    final String producerGroup = this.defaultMQProducer.getProducerGroup();

    // The producer group must be present.
    if (producerGroup == null) {
        throw new MQClientException("producerGroup is null", null);
    }

    // The producer group must not be the default group name.
    final boolean usesDefaultGroup = producerGroup.equals(MixAll.DEFAULT_PRODUCER_GROUP);
    if (usesDefaultGroup) {
        throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
            null);
    }
}
/**
 * Validates a group name.
 *
 * <p>Checks are applied in this order: the name must not be blank, it must match
 * {@code PATTERN}, and its length must not exceed {@code CHARACTER_MAX_LENGTH}.
 *
 * @param group the group name to validate
 * @throws MQClientException on the first check that fails
 */
public static void checkGroup(String group) throws MQClientException {
    // Reject blank names.
    if (UtilAll.isBlank(group)) {
        throw new MQClientException("the specified group is blank", null);
    }

    // Reject names that do not match the allowed pattern.
    final boolean wellFormed = regularExpressionMatcher(group, PATTERN);
    if (!wellFormed) {
        final String message = String.format(
            "the specified group[%s] contains illegal characters, allowing only %s", group,
            VALID_PATTERN_STR);
        throw new MQClientException(message, null);
    }

    // Reject names that are too long.
    final boolean tooLong = group.length() > CHARACTER_MAX_LENGTH;
    if (tooLong) {
        throw new MQClientException("the specified group is longer than group max length 255.", null);
    }
}
主要做了以下几点生产者组名检查:
- group不能为空
- group要符合正则 "^[%|a-zA-Z0-9_-]+$"
- group不能超过255个字
- group不能是默认的组名“DEFAULT_PRODUCER”
②生产组名不是CLIENT_INNER_PRODUCER的话 将instanceName设置成pid
defaultMQProducer#changeInstanceNameToPID
/**
 * Replaces {@code instanceName} with the value of {@code UtilAll.getPid()} when it is
 * still the literal {@code "DEFAULT"}; any other instance name is left untouched.
 *
 * <p>Per the original note, {@code instanceName} is initialised from the system property
 * {@code rocketmq.client.name} with {@code "DEFAULT"} as the fallback.
 */
public void changeInstanceNameToPID() {
    final boolean stillDefault = this.instanceName.equals("DEFAULT");
    if (!stillDefault) {
        return;
    }
    // Use the PID reported by UtilAll as the instance name.
    this.instanceName = String.valueOf(UtilAll.getPid());
}
UtilAll#getPid() 获取当前jvm进程pid,这里说明同一个jvm环境里,启动多个producer,其instanceName如果不用系统变量rocketmq.client.name指定的话,instanceName就会为同一个pid
/**
 * Extracts the process id from the runtime MXBean name.
 *
 * <p>The name is expected to look like {@code "pid@hostname"}; the part before the
 * {@code '@'} is parsed as an integer.
 *
 * @return the parsed pid, or {@code -1} if the name cannot be split or parsed
 */
public static int getPid() {
    final String jvmName = ManagementFactory.getRuntimeMXBean().getName(); // format: "pid@hostname"
    try {
        final int atIndex = jvmName.indexOf('@');
        final String pidText = jvmName.substring(0, atIndex);
        return Integer.parseInt(pidText);
    } catch (Exception e) {
        // Missing '@' or a non-numeric prefix: report "unknown" as -1.
        return -1;
    }
}
③创建MQClientInstance 同一个JVM里的生产者和消费者共用一个MQClientInstance
/**
 * Returns the {@code MQClientInstance} stored in {@code factoryTable} for the client id
 * built from {@code clientConfig}, creating and storing a new one if none exists.
 *
 * <p>Per the original note, the client id has the form {@code ip@instanceName@unitName}
 * with {@code unitName} defaulting to null, so producers and consumers started in the
 * same JVM without distinct settings resolve to the same id and share one instance.
 *
 * @param clientConfig configuration used to build the client id; a clone of it is
 *                     handed to a newly created instance
 * @param rpcHook      hook passed to a newly created instance
 * @return the instance stored in {@code factoryTable} for the client id
 */
public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    final String clientId = clientConfig.buildMQClientId();

    // Fast path: an instance for this client id is already stored.
    final MQClientInstance existing = this.factoryTable.get(clientId);
    if (existing != null) {
        return existing;
    }

    // Nothing stored yet: build a candidate and try to publish it.
    final MQClientInstance candidate =
        new MQClientInstance(clientConfig.cloneClientConfig(),
            this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    final MQClientInstance winner = this.factoryTable.putIfAbsent(clientId, candidate);
    if (winner == null) {
        log.info("Created new MQClientInstance for clientId:[{}]", clientId);
        return candidate;
    }

    // Another instance was stored in the meantime; use that one instead.
    log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    return winner;
}
④MQClientInstance注册生产者
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
/**
 * Registers a producer under its group name in {@code producerTable}.
 *
 * <p>Only one producer can be stored per group in this table, so a second registration
 * for the same group is rejected.
 *
 * @param group    the producer group name
 * @param producer the producer implementation to register
 * @return {@code true} if the producer was stored; {@code false} if either argument is
 *         null or the group already has a producer registered
 */
public boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
    if (group == null || producer == null) {
        return false;
    }

    final MQProducerInner alreadyRegistered = this.producerTable.putIfAbsent(group, producer);
    final boolean registered = alreadyRegistered == null;
    if (!registered) {
        log.warn("the producer group[{}] exist already.", group);
    }
    return registered;
}
⑤启动MQClientInstance
MQClientInstance的启动逻辑相当的多,这里只做宏观上的流程讲解,会单独做一个篇章来解析其启动的细节
/**
 * Starts this client instance; the whole state check runs under {@code synchronized (this)}.
 *
 * <p>From {@code CREATE_JUST} it starts, in order: name server address lookup (only when
 * none is configured), the request-response channel, the scheduled tasks, the pull
 * message service, the rebalance service and the internal producer, then moves to
 * {@code RUNNING}. {@code RUNNING} and {@code SHUTDOWN_ALREADY} are no-ops.
 *
 * @throws MQClientException if the state is {@code START_FAILED}, or if a start-up step
 *         throws it
 */
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
// Mark as failed up front; only replaced by RUNNING after every step below returns.
this.serviceState = ServiceState.START_FAILED;
// If no name server address is configured, ask mQClientAPIImpl to fetch one.
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start the request-response channel (remote communication service).
this.mQClientAPIImpl.start();
// Start the various scheduled tasks.
this.startScheduledTask();
// Start the pull-message service.
this.pullMessageService.start();
// Start the rebalance service (consumer load balancing).
this.rebalanceService.start();
// Start the internal defaultMQProducer. It is started with startFactory=false,
// which in DefaultMQProducerImpl#start(boolean) skips the mQClientFactory.start() call.
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
// Already running: nothing to do.
break;
case SHUTDOWN_ALREADY:
// Already shut down: nothing to do.
break;
case START_FAILED:
// A previous start attempt did not reach RUNNING; refuse to start again.
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
⑥更新状态为RUNNING
⑦发送心跳信息 将生产者和消费者信息发送到broker端
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
/**
 * Sends a heartbeat to all brokers and uploads filter class sources, but only if
 * {@code lockHeartbeat} can be acquired without waiting.
 *
 * <p>If the lock is not available the call logs a warning and returns. Exceptions thrown
 * while sending are logged and not propagated; the lock is always released.
 */
public void sendHeartbeatToAllBrokerWithLock() {
    if (!this.lockHeartbeat.tryLock()) {
        log.warn("lock heartBeat, but failed.");
        return;
    }

    try {
        // Heartbeat carries the producer and consumer information to the brokers.
        this.sendHeartbeatToAllBroker();
        this.uploadFilterClassSource();
    } catch (final Exception e) {
        log.error("sendHeartbeatToAllBroker exception", e);
    } finally {
        this.lockHeartbeat.unlock();
    }
}
- MQClientInstance#sendHeartbeatToAllBroker
/**
 * Sends the current heartbeat data to every broker address in {@code brokerAddrTable}.
 *
 * <p>Nothing is sent when the heartbeat contains neither producers nor consumers, or when
 * no broker address is known. When the heartbeat has no consumers, only addresses whose
 * broker id equals {@code MixAll.MASTER_ID} are contacted. The version returned by each
 * successful send is recorded in {@code brokerVersionTable}; failures are logged and do
 * not stop the remaining sends.
 */
private void sendHeartbeatToAllBroker() {
    // (1) Build the heartbeat payload, which covers producers and consumers.
    final HeartbeatData heartbeatData = this.prepareHeartbeatData();
    final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
    final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
    if (producerEmpty && consumerEmpty) {
        log.warn("sending heartbeat, but no consumer and no producer");
        return;
    }

    if (this.brokerAddrTable.isEmpty()) {
        return;
    }

    // Success logging is only emitted when this counter value is a multiple of 20.
    final long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    final boolean logSuccess = times % 20 == 0;

    for (Entry<String/*brokerName*/, HashMap<Long/*brokerId*/, String/*brokerAddr*/>> brokerEntry : this.brokerAddrTable.entrySet()) {
        final String brokerName = brokerEntry.getKey();
        final HashMap<Long, String> addrById = brokerEntry.getValue();
        if (addrById == null) {
            continue;
        }

        for (Map.Entry<Long, String> addrEntry : addrById.entrySet()) {
            final Long id = addrEntry.getKey();
            final String addr = addrEntry.getValue();
            if (addr == null) {
                continue;
            }
            // Without consumers, skip every address that is not the master id.
            if (consumerEmpty && id != MixAll.MASTER_ID) {
                continue;
            }

            try {
                final int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                if (!this.brokerVersionTable.containsKey(brokerName)) {
                    this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                }
                this.brokerVersionTable.get(brokerName).put(addr, version);
                if (logSuccess) {
                    log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                    log.info(heartbeatData.toString());
                }
            } catch (Exception e) {
                if (this.isBrokerInNameServer(addr)) {
                    log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                } else {
                    log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                        id, addr);
                }
            }
        }
    }
}
MQClientInstance#prepareHeartbeatData 生成心跳数据 包含生产者和消费者信息
/**
 * Builds the heartbeat payload for this client instance.
 *
 * <p>The payload carries the client id plus one {@code ConsumerData} per non-null entry
 * in {@code consumerTable} and one {@code ProducerData} per non-null entry in
 * {@code producerTable}.
 *
 * @return the populated heartbeat data
 */
private HeartbeatData prepareHeartbeatData() {
    final HeartbeatData heartbeat = new HeartbeatData();

    // Client id (per the original note: ip@instanceName@unitName).
    heartbeat.setClientID(this.clientId);

    // Consumer information.
    for (MQConsumerInner consumer : this.consumerTable.values()) {
        if (consumer == null) {
            continue;
        }
        final ConsumerData consumerData = new ConsumerData();
        consumerData.setGroupName(consumer.groupName());                          // group name
        consumerData.setConsumeType(consumer.consumeType());                      // consume type (push or pull, per the original note)
        consumerData.setMessageModel(consumer.messageModel());                    // message model (broadcast or clustering, per the original note)
        consumerData.setConsumeFromWhere(consumer.consumeFromWhere());            // where consumption starts
        consumerData.getSubscriptionDataSet().addAll(consumer.subscriptions());   // topic subscriptions
        consumerData.setUnitMode(consumer.isUnitMode());                          // unit-mode flag
        heartbeat.getConsumerDataSet().add(consumerData);
    }

    // Producer information: only the group name (the table key) is reported.
    for (Map.Entry<String/* group */, MQProducerInner> producerEntry : this.producerTable.entrySet()) {
        if (producerEntry.getValue() == null) {
            continue;
        }
        final ProducerData producerData = new ProducerData();
        producerData.setGroupName(producerEntry.getKey());
        heartbeat.getProducerDataSet().add(producerData);
    }

    return heartbeat;
}
注:组名允许的字符集为 %|a-zA-Z0-9_-