Preface:
When a RocketMQ producer sends a message, what data is exchanged along the path producer -> NameSrv -> Broker?
Getting started:
Here is a demo of a producer sending a message:
//1. Initialize the MQ producer
DefaultMQProducer mqProducer = new DefaultMQProducer("iscys-test");
//2. Set the NameServer address
mqProducer.setNamesrvAddr("localhost:9876");
//3. Start the MQ producer; this step is required and performs connection setup and checks
mqProducer.start();
//4. Create the Message
Message msg = new Message("test-topis", "iscys-test".getBytes());
//5. Send the message with a callback that is invoked once the send completes
mqProducer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // Invoked after the broker's response confirms that the message was sent
        System.out.println("send success");
    }

    @Override
    public void onException(Throwable e) {
        System.out.println("send fail");
    }
});
Building the send request:
public void send(Message msg, SendCallback sendCallback, long timeout)
    throws MQClientException, RemotingException, InterruptedException {
    try {
        // Asynchronous send; the default timeout is 3s
        this.sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout);
    } catch (MQBrokerException e) {
        throw new MQClientException("unknownn exception", e);
    }
}
Fetch the topic's configuration from the NameSrv, such as the broker addresses and the number of queues.
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    //1. Try to get the topic's information from the NameSrv
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            //2. Select one message queue (a newly created topic has 4 by default)
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    //3. Send the message
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            // ...
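The attempt count and the broker-avoiding queue choice in this loop can be looked at in isolation. The following is a minimal sketch and not RocketMQ's code: `SendAttemptSketch`, `Queue`, `selectAvoiding`, and `demoQueues` are hypothetical names, and the "round-robin while skipping the broker used last time" strategy is an assumption about what `selectOneMessageQueue` does in the simple case.

```java
import java.util.ArrayList;
import java.util.List;

final class SendAttemptSketch {
    /** Hypothetical stand-in for MessageQueue: a broker name plus a queue id. */
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }
    }

    /** Mirrors the timesTotal expression: only SYNC sends are retried by the loop. */
    static int timesTotal(boolean sync, int retryTimesWhenSendFailed) {
        return sync ? 1 + retryTimesWhenSendFailed : 1;
    }

    /**
     * Assumed strategy: walk the queues round-robin from startIndex and return the
     * first one whose broker differs from lastBrokerName. If every queue lives on
     * that broker, fall back to plain round-robin.
     */
    static Queue selectAvoiding(List<Queue> queues, int startIndex, String lastBrokerName) {
        if (queues.isEmpty()) {
            return null;
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(startIndex + i, queues.size()));
            if (lastBrokerName == null || !q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return queues.get(Math.floorMod(startIndex, queues.size()));
    }

    /** Illustrative data: two queues on broker-a, one on broker-b. */
    static List<Queue> demoQueues() {
        List<Queue> queues = new ArrayList<Queue>();
        queues.add(new Queue("broker-a", 0));
        queues.add(new Queue("broker-a", 1));
        queues.add(new Queue("broker-b", 0));
        return queues;
    }
}
```

If `getRetryTimesWhenSendFailed()` returned 2, for example, a synchronous send would make at most three attempts, while asynchronous and one-way sends get a single attempt from this loop.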
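The key part is step 1 of sendDefaultImpl, the topic lookup in tryToFindTopicPublishInfo:
1. First look up the topic's configuration in the topicPublishInfoTable cache.
2. If the cache has no entry, pull it from the NameSrv.
3. If it is found, return it.
4. If the NameSrv has no information for the topic, the topic is new, so request the configuration of the TBW102 topic instead (this normally exists) and use TBW102's configuration for the new topic.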
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //1. Try the topicPublishInfoTable map first; if there is no usable entry, ask the NameSrv
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //2. Pull the topic information from the NameSrv
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        //3. The topic information was found
        return topicPublishInfo;
    } else {
        //4. If the NameSrv had no information for the topic after step 2, fetch the default TBW102 topic's information, which normally exists
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
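The lookup order (cache, then NameSrv, then the TBW102 fallback) can be tried out without a running cluster. This is a simplified sketch under stated assumptions: `TopicLookupSketch`, its `nameSrv` map, and the `nameSrvCalls` counter are hypothetical stand-ins, and a topic's configuration is reduced to a single queue count.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class TopicLookupSketch {
    static final String DEFAULT_TOPIC = "TBW102";

    /** Hypothetical stand-in for the NameSrv: topic -> number of queues. */
    final Map<String, Integer> nameSrv = new ConcurrentHashMap<String, Integer>();
    /** Local cache, playing the role of topicPublishInfoTable. */
    final Map<String, Integer> cache = new ConcurrentHashMap<String, Integer>();
    /** Counts simulated remote calls so the effect of the cache is visible. */
    int nameSrvCalls = 0;

    Integer findQueueNums(String topic) {
        Integer cached = cache.get(topic);       // step 1: local cache
        if (cached != null) {
            return cached;
        }
        Integer fetched = fetch(topic);          // step 2: ask the NameSrv
        if (fetched == null) {
            fetched = fetch(DEFAULT_TOPIC);      // step 4: borrow TBW102's settings
        }
        if (fetched != null) {
            cache.put(topic, fetched);           // step 3: remember and return
        }
        return fetched;
    }

    private Integer fetch(String topic) {
        nameSrvCalls++;
        return nameSrv.get(topic);
    }
}
```

A known topic costs one simulated remote call, an unknown topic costs two, and any later lookup of the same topic is served from the cache.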
Requesting a non-default topic from the NameSrv:
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
    //1. A non-default topic; the default topic is TBW102
    return updateTopicRouteInfoFromNameServer(topic, false, null);
}
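Executing the request that fetches the topic from the NameSrv:
1. For the default topic, fetch the TBW102 topic information from the NameSrv; this normally exists.
2. For a new topic, fetch its information from the NameSrv; if it does not exist, return false.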
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    //1. Requests for the default topic take this branch
                    topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                        1000 * 3);
                    if (topicRouteData != null) {
                        for (QueueData data : topicRouteData.getQueueDatas()) {
                            int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                            data.setReadQueueNums(queueNums);
                            data.setWriteQueueNums(queueNums);
                        }
                    }
                } else {
                    //2. A new topic is first looked up in the NameSrv; if the NameSrv does not have it, an exception is thrown
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
                // ...
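In the default-topic branch, the queue count given to a new topic is the smaller of the producer's `getDefaultTopicQueueNums()` and the read queue count that TBW102 reports. A short worked example of that clamp follows; the class name and the numbers are purely illustrative.

```java
final class DefaultTopicQueueNumsSketch {
    /** Same clamp as in the default-topic branch: never exceed what TBW102 offers. */
    static int effectiveQueueNums(int producerDefaultTopicQueueNums, int brokerReadQueueNums) {
        return Math.min(producerDefaultTopicQueueNums, brokerReadQueueNums);
    }

    public static void main(String[] args) {
        // Producer asks for 4, broker offers 8: the producer's smaller value wins.
        System.out.println(effectiveQueueNums(4, 8));
        // Producer asks for 16, broker offers 8: capped at the broker's value.
        System.out.println(effectiveQueueNums(16, 8));
    }
}
```

Both the read and the write queue counts are then set to this value, so raising the producer-side setting alone cannot give a new topic more queues than the default topic has.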
The TopicRouteData obtained from the NameSrv is cached in the following tables:
brokerAddrTable --- stores the broker information extracted from the TopicRouteData
topicPublishInfoTable --- stores the TopicRouteData converted into a TopicPublishInfo
topicRouteTable --- stores the TopicRouteData value as returned by the NameSrv
if (topicRouteData != null) {
    //1. Compare with the cache to see whether the topic's route information has changed
    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }

    if (changed) {
        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
        //2. Store the broker information in brokerAddrTable
        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
            // Cache
            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
        }

        // Update Pub info
        {
            //3. Convert topicRouteData into a TopicPublishInfo for use by producers
            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
            // Set to true to mark this publishInfo as usable
            publishInfo.setHaveTopicRouterInfo(true);
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    // Update the producer's topicPublishInfoTable
                    impl.updateTopicPublishInfo(topic, publishInfo);
                }
            }
        }
        //4. consumerTable tracks the user's consumers
        // Update sub info
        {
            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                }
            }
        }
        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
        //5. topicRouteTable stores the original route data, used by scheduled tasks, heartbeats, and so on
        this.topicRouteTable.put(topic, cloneTopicRouteData);
        return true;
        // ...
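The "compare with the cached route, update the tables only when something changed" pattern can be sketched on its own. This is a rough illustration rather than the client's implementation: `RouteCacheSketch` and its `Route` class are hypothetical, a route is reduced to a map from broker name to one address, and the extra `isNeedUpdateTopicRouteInfo` check is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RouteCacheSketch {
    /** Hypothetical, simplified TopicRouteData: broker name -> address. */
    static final class Route {
        final Map<String, String> brokers;

        Route(Map<String, String> brokers) {
            this.brokers = new HashMap<String, String>(brokers); // defensive copy
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Route && ((Route) o).brokers.equals(brokers);
        }

        @Override
        public int hashCode() {
            return brokers.hashCode();
        }
    }

    final Map<String, Route> topicRouteTable = new ConcurrentHashMap<String, Route>();
    final Map<String, String> brokerAddrTable = new ConcurrentHashMap<String, String>();

    /** Returns true only when the fetched route differs from the cached one. */
    boolean update(String topic, Route fetched) {
        if (fetched == null) {
            return false;
        }
        Route old = topicRouteTable.get(topic);
        if (fetched.equals(old)) {
            return false;                                        // unchanged: leave the caches alone
        }
        brokerAddrTable.putAll(fetched.brokers);                 // refresh broker addresses
        topicRouteTable.put(topic, new Route(fetched.brokers));  // keep a private copy of the route
        return true;
    }
}
```

Storing a copy, as the original does with `cloneTopicRouteData()`, keeps later changes to the fetched object from silently altering the cached route that the next comparison relies on.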
Once the topic information is obtained, it is wrapped in a TopicPublishInfo:
public class TopicPublishInfo {
    private boolean orderTopic = false;
    // Whether the topic really exists on the broker; false if it does not
    private boolean haveTopicRouterInfo = false;
    // Message queues
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    // Detailed topic information returned by the NameSrv
    private TopicRouteData topicRouteData;
    // ...
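How a route turns into the `messageQueueList` is not shown in this article. The sketch below assumes the conversion creates one queue entry per write queue on each broker; `PublishInfoSketch`, `BrokerQueues`, and the `topic@broker#id` string format are all hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class PublishInfoSketch {
    /** Hypothetical stand-in for QueueData: one broker and its write queue count. */
    static final class BrokerQueues {
        final String brokerName;
        final int writeQueueNums;

        BrokerQueues(String brokerName, int writeQueueNums) {
            this.brokerName = brokerName;
            this.writeQueueNums = writeQueueNums;
        }
    }

    /** Expands each broker's write queues into "topic@broker#queueId" entries. */
    static List<String> toMessageQueues(String topic, List<BrokerQueues> route) {
        List<String> queues = new ArrayList<String>();
        for (BrokerQueues b : route) {
            for (int i = 0; i < b.writeQueueNums; i++) {
                queues.add(topic + "@" + b.brokerName + "#" + i);
            }
        }
        return queues;
    }
}
```

Under this assumption, a topic with 2 write queues on each of two brokers yields 4 selectable queues, which is the list the producer picks from on every send.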
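After that, the message is sent.
Now for producer startup:
Having covered message sending, let's look at how the producer is started and initialized:
1. Check the configuration
2. Initialize the MQClientInstance
3. Register the producer in the producerTable maintained by the MQClientInstance
4. Put the default topic into topicPublishInfoTable
5. Start the MQClientInstance
6. Set the producer state to running
7. Send a heartbeat to the brokers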
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            //1. Check the configuration
            this.checkConfig();

            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            //2. Initialize the MQClientInstance, the key component behind the producer's interactions
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            //3. Register the producerGroup in producerTable: key = producer group name, value = this
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            //4. Put TBW102 into topicPublishInfoTable by default
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

            if (startFactory) {
                //5. Start the factory
                mQClientFactory.start();
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            //6. Change the state to running
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    //7. Send a heartbeat to all brokers
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
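The `serviceState` handling in `start` follows a small state machine: the state is set to START_FAILED before any work is done and only becomes RUNNING once everything has succeeded, so a failed or repeated start is rejected. Below is a minimal sketch of that pattern; `LifecycleSketch` and its `init` parameter are hypothetical, and it throws `IllegalStateException` where the original throws `MQClientException`.

```java
final class LifecycleSketch {
    enum State { CREATE_JUST, START_FAILED, RUNNING, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;

    synchronized State state() {
        return state;
    }

    /** init stands in for config checks, registration, and starting the client instance. */
    synchronized void start(Runnable init) {
        if (state != State.CREATE_JUST) {
            throw new IllegalStateException("service state not OK, maybe started once: " + state);
        }
        state = State.START_FAILED; // pessimistic: the state stays here if init throws
        init.run();
        state = State.RUNNING;      // reached only when init completed normally
    }
}
```

Marking the failure state up front means no cleanup code is needed on the error path: any exception thrown during initialization leaves the object in START_FAILED, and a later `start` call fails fast.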
We mainly look at how the MQClientInstance is created and started:
public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final Logger log = ClientLogger.getLog();
    // Configuration
    private final ClientConfig clientConfig;
    // Instance index
    private final int instanceIndex;
    // Client ID (pid)
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    // Producer group table; producers register here (step 3 of the producer start() code)
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    // Consumer group table
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    // Admin group table
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    // Netty configuration
    private final NettyClientConfig nettyClientConfig;
    // Netty client
    private final MQClientAPIImpl mQClientAPIImpl;
    // Admin
    private final MQAdminImpl mQAdminImpl;
    // Original topic route data pulled from the NameSrv is stored in this table
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    // Broker address table
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    // Broker version table
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    // Request processor
    private final ClientRemotingProcessor clientRemotingProcessor;
    // Message pull service
    private final PullMessageService pullMessageService;
    // Rebalance (load balancing) service
    private final RebalanceService rebalanceService;
    // Producer for the default group
    private final DefaultMQProducer defaultMQProducer;
    // Consumer stats manager
    private final ConsumerStatsManager consumerStatsManager;
    private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
    // Startup state of the MQClientInstance
    private ServiceState serviceState = ServiceState.CREATE_JUST;
    private DatagramSocket datagramSocket;
    private Random random = new Random();
    // ...
Starting the MQClientInstance:
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If no NameSrv address is configured, fetch it from the address given by Java system properties.
                // This is how Alibaba uses it: obtaining the NameSrv address works like calling a web service.
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // Start request-response channel (initializes the Netty client)
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service (starts the default group's producer)
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
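The body of `startScheduledTask()` is not shown here. As a rough idea of how periodic work can be hung on a single-threaded scheduler like the `scheduledExecutorService` field, the sketch below runs a task at a fixed rate. Everything in it is an assumption for illustration: the class name, the thread name, the choice of task, and the delays are placeholders rather than values taken from RocketMQ.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

final class ScheduledRefreshSketch {
    /** One named worker thread, in the style of the MQClientFactoryScheduledThread executor. */
    static ScheduledExecutorService newScheduler() {
        return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "RefreshSketchThread");
            }
        });
    }

    /**
     * Runs task repeatedly. Runtime exceptions are caught inside the wrapper because an
     * exception escaping a scheduleAtFixedRate task suppresses all later executions.
     */
    static void schedule(ScheduledExecutorService scheduler, final Runnable task,
                         long initialDelayMillis, long periodMillis) {
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // a real client would log this and carry on
                }
            }
        }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }
}
```

A task such as "refresh the routes of all known topics" or "send heartbeats" would be passed in as the `Runnable`; calling `shutdown()` or `shutdownNow()` on the scheduler stops the worker thread.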