Producer
Message sending
producer start
The producer startup process (shown in the diagram below) is:
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            // check the group name
            this.checkConfig();
            // change ClientConfig.instanceName to the pid
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // initialize mQClientFactory as an MQClientInstance and add it to factoryTable
            this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // register this producer into MQClientInstance.producerTable
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // save the routeInfo for this producer's topic
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            if (startFactory) {
                // start the MQClientInstance
                mQClientFactory.start();
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    // send a heartbeat to all brokers at startup
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
Message send flow
Message send process: the producer builds the message and sends it to the broker over Netty, and the broker then persists it. The flow is as follows:
============================= producer sends the message =============================
The main logic is in DefaultMQProducerImpl.sendDefaultImpl
1. Fetch the topicRouteInfo
   DefaultMQProducerImpl.tryToFindTopicPublishInfo
   ->
   MQClientInstance.updateTopicRouteInfoFromNameServer
     private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable: holds all producers
     updateTopicRouteInfoFromNameServer
     - fetch the topicRouteInfo
     - iterate producerTable and update every producer's topicRouteInfo
     - iterate consumerTable and update every consumer's topicSubscribeInfo
     - add the result to topicRouteTable
   ->
   MQClientAPIImpl.getTopicRouteInfoFromNameServer
     request the topicRouteInfo from the NameServer over Netty
2. Select a MessageQueue
   take one MessageQueue from messageQueueList
3. Send the message over Netty
   DefaultMQProducerImpl.sendKernelImpl
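The three steps above can be sketched as a simplified retry loop. This is a hypothetical stand-in (plain strings instead of MessageQueue/TopicPublishInfo; the real sendDefaultImpl also handles timeouts, latency-fault tolerance, and result checking), kept only to show the shape of the loop:

```java
import java.util.List;

// Simplified sketch of the sendDefaultImpl retry loop (hypothetical types).
public class SendSketch {
    static int index = 0;

    // Step 2: pick a queue round-robin, preferring one not on the broker that just failed.
    static String selectOneMessageQueue(List<String> queues, String lastFailed) {
        for (int i = 0; i < queues.size(); i++) {
            String q = queues.get(index++ % queues.size());
            if (!q.equals(lastFailed)) return q;
        }
        return queues.get(index++ % queues.size());
    }

    // Steps 1-3: route info is assumed already fetched; select a queue, send, retry on failure.
    static String sendDefaultImpl(List<String> queues, int maxRetries) {
        String lastFailed = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            String mq = selectOneMessageQueue(queues, lastFailed);
            try {
                return sendKernelImpl(mq);   // step 3: the actual netty send
            } catch (RuntimeException e) {
                lastFailed = mq;             // avoid this broker on the next attempt
            }
        }
        throw new RuntimeException("send failed after retries");
    }

    // Stand-in for the netty send; simulates a failure on one broker.
    static String sendKernelImpl(String mq) {
        if (mq.equals("brokerB-q0")) throw new RuntimeException("timeout");
        return "SEND_OK:" + mq;
    }
}
```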
============================= broker receives the message =============================
Because Netty is the networking layer, the broker also first receives the data via Netty and then invokes the registered processor:
- parseRequestHeader
  builds the requestHeader via reflection
- build the SendMessageContext
- run the before-hooks
- store the message
  - build a MessageExtBrokerInner
  - persist via DefaultMessageStore.putMessage
    - CommitLog.putMessage
      - take a mapedFile from mapedFileQueue; appendMessage writes into the commitLog through a direct buffer
      - flush to disk synchronously or asynchronously
      - synchronous double-write (HA)
- run the after-hooks
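The before/after hook steps follow a plain hook-list pattern. A minimal sketch with a hypothetical, simplified hook interface (the real SendMessageHook methods take a SendMessageContext; a StringBuilder stands in for it here):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the before/after hook pattern around message handling.
public class HookSketch {
    interface SendMessageHook {
        void sendMessageBefore(StringBuilder ctx);
        void sendMessageAfter(StringBuilder ctx);
    }

    static final List<SendMessageHook> hooks = new ArrayList<>();

    static String process(String msg) {
        StringBuilder ctx = new StringBuilder();        // stands in for SendMessageContext
        for (SendMessageHook h : hooks) h.sendMessageBefore(ctx);
        ctx.append("stored(").append(msg).append(")");  // the actual message handling
        for (SendMessageHook h : hooks) h.sendMessageAfter(ctx);
        return ctx.toString();
    }
}
```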
issue
How does the broker know to use SendMessageProcessor for messages coming from producers?
// all processors are registered during broker initialization, in registerProcessor
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
How does the broker receive a message and write it to a file?
- find the mapedFile to write to (the last one, or a newly created one)
- call mapedFile.appendMessage
How is the target mapedFile obtained?
See the sequence diagram: RocketMQ.asta (Broker receives sendRequest)
The mapedFile.appendMessage process
See the sequence diagram: RocketMQ.asta (Broker receives sendRequest)
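Since the sequence diagram lives in an external file, the "last file or create a new one" logic can be sketched in memory. Hypothetical simplified classes (the real code delegates file creation to AllocateMapedFileService and works with files on disk):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how MapedFileQueue picks the file to write to.
public class MapedFileQueueSketch {
    static final int FILE_SIZE = 1024;               // each "file" holds 1024 bytes

    static class MapedFile {
        final long fileFromOffset;                   // global offset = file-name prefix
        int wrotePosition = 0;
        MapedFile(long offset) { this.fileFromOffset = offset; }
        boolean isFull() { return wrotePosition >= FILE_SIZE; }
    }

    final List<MapedFile> mapedFiles = new ArrayList<>();

    // Return the last file if it still has room; otherwise allocate a new one
    // whose fileFromOffset continues where the previous file ends.
    MapedFile getLastMapedFile() {
        if (mapedFiles.isEmpty() || mapedFiles.get(mapedFiles.size() - 1).isFull()) {
            long offset = mapedFiles.isEmpty() ? 0
                    : mapedFiles.get(mapedFiles.size() - 1).fileFromOffset + FILE_SIZE;
            mapedFiles.add(new MapedFile(offset));   // real code asks AllocateMapedFileService
        }
        return mapedFiles.get(mapedFiles.size() - 1);
    }
}
```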
Classes involved in sending a message
DefaultMQProducerImpl
topicPublishInfoTable
Holds the MessageQueues and related info for each topic
// whether messages for this topic are ordered
private boolean orderTopic = false;
// whether routeInfo has been obtained for this topic
private boolean haveTopicRouterInfo = false;
// the MessageQueues of this topic
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// which queue the next message goes to
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(0);
private TopicRouteData topicRouteData;
private final ConcurrentHashMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable = new ConcurrentHashMap<String, TopicPublishInfo>();
put:
- when the producer starts, a new TopicPublishInfo for the producer's topic is created and put in
- before each send, the topicPublishInfo for the topic is looked up; at that point the NameServer is queried for the latest info and every record in the table is updated
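The sendWhichQueue round-robin selection can be sketched as follows. This is a simplified sketch: a plain ThreadLocal<Integer> stands in for the real ThreadLocalIndex, and strings stand in for MessageQueue:

```java
import java.util.List;

// Sketch of TopicPublishInfo's round-robin queue selection via a per-thread index.
public class TopicPublishInfoSketch {
    static final ThreadLocal<Integer> sendWhichQueue = ThreadLocal.withInitial(() -> 0);

    static String selectOneMessageQueue(List<String> messageQueueList) {
        int index = sendWhichQueue.get();
        sendWhichQueue.set(index + 1);                        // advance for the next send
        int pos = Math.abs(index) % messageQueueList.size();  // wrap around the queue list
        return messageQueueList.get(pos);
    }
}
```

A per-thread index keeps each sending thread cycling through the queues independently, with no contention on a shared counter.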
defaultMQProducer
private final DefaultMQProducer defaultMQProducer;
The class used to send messages. DefaultMQProducer's constructor news a DefaultMQProducerImpl and passes itself in, so DefaultMQProducerImpl.defaultMQProducer is by default the enclosing DefaultMQProducer.
mQClientFactory
private MQClientInstance mQClientFactory;
MQClientManager is a singleton with two fields:
// generates the ids of MQClientInstances, incremented per instance
private AtomicInteger factoryIndexGenerator = new AtomicInteger();
// maps clientId to MQClientInstance; manages the clients
private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable =
    new ConcurrentHashMap<String, MQClientInstance>();
Obtain the singleton via getInstance, then call getAndCreateMQClientInstance to create the MQClientInstance.
It is started via mQClientFactory.start inside DefaultMQProducerImpl.start.
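The get-or-create pattern behind getAndCreateMQClientInstance can be sketched as below. The signature is simplified (the real method takes a ClientConfig and an RPCHook and derives the clientId itself):

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of MQClientManager's get-or-create pattern keyed by clientId (ip@instanceName).
public class MQClientManagerSketch {
    static class MQClientInstance {
        final String clientId;
        MQClientInstance(String clientId) { this.clientId = clientId; }
    }

    private static final MQClientManagerSketch instance = new MQClientManagerSketch(); // singleton
    private final ConcurrentHashMap<String, MQClientInstance> factoryTable = new ConcurrentHashMap<>();

    public static MQClientManagerSketch getInstance() { return instance; }

    // The same clientId always resolves to the same MQClientInstance, so all
    // producers and consumers in one JVM with that id share one instance.
    public MQClientInstance getAndCreateMQClientInstance(String clientId) {
        return factoryTable.computeIfAbsent(clientId, MQClientInstance::new);
    }
}
```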
MQClientInstance
Fields
// passed in when MQClientManager news the MQClientInstance; a DefaultMQProducer works here because it extends ClientConfig
private final ClientConfig clientConfig;
// passed in at construction, generated by MQClientManager
private final int instanceIndex;
// passed in at construction, generated by clientConfig.buildMQClientId, of the form ip@instanceName
private final String clientId;
// the MQProducerInner for each group
// producers register themselves here when they start
// on every message send, topicRouteData is fetched from the NameServer and every producer's info is updated
private final ConcurrentHashMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
// the MQConsumerInner for each group
// consumers register themselves here when they start
// on every message send, topicRouteData is fetched from the NameServer and every consumer's info is updated
private final ConcurrentHashMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
// the MQAdminExtInner for each group; DefaultMQAdminExt is registered here on startup
private final ConcurrentHashMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
// the TopicRouteData for each topic
// on every message send, topicRouteData is fetched from the NameServer and the entry is updated
private final ConcurrentHashMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
// the addresses of all brokers
// on every message send, topicRouteData is fetched from the NameServer and the entries are updated
private final ConcurrentHashMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
    new ConcurrentHashMap<String, HashMap<Long, String>>();
// scheduled-task thread pool; runs fetchNameServerAddr, updateTopicRouteInfoFromNameServer, etc.
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "MQClientFactoryScheduledThread");
    }
});
// services
private final PullMessageService pullMessageService;
private final RebalanceService rebalanceService;
private final NettyClientConfig nettyClientConfig;
// handles the client's remote communication, via Netty
private final MQClientAPIImpl mQClientAPIImpl;
Key methods
public void start() throws MQClientException {
    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // If not specified, looking address from name server
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.clientConfig.setNamesrvAddr(this.mQClientAPIImpl.fetchNameServerAddr());
                }
                // Start request-response channel
                this.mQClientAPIImpl.start();
                // Start various schedule tasks
                this.startScheduledTask();
                // Start pull service
                this.pullMessageService.start();
                // Start rebalance service
                this.rebalanceService.start();
                // Start push service
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
                break;
            case SHUTDOWN_ALREADY:
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    // fetch the NameServer address
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception e) {
                    log.error("ScheduledTask fetchNameServerAddr exception", e);
                }
            }
        }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    }
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // update the topicRouteData
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInteval(), TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // clean up offline brokers
                MQClientInstance.this.cleanOfflineBroker();
                // send heartbeats to the brokers
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // persist the consumerOffsets
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            try {
                // decide from the size of processQueueTable whether the thread pool should grow or shrink
                // (the concrete grow/shrink logic is not implemented yet)
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
}
DefaultMessageStore
Fields
private final MessageStoreConfig messageStoreConfig;
// the CommitLog
private final CommitLog commitLog;
// (topic, queueId) uniquely identifies a ConsumeQueue
// put:
// 1. getMessage calls findConsumeQueue; if consumeQueueTable has no entry for (topic, queueId), a new one is created and added
// 2. loaded at startup
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
/** services */
// flushes consume queues to disk: loops over consumeQueueTable and commits each consumeQueue's mapedFileQueue
private final FlushConsumeQueueService flushConsumeQueueService;
// deletes expired commitlog files
private final CleanCommitLogService cleanCommitLogService;
// deletes consumeQueue files
private final CleanConsumeQueueService cleanConsumeQueueService;
// created when the DefaultMessageStore is constructed
// started when the messageStore starts
// its run loop takes requests from requestQueue (a PriorityBlockingQueue, so take blocks) and handles them, i.e. creates new mapedFiles
private final AllocateMapedFileService allocateMapedFileService;
// TODO: the index service is not yet understood
private final IndexService indexService;
// TODO
private final ReputMessageService reputMessageService;
// HA: synchronous double-write, asynchronous replication
private final HAService haService;
private final ScheduleMessageService scheduleMessageService;
private final StoreStatsService storeStatsService;
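The (topic, queueId) -> ConsumeQueue lookup described for consumeQueueTable can be sketched with a simplified stand-in class (the real findConsumeQueue also wires the queue to the store config and file paths):

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of findConsumeQueue: (topic, queueId) resolves to one ConsumeQueue,
// created lazily on first access.
public class ConsumeQueueTableSketch {
    static class ConsumeQueue {
        final String topic;
        final int queueId;
        ConsumeQueue(String topic, int queueId) { this.topic = topic; this.queueId = queueId; }
    }

    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, ConsumeQueue>> consumeQueueTable =
            new ConcurrentHashMap<>();

    ConsumeQueue findConsumeQueue(String topic, int queueId) {
        return consumeQueueTable
                .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())   // per-topic map
                .computeIfAbsent(queueId, id -> new ConsumeQueue(topic, id));
    }
}
```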
Key methods
public void start() throws Exception {
    this.flushConsumeQueueService.start();
    this.commitLog.start();
    this.storeStatsService.start();
    if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {
        this.scheduleMessageService.start();
    }
    if (this.getMessageStoreConfig().isDuplicationEnable()) {
        this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());
    } else {
        this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
    }
    this.reputMessageService.start();
    this.haService.start();
    this.createTempFile();
    this.addScheduleTask();
    this.shutdown = false;
}
CommitLog
Where messages are written to memory and saved to files
Fields
// the queue of message files; contains every file persisted on disk
private final MapedFileQueue mapedFileQueue;
// the owning message store
private final DefaultMessageStore defaultMessageStore;
// flushes messages to disk
private final FlushCommitLogService flushCommitLogService;
// callback for appending a message; its doAppend method appends the message to memory
private final AppendMessageCallback appendMessageCallback;
// records the current offset of each queue of each topic
private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
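How topicQueueTable hands out the next logical queue offset can be sketched as below; a simplified sketch of the idea only, with the key format following the field's comment above:

```java
import java.util.HashMap;

// Sketch of per-(topic, queueId) logical offset tracking, keyed "topic-queueId".
public class TopicQueueTableSketch {
    private final HashMap<String, Long> topicQueueTable = new HashMap<>(1024);

    // Read the current offset for the queue (0 for the first message), then advance it.
    long nextOffset(String topic, int queueId) {
        String key = topic + "-" + queueId;
        long offset = topicQueueTable.getOrDefault(key, 0L);
        topicQueueTable.put(key, offset + 1);
        return offset;
    }
}
```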
MapedFileQueue
Manages MapedFiles: creates, fetches, and deletes them, and writes messages to files
Fields
// max number of files deleted per batch; a parameter of the deletion decision
private static final int DeleteFilesBatchMax = 10;
// storage path of the files
private final String storePath;
// size of each file
private final int mapedFileSize;
// list of all files
private final List<MapedFile> mapedFiles = new ArrayList<MapedFile>();
// read-write lock
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// where new mapedFiles are requested: allocation requests are added to requestTable and requestQueue
// when the MessageStore starts it starts the AllocateMapedFileService thread, which executes the requests in requestQueue and creates the mapedFiles
private final AllocateMapedFileService allocateMapedFileService;
MapedFile
One-to-one with a file on disk
Fields
// name of the file holding the messages
private final String fileName;
// the file's global offset, which is also the file-name prefix
private final long fileFromOffset;
// file size
private final int fileSize;
// the file object
private final File file;
// the memory the file is mapped to
private final MappedByteBuffer mappedByteBuffer;
// current write position in the file
private final AtomicInteger wrotePostion = new AtomicInteger(0);
// position up to which data has been flushed to disk
private final AtomicInteger committedPosition = new AtomicInteger(0);
// NIO file channel
private FileChannel fileChannel;
private volatile long storeTimestamp = 0;
private boolean firstCreateInQueue = false;
Methods
public int commit(final int flushLeastPages) {
    // check whether a flush is needed
    if (this.isAbleToFlush(flushLeastPages)) {
        // check whether the file is held by someone else, i.e. a commit does not always succeed
        if (this.hold()) {
            // not held: flush the mapped memory to disk
            int value = this.wrotePostion.get();
            // force the mapped memory onto disk
            this.mappedByteBuffer.force();
            this.committedPosition.set(value);
            // release the hold
            this.release();
        } else {
            // failed to acquire the hold; reset committedPosition
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            this.committedPosition.set(this.wrotePostion.get());
        }
    }
    return this.getCommittedPosition();
}
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
    assert msg != null;
    assert cb != null;
    // current write position in the file
    int currentPos = this.wrotePostion.get();
    if (currentPos < this.fileSize) {
        // take a slice of the direct buffer: mark and position are reset, limit is the remaining size,
        // and it shares the same memory as the original buffer
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result =
            cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
        this.wrotePostion.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "
        + this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}