1 客戶端邏輯
1.1 概述
偏移量管理主要是指管理每個消息隊列的消費進度:集群模式消費下會將消息隊列的消費進度保存在Broker端,廣播模式消費下消息隊列的消費進度保存在消費者本地。
組件分析:RocketMQ定義了一個接口OffsetStore。它的實現類有兩個:RemoteBrokerOffsetStore和LocalFileOffsetStore前者主要是集群消費模式下使用,即與broker進行打交道,將消息隊列的消費偏移量通過網絡傳遞給Broker;后者主要是廣播消費模式下使用,即直接將消費偏移量存儲在消費者所在的本地中。入下圖所示:
offsetstore保存在消費者內部客戶端ConsumerInner的實現類中的,其初始化創建的時機在內部客戶端的start()方法中。offsetstore保存在消費者內部客戶端ConsumerInner的實現類中的,其初始化創建的時機在內部客戶端的start()方法中。
1 if (this.defaultMQPullConsumer.getOffsetStore() != null) { 2 this.offsetStore = this.defaultMQPullConsumer.getOffsetStore(); 3 } else { 4 switch (this.defaultMQPullConsumer.getMessageModel()) { 5 case BROADCASTING: 6 this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); 7 break; 8 case CLUSTERING: 9 this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup()); 10 break; 11 default: 12 break; 13 } 14 this.defaultMQPullConsumer.setOffsetStore(this.offsetStore); 15 }
下面主要分析RemoteBrokerOffsetStore的邏輯。
主要是兩個邏輯,如下圖所示
- 將消息偏移量更新到本地內存中管理消息偏移量的組件
- 將內存中保存的消息偏移量發送給Broker,更新Broker端保存的消息偏移量
1.2 更新消息隊列的偏移量
並發消息消費服務中ConsumeMessageConcurrentlyService#processConsumeResult()處理消息消費結果的方法中在消息處理完成以后會調用更新消息隊列的偏移量
1 // 獲取偏移量存儲實現,並調用其更新偏移量方法更新偏移量 2 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); 3 if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { 4 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); 5 }
下面是RemoteBrokerOffsetStore的更新邏輯
將已經確認消費了的偏移量存儲偏移量管理器中。此處的更新僅僅是更新了保存每個消息隊列的偏移量的map中的值,並沒有將偏移量上傳到broker。
1 public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) { 2 if (mq != null) { 3 // ConcurrentMap<MessageQueue, AtomicLong> 4 // 獲取消息隊列對應的偏移量 5 AtomicLong offsetOld = this.offsetTable.get(mq); 6 if (null == offsetOld) { 7 // 更新table 8 offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset)); 9 } 10 11 if (null != offsetOld) { 12 // 是否是只增模式 13 if (increaseOnly) { 14 MixAll.compareAndIncreaseOnly(offsetOld, offset); 15 } else { 16 offsetOld.set(offset); 17 } 18 } 19 } 20 }
1.3 向Broker發送消息偏移量
向服務端發送消息偏移量是通過MQClientInstance中啟動的一個定時任務來完成的。
1 在其startScheduledTask方法中開啟下列定時任務
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { // 對已消費的消息的偏移量進行持久化 MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
2 調用MQClientInstance的persisAllConsumerOffset()方法
private void persistAllConsumerOffset() { // 獲取所有消費者組對應的內部客戶端 Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, MQConsumerInner> entry = it.next(); MQConsumerInner impl = entry.getValue(); // 調用內部客戶端進行持久化 impl.persistConsumerOffset(); } }
3 調用內部消費者客戶端的持久化方法
public void persistConsumerOffset() { try { this.makeSureStateOK(); Set<MessageQueue> mqs = new HashSet<MessageQueue>(); // 獲取所有的分配的消息隊列 Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); mqs.addAll(allocateMq); // 持久化偏移量 this.offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); } }
4 調用偏移量管理器的更新
1 public void persistAll(Set<MessageQueue> mqs) { 2 if (null == mqs || mqs.isEmpty()) 3 return; 4 5 final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>(); 6 7 // 遍歷保存消息隊列偏移量的map 8 for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) { 9 MessageQueue mq = entry.getKey(); 10 AtomicLong offset = entry.getValue(); 11 if (offset != null) { 12 if (mqs.contains(mq)) { 13 try { 14 // 更新到 15 this.updateConsumeOffsetToBroker(mq, offset.get()); 16 log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", 17 this.groupName, 18 this.mQClientFactory.getClientId(), 19 mq, 20 offset.get()); 21 } catch (Exception e) { 22 log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); 23 } 24 } else { 25 unusedMQ.add(mq); 26 } 27 } 28 } 29 30 if (!unusedMQ.isEmpty()) { 31 for (MessageQueue mq : unusedMQ) { 32 this.offsetTable.remove(mq); 33 log.info("remove unused mq, {}, {}", mq, this.groupName); 34 } 35 } 36 }
接下來就是通過網絡層發送網絡請求給Broker進行更新消息對立偏移量。
1.4 讀取消息隊列的偏移量
兩個時刻需要獲取Broker保存的偏移量
- 消費者剛啟動的時候會去Broker獲取消息隊列對應的偏移量
- 消費者重平衡后,分配得到新的消息隊列,也要重新獲取偏移量
readOffset
在DefaultMQPushConsumerImpl的pullMessage方法中,在消費之前會讀取一次
1 commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
2 服務端的處理邏輯
服務端注冊了的消費消息偏移量的請求處理器,首先是有關偏移量的三個請求碼
GET_CONSUMER_LIST_BY_GROUP:根據組名獲取消費者列表
UPDATE_CONSUMER_OFFSET:更新消費偏移量的請求
QUERY_CONSUMER_OFFSET:查詢消費者的偏移量
所以這三個的請求碼將交給ConsumerManageProcessor來進行處理。
1 /** 2 * ConsumerManageProcessor 3 */ 4 ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this); 5 this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); 6 this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); 7 this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
2.1 更新消費者傳給broker的消費偏移量
內存存儲方式
位置:ConsumerOffsetManager的offsetTable中
格式:ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>>,第一層key是主題+消費者組,集群模式下的消費模式;第二層的key是QueueID隊列ID。
外部存儲位置:
2.2 源碼分析
2.2.1 處理偏移量更新請求和更新到內存中的流程
1 請求處理的入口
1 // RocketMQ里面的通用做法,發送請求時將給請求賦值一個請求碼; 2 // 服務端在接收到請求的時候將根據請求碼選擇不同的請求處理處理器; 3 // 統一的接口processRequest() 4 public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) 5 throws RemotingCommandException { 6 // ConsuemrManagerProcessor內部又分了不同的處理邏輯 7 switch (request.getCode()) { 8 // 處理 9 case RequestCode.GET_CONSUMER_LIST_BY_GROUP: 10 return this.getConsumerListByGroup(ctx, request); 11 // 處理更新偏移量 12 case RequestCode.UPDATE_CONSUMER_OFFSET: 13 return this.updateConsumerOffset(ctx, request); 14 case RequestCode.QUERY_CONSUMER_OFFSET: 15 return this.queryConsumerOffset(ctx, request); 16 default: 17 break; 18 } 19 return null; 20 }
2 處理更新消費偏移量的入口
1 private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) 2 throws RemotingCommandException { 3 // 首先創建響應,RocketMQ中慣例做法,具體可參照 4 final RemotingCommand response = 5 RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class); 6 // 解碼請求頭 7 final UpdateConsumerOffsetRequestHeader requestHeader = 8 (UpdateConsumerOffsetRequestHeader) request 9 .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class); 10 // 調用消費偏移量偏移器進行更新消費偏移量 11 this.brokerController.getConsumerOffsetManager() 12 .commitOffset( 13 RemotingHelper.parseChannelRemoteAddr(ctx.channel()), 14 requestHeader.getConsumerGroup(), // 消費者組 15 requestHeader.getTopic(), // 主題 16 requestHeader.getQueueId(), // 隊列ID 17 requestHeader.getCommitOffset()); // 偏移量 18 response.setCode(ResponseCode.SUCCESS); 19 response.setRemark(null); 20 return response; 21 }
3 消費偏移量管理器更新偏移量的入口
1 public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, 2 final long offset) { 3 // topic@group 4 // 構建key: 主題/消費者組名 5 String key = topic + TOPIC_GROUP_SEPARATOR + group; 6 this.commitOffset(clientHost, key, queueId, offset); 7 }
4 將消費者端傳上來的消費偏移量存儲到內存之中的map
1 private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) { 2 // 使用 主題/消費者名 獲取存儲偏移量的map<queueId, offset> 3 ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); 4 if (null == map) { 5 map = new ConcurrentHashMap<Integer, Long>(32); 6 map.put(queueId, offset); 7 this.offsetTable.put(key, map); 8 } else { 9 Long storeOffset = map.put(queueId, offset); 10 if (storeOffset != null && offset < storeOffset) { 11 log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset); 12 } 13 } 14 }
2.2.2 消息偏移量持久化到磁盤
1、啟動定時任務,該定時任務在BrokerController中被啟動的;
1 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 2 @Override 3 public void run() { 4 try { 5 // 持久化偏移量 6 BrokerController.this.consumerOffsetManager.persist(); 7 } catch (Throwable e) { 8 log.error("schedule persist consumerOffset error.", e); 9 } 10 } 11 }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
2、調用ConsuemerOffsetManager
進行偏移量持久化
1 public synchronized void persist() { 2 // 先進行編碼 3 String jsonString = this.encode(true); 4 if (jsonString != null) { 5 // 獲取存儲文件的路徑 6 String fileName = this.configFilePath(); 7 try { 8 // 將存儲內容存到磁盤 9 MixAll.string2File(jsonString, fileName); 10 } catch (IOException e) { 11 log.error("persist file " + fileName + " exception", e); 12 } 13 } 14 }