文章部分圖片來自參考資料,侵刪
概述
我們從前面的發送流程知道某個主題的消息到了broker 的 messageque 里,假如讓我們來設計一個消息隊列的消費者過程,那么多個消費者應該如何消費數量較少的 messagequeue 呢?消費者有兩種消費模式 : 廣播模式和集群模式 ,廣播模式很好理解就是消費所有的消息;集群模式相當於多個消費者邏輯上認為是一個整體,最通俗的理解就是一個消息在集群里面只有一個消費者消費就算消費完成。 那么集群模式的消費方式應該按照何種策略呢?再一個既然集群模式只允許一個消費者消費,那么如何阻止其他消費者消費呢? 獲取消費的方式也有兩種,是broker 自己推過來呢還是消費者自己去拉呢? 根據這幾個問題我們將帶着疑問去看看 rocketmq 如何設計。
消息隊列負載均衡
消費隊列的負載均衡解決的就是消費者該去哪里消費的問題,從而達到消費均衡。
集群模式
group組里只要有一個人消費就算消費成功了,分多種策略
平均分配策略
在group里平均分配消息,例如3個節點9條消息,使用除法,每人3條消息。圖片來自參考資料,侵刪。
一致性哈希策略
(分布式哈希一致性)[https://www.cnblogs.com/Benjious/p/11899188.html]通過這篇文章可以了解一致性哈希
- 平均分配輪詢策略 同樣是上面的例子,輪詢分配,同樣是策略,rocketmq同樣會利用虛擬節點防止hash環上節點不均衡的情況,而落在環上的keys這是broker上的隊列。
廣播模式.
全量消費,很好理解
消費模式在代碼中實現
消息隊列的負載均衡是由一個不停運行的均衡服務來定時執行的,執行的邏輯RebalanceImpl 來實現,
public void doRebalance(final boolean isOrder) {
Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
this.truncateMessageQueueNotMyTopic();
}
public ConcurrentMap<String, SubscriptionData> getSubscriptionInner() {
return subscriptionInner;
}
private void rebalanceByTopic(final String topic, final boolean isOrder) {
switch (messageModel) {
//廣播是存儲在本地的,集群是存儲在broker
case BROADCASTING: {
//廣播,那么必定是去本地查找
...
}
case CLUSTERING: {
//集群模式消費進度存儲在遠程broker 端
...
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
//調用分配策略
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
mqAll,
cidAll);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
e);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
}
}
}
默認是使用 AllocateMessageQueueAveragely 分配策略,這里我們思考一個問題,我們知道對於某條消息集群模式下只有一個消費者可以消費,消息是被放進 messagequeue 里的,當 消費者的數量大於 messagequeue 的數量的時候,那么該如何分配呢?一個messagequeue 可以分配多個消費者嗎?下面我寫了個測試,分配的邏輯和 AllocateMessageQueueAveragely 一樣。
public static void main(String[] args) {
Main m = new Main();
m.test();
}
public List<MessageQueue> op( String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<>();
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
public void test(){
List<String> cidAll = new ArrayList<>();
for (int i=0; i<8; i++) {
String cid = "192.168.10.86@" + i;
cidAll.add(cid);
}
List<MessageQueue> mqAll = new ArrayList<>();
for (int i=0; i<3; i++) {
MessageQueue mq = new MessageQueue(i);
mqAll.add(mq);
}
for (String cid :
cidAll) {
List<MessageQueue> op = op(cid, mqAll, cidAll);
System.out.println("當前 cid : "+ cid + " ");
for (MessageQueue mq : op) {
System.out.println("去 "+ mq.no +" 號拿消息進行消息");
}
}
}
當前 cid : 192.168.10.86@0 去 0 號拿消息進行消息 當前 cid : 192.168.10.86@1 去 1 號拿消息進行消息 當前 cid : 192.168.10.86@2 去 2 號拿消息進行消息 當前 cid : 192.168.10.86@3 當前 cid : 192.168.10.86@4 當前 cid : 192.168.10.86@5 當前 cid : 192.168.10.86@6 當前 cid : 192.168.10.86@7
(忽略我不規范的命名,哈哈)可以看到在集群模式下要是有多余的消費者那么他們肯定是餓着,分配不到messagequeue 。
消費進度
某個消費者消費了某個消息,那么如何在標識該消費“已被我消費了呢”,也就是保存進度的問題 ,對於廣播模式,保存進度是在broker 端中保存的,而集群模式這保存是在客戶端本地。存儲進度主要是 offsetStore接口,它的子類實現 LocalFileOffsetStore 和 RemoteBrokerOffsetStore 分別對應這本地儲存和遠程儲存
集群模式下
以下內容來自參考資料,作者寫的非常
在消費者客戶端,RebalanceService 服務會定時地 (默認 20 秒) 從 Broker 服務器獲取當前客戶端所需要消費的消息隊列,並與當前消費者客戶端的消費隊列進行對比,看是否有變化。對於每個消費隊列,會從 Broker 服務器查詢這個隊列當前的消費偏移量。然后根據這幾個消費隊列,創建對應的拉取請求 PullRequest 准備從 Broker 服務器拉取消息,如下圖所示:
當從 Broker 服務器拉取下來消息以后,只有當用戶成功消費的時候,才會更新本地的偏移量表。本地的偏移量表再通過定時服務每隔 5 秒同步到 Broker 服務器端:
public class MQClientInstance {
private void startScheduledTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
MQClientInstance.this.persistAllConsumerOffset();
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
而維護在 Broker 服務器端的偏移量表也會每隔 5 秒鍾序列化到磁盤中:
public class BrokerController {
public boolean initialize() throws CloneNotSupportedException {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
BrokerController.this.consumerOffsetManager.persist();
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
}
}
拉取消費
rocketmq 的 push 實際都是利用不斷地去 pull 來達到 push 的效果。 push 實際是用 pull 實現的,開始的時候內存為空,生成 pullRequest 然后去 broker 請求數據,請求回來后再次生成 pullRequest再次去請求,去broker拉取消費進行的消費的服務 : PullMessageService ,它接受 PullRequest
public class PullRequest {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
}
public class ProcessQueue {
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
}
PullRequest 關聯MessageQueue 和 ProcessQueue ,ProcessQueue 是指某個MessageQueue的消費進度抽象
/**
* Queue consumption snapshot
*
*/
public class ProcessQueue {
...
private final Logger log = ClientLogger.getLog();
//讀寫鎖
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
// TreeMap 是可以排序的 map(紅黑樹實現)
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
private final TreeMap<Long, MessageExt> consumingMsgOrderlyTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
}
可以看到ProcessQueue維護兩個消息樹為了就是記錄消費的進度,這在后面會介,我們也可以大概地猜測到 PullRequest 實際應該的含義是某個指定的 MessageQueue 進度發生了變化,就會生成一個 PullRequest 去遠程拉取消費進行消費。 服務器在收到客戶端的請求之后,會根據話題和隊列 ID 定位到對應的消費隊列。然后根據這條請求傳入的 offset 消費隊列偏移量,定位到對應的消費隊列文件。偏移量指定的是消費隊列文件的消費下限。
public class DefaultMessageStore implements MessageStore {
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {
// ...
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
if (consumeQueue != null) {
// 首先根據消費隊列的偏移量定位消費隊列
SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
if (bufferConsumeQueue != null) {
try {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
// 最大消息長度
final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);
// 取消息
for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
// 根據消息的偏移量和消息的大小從 CommitLog 文件中取出一條消息
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
getResult.addMessage(selectResult);
status = GetMessageStatus.FOUND;
}
// 增加下次開始的偏移量
nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
} finally {
bufferConsumeQueue.release();
}
}
}
// ...
}
}
客戶端和 Broker 服務器端完整拉取消息的流程圖如下所示:
消費消費
順序消費和並發消費,順序消費指的是消費同一個 messagequeue 里的消息,從而達到順序消費的目的。
broker 中記錄的信息
consumerFilter.json
消費者過濾相關
consumerOffset.json
消費者消費broker各個隊列到了哪個位置
{
"offsetTable":{
"%RETRY%generalCallbackGroup@generalCallbackGroup":{0:0
},
"Jodie_topic_1023@CID_JODIE_1":{0:10,1:11,2:10,3:9
},
"PayTransactionTopic@mq_test_callback":{0:0,1:0,2:1,3:1
},
}
}
可以看到是有json表示的是“topic + group ”中的四個隊列的消費情況
delayOffset.json 、
延時相關
subscriptionGroup.json
訂閱相關,group相關的配置, 消費者訂閱了哪些topic
{
"dataVersion":{
"counter":1,
"timestamp":1572054949837
},
"subscriptionGroupTable":{
"CID_ONSAPI_OWNER":{
"brokerId":0,
"consumeBroadcastEnable":true,
"consumeEnable":true,
"consumeFromMinEnable":true,
"groupName":"CID_ONSAPI_OWNER",
"notifyConsumerIdsChangedEnable":true,
"retryMaxTimes":16,
"retryQueueNums":1,
"whichBrokerWhenConsumeSlowly":1
},
"CID_ONSAPI_PERMISSION":{
"brokerId":0,
"consumeBroadcastEnable":true,
"consumeEnable":true,
"consumeFromMinEnable":true,
"groupName":"CID_ONSAPI_PERMISSION",
"notifyConsumerIdsChangedEnable":true,
"retryMaxTimes":16,
"retryQueueNums":1,
"whichBrokerWhenConsumeSlowly":1
},
...
}
}
topics.json
topic相關配置,broker 中擁有那些topic
{
"dataVersion":{
"counter":5,
"timestamp":1573745719274
},
"topicConfigTable":{
"TopicTest":{
"order":false,
"perm":6,
"readQueueNums":4,
"topicFilterType":"SINGLE_TAG",
"topicName":"TopicTest",
"topicSysFlag":0,
"writeQueueNums":4
},
"%RETRY%please_rename_unique_group_name_4":{
"order":false,
"perm":6,
"readQueueNums":1,
"topicFilterType":"SINGLE_TAG",
"topicName":"%RETRY%please_rename_unique_group_name_4",
"topicSysFlag":0,
"writeQueueNums":1
}
...
}
}
consumerQueue 圖例
消費消息
消費消息有並發消費和順序消費兩種,主要的核心實現就是 ConsumeMessageConcurrentlyService 和 ConsumeMessageOrderlyService ,又它們繼承的接口看的出來他們都是持有了一個線程池,並在線程池內進行消費。
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public void pullMessage(final PullRequest pullRequest) {
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
switch (pullResult.getPullStatus()) {
case FOUND:
// 消息放入處理隊列的消息樹中
boolean dispathToConsume = processQueue
.putMessage(pullResult.getMsgFoundList());
// 提交一個消息消費請求
DefaultMQPushConsumerImpl.this
.consumeMessageService
.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispathToConsume);
break;
}
}
}
};
}
}
下面的代碼可以看到任務來自自己投到線程池執行。
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
class ConsumeRequest implements Runnable {
@Override
public void run() {
// ...
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
// ...
}
}
}
消費完后
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
public void processConsumeResult(final ConsumeConcurrentlyStatus status, /** 其它參數 **/) {
// 從消息樹中刪除消息
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
//假如某個隊列在這個時候剛好給移除了,不提交進度,這可能會存在重復消費的情況,所有客戶端還是要自己做冪等處理
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
}
而有序消費就有趣多了,我們先思考一下,順序消費是在線程池執行了,那么如何保證有序呢,加鎖。假如在執行的時候剛好進行rebalance,移除了該隊列的消費,那么有序消費就不能進行了,什么意思呢?假設 Consumer-1 消費者客戶端一開始需要消費 3 個消費隊列,這個時候又加入了 Consumer-2 消費者客戶端,並且分配到了 MessageQueue-2 消費隊列。當 Consumer-1 內部的均衡服務檢測到當前消費隊列需要移除 MessageQueue-2 隊列,
可以看到要是2號messagequeue 此時正在執行有序消費,然后卻被另一個消費者進行消費,那么就不能保證有序消費了,於是在 broker 端應該也要有把鎖,保證messagequeue在被有序消費時只有一個消費者持有,而上面的場景也一樣,當消費者不再從messagequeue 消費的時候,也會向broker申請釋放鎖。
public abstract class RebalanceImpl {
private boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet,
final boolean isOrder) {
while (it.hasNext()) {
// ...
if (mq.getTopic().equals(topic)) {
// 當前客戶端不需要處理這個消息隊列了
if (!mqSet.contains(mq)) {
pq.setDropped(true);
// 解鎖
if (this.removeUnnecessaryMessageQueue(mq, pq)) {
// ...
}
}
// ...
}
}
}
}
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}
public ProcessQueue getProcessQueue() {
return processQueue;
}
public MessageQueue getMessageQueue() {
return messageQueue;
}
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
//獲取鎖,保證線程池內只有一個線程可以對該messageQueue 進行消費
synchronized (objLock) {
//廣播消費 ,或是processQueue.isLocked()已經鎖住了,或是鎖沒過期
//那么 processQueue.isLocked() 什么時候返回true 呢?ConsumeMessageOrderlyService內有個定時任務,周期去broker 中鎖住這個 messagequeue
//上文已經講了 processQueue 和 messagequeue 是一一對應的。
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
...
try {
//再次獲取鎖,這里的鎖有什么用呢?我們通過查找processQueue上鎖的地方,發現就是在 Rebalance重新分配消費隊列的時候會上鎖
//為了保證此刻不被其他消費者占用於是上鎖
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//業務邏輯回調
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
.....
}
}
}
}
補充
service構建源碼分析
Rocketmq 中創建一個service部分都繼承了 ServiceThread,讓我們開看一下源碼
public abstract class ServiceThread implements Runnable {
private static final Logger log = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private static final long JOIN_TIME = 90 * 1000;
//保存了一個線程
protected final Thread thread;
/**
* 主要的阻塞方法是 await 方法,然后通過 countdown 來喚醒正在 await 的線程,每次多個線程調用進行 waitForRunning 的時候
* waitPoint 的 柵欄數量都會重置(阻塞,stop后被喚醒,又再次阻塞的情況)。
*/
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
//初始化的時候就創建一個線程
public ServiceThread() {
this.thread = new Thread(this, this.getServiceName());
}
public abstract String getServiceName();
public void start() {
this.thread.start();
}
public void shutdown() {
this.shutdown(false);
}
public void shutdown(final boolean interrupt) {
this.stopped = true;
log.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
try {
if (interrupt) {
this.thread.interrupt();
}
long beginTime = System.currentTimeMillis();
if (!this.thread.isDaemon()) {
this.thread.join(this.getJointime());
}
long eclipseTime = System.currentTimeMillis() - beginTime;
log.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ this.getJointime());
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
public long getJointime() {
return JOIN_TIME;
}
public void stop() {
this.stop(false);
}
public void stop(final boolean interrupt) {
this.stopped = true;
log.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
if (interrupt) {
this.thread.interrupt();
}
}
public void makeStop() {
this.stopped = true;
log.info("makestop thread " + this.getServiceName());
}
public void wakeup() {
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
/**
* 當只有一個線程的時候直接案通過CAS 成功后執行,多個線程則需要等待一段時間間隔后執行
*
* @param interval 等待的時間
*/
protected void waitForRunning(long interval) {
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error("Interrupted", e);
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
protected void onWaitEnd() {
}
public boolean isStopped() {
return stopped;
}
}
可以知道他的構造創建了一個線程用於執行子類實現的run 方法,而自身的countDownLatch 和 Atomic原子類主要是用來應對多個線程同時操作的情況。
參考資料
- http://silence.work/2019/03/03/RocketMQ%20%E6%B6%88%E8%B4%B9%E6%B6%88%E6%81%AF%E8%BF%87%E7%A8%8B%E5%88%86%E6%9E%90/
- https://www.kunzhao.org/blog/2018/04/08/rocketmq-message-index-flow/
- https://cloud.tencent.com/developer/article/1554950





