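Long polling (used by RocketMQ)
Consumer -> Broker: RocketMQ consumers fetch messages from the broker by long polling
- The broker does not know how much each consumer can process
- Pushing messages directly puts more pressure on the broker
- With a push-style long-lived connection, the consumer may not be able to keep up with the data pushed to it
- With pull, the consumer stays in control of the pace
Short polling
The client keeps sending requests to the server and has to reconnect for every request
Long polling
The client sends a request to the server; the server responds as soon as it has data, and otherwise holds the request open instead of closing the connection
Long-lived connection
Once established, the connection is never closed, and the server pushes messages over it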
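The difference between short and long polling can be sketched with a plain blocking queue. This is a toy model, not RocketMQ code: the class `LongPollSketch` and its methods are hypothetical names, and the "server" is just an in-memory queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy model of a server that can answer a pull by short or long polling. */
class LongPollSketch {
    // Messages waiting on the (hypothetical) server side.
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    /** Producer side: make a message available. */
    void publish(String msg) {
        pending.offer(msg);
    }

    /** Short polling: answer immediately, null when nothing is available. */
    String shortPoll() {
        return pending.poll();
    }

    /** Long polling: hold the request for up to holdMillis; null only on timeout. */
    String longPoll(long holdMillis) {
        try {
            return pending.poll(holdMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
    }

    public static void main(String[] args) {
        LongPollSketch server = new LongPollSketch();
        System.out.println("short poll: " + server.shortPoll());
        new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
            server.publish("hello");
        }).start();
        // The request is held until the message arrives, well before the 5 s limit.
        System.out.println("long poll: " + server.longPoll(5_000));
    }
}
```

With long polling the client gets the message almost as soon as it is published, without hammering the server with empty requests.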
Consumer side
The source walkthrough follows these 5 steps
- Create a DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer");
- Set the Namesrv address
consumer.setNamesrvAddr("192.168.88.134:9876");
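- Subscribe to a topic with a filter expression (how it is used internally becomes visible once pullMessageService has started)
- The methods of DefaultMQPushConsumer are actually implemented by the defaultMQPushConsumerImpl class
- A subscriptionData is built and stored in rebalanceImpl; subExpression is either specified explicitly or is *
- Once the consumer client has started, mQClientFactory sends heartbeats to the brokers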
consumer.subscribe("tagTopic", "TAG-A");
-----------------------------------------------------------------------------------------
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(this.withNamespace(topic), subExpression);
}
-----------------------------------------------------------------------------------------
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
-----------------------------------------------------------------------------------------
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
- Register the message listener (the callback)
consumer.registerMessageListener
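By default (clustering mode), a message is consumed by only one consumer in the group, point-to-point style
Message state is maintained by the broker
ACK (redelivery): the listener returns a consume status ---> CONSUME_SUCCESS (consumed successfully) || RECONSUME_LATER (consumption failed, consume again)
- When RECONSUME_LATER is returned to the broker
- RocketMQ sends this batch of messages back to the broker (not to the original topic but to the consumer group's RETRY topic)
- After a delay (10 seconds by default, configurable by the application), the message is delivered again to a consumer in this ConsumerGroup
- If consumption keeps failing for a certain number of retries (16 by default), the message is moved to the DLQ (dead-letter queue). Applications can monitor the DLQ and intervene manually.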
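The retry-or-dead-letter decision described above can be sketched as a small routing function. This is a simplified model under stated assumptions: `RetryRoutingSketch` and `nextTopic` are hypothetical names, the `%RETRY%` and `%DLQ%` prefixes followed by the consumer group are the topic naming RocketMQ uses, and the exact boundary check inside the broker may differ from the one shown here.

```java
/** Simplified routing decision for a message that came back as RECONSUME_LATER. */
final class RetryRoutingSketch {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";
    static final int DEFAULT_MAX_RECONSUME_TIMES = 16;

    /**
     * @param consumerGroup     group that failed to consume the message
     * @param reconsumeTimes    how many times the message has already been retried
     * @param maxReconsumeTimes retry limit (assumed to be 16 by default)
     * @return the topic the message is written to next
     */
    static String nextTopic(String consumerGroup, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes) {
            // Retries exhausted: park the message in the group's dead-letter queue.
            return DLQ_PREFIX + consumerGroup;
        }
        // Otherwise it goes to the group's retry topic and is redelivered after a delay.
        return RETRY_PREFIX + consumerGroup;
    }

    public static void main(String[] args) {
        System.out.println(nextTopic("consumer", 0, DEFAULT_MAX_RECONSUME_TIMES));
        System.out.println(nextTopic("consumer", 16, DEFAULT_MAX_RECONSUME_TIMES));
    }
}
```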
- Start the consumer client (this also starts the traceDispatcher)
consumer.start();
this.defaultMQPushConsumerImpl.start();
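- Act according to the current ServiceState
Just created: CREATE_JUST,
Running: RUNNING,
Already shut down: SHUTDOWN_ALREADY,
Failed to start: START_FAILED;
- Check the configuration and copy the subscription list (SubscriptionData)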
this.checkConfig();
this.copySubscription();
- Get the MQClient instance
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
- Register the consumer and start the client
boolean registerOK = this.mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
------------------------------------------------------------------------
this.mQClientFactory.start();
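The way start() branches on ServiceState can be sketched as a small guard. This is a minimal model, not the RocketMQ implementation: `StartGuardSketch` is a hypothetical class, `IllegalStateException` stands in for the client's own exception type, and marking the state as START_FAILED before running the startup steps is an assumption about how a failed start is detected.

```java
/** Minimal start-once guard driven by a ServiceState-like enum. */
class StartGuardSketch {
    enum ServiceState { CREATE_JUST, RUNNING, SHUTDOWN_ALREADY, START_FAILED }

    private ServiceState serviceState = ServiceState.CREATE_JUST;

    /** Runs the startup steps only from CREATE_JUST; every other state is rejected. */
    synchronized void start(Runnable startupSteps) {
        switch (serviceState) {
            case CREATE_JUST:
                // Assume failure first; only a clean run of the steps reaches RUNNING.
                serviceState = ServiceState.START_FAILED;
                startupSteps.run(); // checkConfig, copySubscription, register, ... in the real client
                serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
            default:
                throw new IllegalStateException("cannot start in state " + serviceState);
        }
    }

    synchronized ServiceState state() {
        return serviceState;
    }
}
```

If a startup step throws, the state stays at START_FAILED, so a second start() on the same object is rejected instead of half-initialising it again.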
Starting consumeMessageService
this.consumeMessageService.start();
MQClientInstance startup sequence
this.mQClientAPIImpl.start();
this.startScheduledTask();
this.pullMessageService.start();
this.rebalanceService.start();
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
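Starting NettyRemotingClient
- Starts the Netty remoting client (4 worker threads) and creates an event executor group that is attached to the Netty pipeline
- NettyRemotingClient periodically scans the responseTable (first run after 3 s, then every 1 s)
- It iterates over responseTable, a ConcurrentHashMap with an initial capacity of 256
- Requests that have timed out are removed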
this.mQClientAPIImpl.start();
private int clientWorkerThreads = 4;
----------------------------------------------------------------------------------------
pipeline.addFirst(NettyRemotingClient.this.defaultEventExecutorGroup, "sslHandler", NettyRemotingClient.this.sslContext.newHandler(ch.alloc()));
NettyRemotingClient.log.info("Prepend SSL handler");
-----------------------------------------------------------------------------------------
this.timer.scheduleAtFixedRate(new TimerTask() {
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable var2) {
NettyRemotingClient.log.error("scanResponseTable exception", var2);
}
}
}, 3000L, 1000L);
protected final ConcurrentMap<Integer, ResponseFuture> responseTable = new ConcurrentHashMap(256);
if (rf.getBeginTimestamp() + rf.getTimeoutMillis() + 1000L <= System.currentTimeMillis()) {
rf.release();
it.remove();
rfList.add(rf);
log.warn("remove timeout request, " + rf);
}
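The timeout scan above can be reproduced in isolation. The sketch below is a stand-alone model: `ResponseTableScanSketch` and `PendingRequest` are hypothetical stand-ins for the client and its ResponseFuture, keeping only the two fields the scan needs, and the current time is passed in so the behaviour is easy to check.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ResponseTableScanSketch {
    /** Hypothetical stand-in for ResponseFuture. */
    static final class PendingRequest {
        final long beginTimestamp;
        final long timeoutMillis;

        PendingRequest(long beginTimestamp, long timeoutMillis) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Keyed by request id, same initial capacity as in the excerpt.
    final ConcurrentMap<Integer, PendingRequest> responseTable = new ConcurrentHashMap<>(256);

    /** Removes every request whose deadline plus a 1 s grace period has passed. */
    List<PendingRequest> scan(long nowMillis) {
        List<PendingRequest> expired = new ArrayList<>();
        Iterator<Map.Entry<Integer, PendingRequest>> it = responseTable.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest rf = it.next().getValue();
            if (rf.beginTimestamp + rf.timeoutMillis + 1000L <= nowMillis) {
                it.remove();     // safe on a ConcurrentHashMap iterator
                expired.add(rf); // the real client then runs the callbacks of these requests
            }
        }
        return expired;
    }
}
```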
- If channelEventListener is not null, the nettyEventExecutor is started (it runs as a ServiceThread)
- This ServiceThread is the one in org.apache.rocketmq.remoting.common, i.e. the one belonging to the remoting (Netty) module
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
Starting startScheduledTask
- If no NamesrvAddr is configured, a task is scheduled to fetch the name server address: first after 10 s, then every 120 s
private void startScheduledTask() {
    if (null == this.clientConfig.getNamesrvAddr()) {
        // No name server address configured: fetch it after 10 s, then every 120 s
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                } catch (Exception var2) {
                    MQClientInstance.this.log.error("ScheduledTask fetchNameServerAddr exception", var2);
                }
            }
        }, 10000L, 120000L, TimeUnit.MILLISECONDS);
    }
    // (rest of the method is not shown in this excerpt)
}
Starting pullMessageService, which drives message consumption (key part)
- This ServiceThread is the one in org.apache.rocketmq.common, i.e. RocketMQ's own
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", new Object[]{this.getServiceName(), this.started.get(), this.thread});
if (this.started.compareAndSet(false, true)) {
this.stopped = false;
this.thread = new Thread(this, this.getServiceName());
this.thread.setDaemon(this.isDaemon);
this.thread.start();
}
}
final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue();
- The pull message service loop
public void run() {
this.log.info(this.getServiceName() + " service started");
while(!this.isStopped()) {
try {
PullRequest pullRequest = (PullRequest)this.pullRequestQueue.take();
-----------------------------------------------------------------------------------------
this.pullMessage(pullRequest);
-----------------------------------------------------------------------------------------
} catch (InterruptedException var2) {
;
} catch (Exception var3) {
this.log.error("Pull Message Service Run Method exception", var3);
}
}
this.log.info(this.getServiceName() + " service end");
}
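The two pieces shown above, an idempotent start() based on compareAndSet and a run() loop blocking on the request queue, fit together roughly as follows. This is a stand-alone sketch: `PullServiceSketch` is a hypothetical class, requests are plain strings, and `awaitHandled` and `shutdown` exist only so the example can be observed and stopped.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class PullServiceSketch implements Runnable {
    private final LinkedBlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile boolean stopped = false;
    private Thread thread;
    final List<String> handled = new CopyOnWriteArrayList<>();

    /** Returns true only for the call that actually started the thread. */
    boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false; // someone already started the service
        }
        stopped = false;
        thread = new Thread(this, "PullServiceSketch");
        thread.setDaemon(true);
        thread.start();
        return true;
    }

    void submit(String pullRequest) {
        pullRequestQueue.offer(pullRequest);
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                String request = pullRequestQueue.take(); // blocks while the queue is empty
                handled.add(request);                     // stands in for pullMessage(request)
            } catch (InterruptedException e) {
                // Woken up by shutdown(); the loop condition decides whether to exit.
            }
        }
    }

    /** Test helper: waits until at least count requests were handled. */
    boolean awaitHandled(int count, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (handled.size() < count) {
            if (System.currentTimeMillis() >= deadline) return false;
            try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; }
        }
        return true;
    }

    void shutdown() {
        stopped = true;
        if (thread != null) {
            thread.interrupt(); // unblock take()
            try { thread.join(1_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }
}
```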
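- LinkedBlockingQueue
take() removes one pull request from pullRequestQueue; it first reads the AtomicInteger count and acquires the interruptible ReentrantLock takeLock
- lockInterruptibly(): interruptible lock acquisition (as soon as an interrupt is detected, the thread stops competing for the lock and the method throws InterruptedException)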
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
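The practical effect of lockInterruptibly() and notEmpty.await() is that a thread blocked in take() can be woken up by an interrupt, which is how a service thread gets stopped cleanly. A small demonstration, with `InterruptibleTakeSketch` as a hypothetical helper class:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptibleTakeSketch {
    /** Blocks a thread in take() on an empty queue, interrupts it, and reports whether take() threw. */
    static boolean takeIsInterrupted() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // nothing is ever offered, so this can only end by interruption
            } catch (InterruptedException e) {
                sawInterrupt.set(true);
            }
        });
        consumer.start();
        consumer.interrupt(); // works whether the thread is already waiting or has not reached take() yet
        try {
            consumer.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sawInterrupt.get();
    }

    public static void main(String[] args) {
        System.out.println("take() interrupted: " + takeIsInterrupted());
    }
}
```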
- dequeue removes the head of the queue, here one PullRequest
- A PullRequest holds the consumer group, the messageQueue (whose metadata is topic, brokerName and queueId), the processQueue and the next offset
- The processQueue mainly holds the pulled messages in a TreeMap<Long, MessageExt>
- private String consumerGroup; private MessageQueue messageQueue; private ProcessQueue processQueue; private long nextOffset;
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
- Inside the pull message service: this.pullMessage(pullRequest);
- Look up the consumer, then pull into its processQueue for consumption
private void pullMessage(PullRequest pullRequest) {
-----------------------------------------------------------------------------------------
MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
-----------------------------------------------------------------------------------------
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl)consumer;
-----------------------------------------------------------------------------------------
impl.pullMessage(pullRequest);
-----------------------------------------------------------------------------------------
} else {
this.log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
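- The pull is asynchronous; the result arrives in the callback onSuccess( PullResult pullResult )
- pullResult.getMsgFoundList() returns List<MessageExt> msgFoundList
- submitConsumeRequest has two implementations, ConsumeMessageConcurrentlyService and ConsumeMessageOrderlyService, for multi-threaded and ordered consumption
- executePullRequestImmediately puts the pullRequest back into pullRequestQueue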
PullCallback pullCallback = new PullCallback()
public void onSuccess(PullResult pullResult) {
switch(pullResult.getPullStatus()) {
case FOUND:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume);
-----------------------------------------------------------------------------------------
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0L) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
-----------------------------------------------------------------------------------------
this.pullAPIWrapper.pullKernelImpl(pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, 15000L, 30000L, CommunicationMode.ASYNC, pullCallback);
}
}
- this.pullAPIWrapper.pullKernelImpl together with the pullCallback callback handles the pulled PullResult
public class PullResult {
private final PullStatus pullStatus;
private final long nextBeginOffset;
private final long minOffset;
private final long maxOffset;
private List<MessageExt> msgFoundList;
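How a single PullRequest keeps itself going (pull, advance the offset from nextBeginOffset, hand the messages over, re-queue the same request) can be simulated without a broker. Everything here is a simplified, single-threaded model: `PullLoopSketch` is hypothetical, the broker is an in-memory list, messages are strings, and the loop stops on NO_NEW_MSG only so that the sketch terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PullLoopSketch {
    enum PullStatus { FOUND, NO_NEW_MSG }

    static final class PullRequest {
        long nextOffset; // where the next pull starts
    }

    static final class PullResult {
        final PullStatus pullStatus;
        final long nextBeginOffset;
        final List<String> msgFoundList;

        PullResult(PullStatus pullStatus, long nextBeginOffset, List<String> msgFoundList) {
            this.pullStatus = pullStatus;
            this.nextBeginOffset = nextBeginOffset;
            this.msgFoundList = msgFoundList;
        }
    }

    /** Hypothetical in-memory broker: returns up to batchSize messages starting at offset. */
    static PullResult pull(List<String> brokerLog, long offset, int batchSize) {
        int from = (int) offset;
        int to = Math.min(brokerLog.size(), from + batchSize);
        if (from >= to) {
            return new PullResult(PullStatus.NO_NEW_MSG, offset, new ArrayList<>());
        }
        return new PullResult(PullStatus.FOUND, to, new ArrayList<>(brokerLog.subList(from, to)));
    }

    /** Drains the log: each FOUND result advances the offset and re-queues the same request. */
    static List<String> drain(List<String> brokerLog, int batchSize) {
        Queue<PullRequest> pullRequestQueue = new ArrayDeque<>();
        pullRequestQueue.add(new PullRequest());
        List<String> consumed = new ArrayList<>();
        while (!pullRequestQueue.isEmpty()) {
            PullRequest request = pullRequestQueue.poll();
            PullResult result = pull(brokerLog, request.nextOffset, batchSize);
            if (result.pullStatus == PullStatus.FOUND) {
                request.nextOffset = result.nextBeginOffset;
                consumed.addAll(result.msgFoundList); // stands in for submitConsumeRequest
                pullRequestQueue.add(request);        // stands in for executePullRequestImmediately
            }
            // On NO_NEW_MSG the sketch simply stops instead of polling again.
        }
        return consumed;
    }
}
```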
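- ConcurrentlyService: the concurrent consumption service
- The run bodies for concurrent and ordered consumption are largely the same
- The main difference lies on the producer side, which sends messages to a specific queue: compared with normal messages, ordered messages require passing a MessageQueueSelector implementation to the producer's send() method and overriding select to choose the queue. Ordered messages are only ordered within a queue, so all messages that must stay in order have to be sent to the same queue.
- The consumer sets both the minimum and maximum consume thread count to 1 and implements the MessageListenerOrderly interface to consume messages
- If msgs is no larger than consumeMessageBatchMaxSize, a single consumeRequest is created and submitted to the thread pool
- if (msgs.size() <= consumeBatchSize)
- If it is larger than consumeMessageBatchMaxSize, each consumeRequest can only carry consumeMessageBatchMaxSize messages, so the list is consumed in several batches
- private String consumerGroup; private List<MessageExt> msgList; private MessageQueue mq; private boolean success; private String status; private Object mqTraceContext; private Map<String, String> props; private String namespace;
- The concurrent-consumption thread pool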
this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
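The batching rule (one request when the pulled list fits into consumeMessageBatchMaxSize, several requests otherwise) amounts to cutting the list into chunks. A minimal sketch, with `BatchSplitSketch.split` as a hypothetical helper; in the real service each chunk would become one consumeRequest submitted to the pool above.

```java
import java.util.ArrayList;
import java.util.List;

class BatchSplitSketch {
    /** Cuts msgs into consecutive batches of at most consumeBatchSize elements. */
    static <T> List<List<T>> split(List<T> msgs, int consumeBatchSize) {
        if (consumeBatchSize <= 0) {
            throw new IllegalArgumentException("consumeBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += consumeBatchSize) {
            int to = Math.min(msgs.size(), from + consumeBatchSize);
            batches.add(new ArrayList<>(msgs.subList(from, to))); // copy so batches are independent
        }
        return batches;
    }
}
```

For example, five messages with a batch size of 2 give three batches of sizes 2, 2 and 1.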
- MessageListenerConcurrently: the listener interface for concurrent consumption
- public interface MessageListenerConcurrently extends MessageListener { ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> var1, ConsumeConcurrentlyContext var2); }
MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(this.messageQueue);
ConsumeConcurrentlyStatus status = null;
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.resetRetryAndNamespace(this.msgs,ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setNamespace(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getNamespace());
consumeMessageContext.setConsumerGroup(ConsumeMessageConcurrentlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setProps(new HashMap());
consumeMessageContext.setMq(this.messageQueue);
consumeMessageContext.setMsgList(this.msgs);
consumeMessageContext.setSuccess(false);
ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
boolean hasException = false;
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
- OrderlyService: the ordered consumption service
- A consumeRequest is created and run in the thread pool
- ConsumeMessageOrderlyService.ConsumeRequest consumeRequest = new ConsumeMessageOrderlyService.ConsumeRequest(processQueue, messageQueue);
- consumeExecutor, the ordered-consumption thread pool, executes the consumeRequest
- this.consumeExecutor.submit(consumeRequest);
- this.consumeExecutor = new ThreadPoolExecutor(this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 60000L, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl("ConsumeMessageThread_"));
- consumeRequest implements the Runnable interface; its run() method follows
- class ConsumeRequest implements Runnable { private final ProcessQueue processQueue; private final MessageQueue messageQueue;
- Take a List<MessageExt> batch of messages from the processQueue
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
- resetRetryAndNamespace restores the original topic on redelivered messages
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
- Iterate over the messages; for each redelivered message carrying the RETRY_TOPIC property, set the message's topic to that value
- String retryTopic = msg.getProperty("RETRY_TOPIC"); if (retryTopic != null && groupTopic.equals(msg.getTopic())) { msg.setTopic(retryTopic); } if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) { msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace())); }
if (!msgs.isEmpty()) {
ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setNamespace(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getNamespace());
consumeMessageContext.setMq(this.messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
consumeMessageContext.setProps(new HashMap());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
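- MessageListenerOrderly, the listener interface for ordered consumption, extends the MessageListener interface
- MessageListenerOrderly is the callback interface we supply when registering the listener; override its method to consume messages
- public interface MessageListenerOrderly extends MessageListener { ConsumeOrderlyStatus consumeMessage(List<MessageExt> var1, ConsumeOrderlyContext var2); }
- This is the line that matters: once the listener interface is implemented, this call hands the messages to it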
-----------------------------------------------------------------------------------------
status = ConsumeMessageOrderlyService.this.messageListener.consumeMessage(
Collections.unmodifiableList(msgs), context );
-----------------------------------------------------------------------------------------
- executeHookBefore and executeHookAfter run before and after consumption (invoked through hooks)
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}
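Ordered consumption only helps if the producer routes related messages to the same queue, which is the job of the MessageQueueSelector mentioned earlier. The core of such a selector is usually a stable mapping from a business key to a queue index. The sketch below shows that mapping only: `QueueSelectSketch` and the sharding key are hypothetical, and a real selector would return the MessageQueue at that index from the list it is given.

```java
class QueueSelectSketch {
    /**
     * Maps a business key (for example an order id) to a queue index.
     * The same key always yields the same index, so its messages stay in one queue.
     */
    static int selectQueueIndex(Object shardingKey, int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        // floorMod keeps the result in [0, queueCount) even for negative hash codes.
        return Math.floorMod(shardingKey.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        // All messages of order 1001 map to the same index.
        System.out.println(selectQueueIndex(1001L, 4));
        System.out.println(selectQueueIndex(1001L, 4));
    }
}
```

Messages with different keys may still share a queue; ordering is only guaranteed among messages that land in the same queue.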
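Starting rebalanceService
- Like pullMessageService, it is started as a RocketMQ ServiceThread
- rebalanceService and pullMessageService are concrete implementations of the same abstract class
- public abstract class ServiceThread implements Runnable
- Wait interval (20000 ms by default)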
private static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));
public void run() {
this.log.info(this.getServiceName() + " service started");
while(!this.isStopped()) {
this.waitForRunning(waitInterval);
this.mqClientFactory.doRebalance();
}
this.log.info(this.getServiceName() + " service end");
}
- Perform the rebalance
public void doRebalance() {
Iterator var1 = this.consumerTable.entrySet().iterator();
while(var1.hasNext()) {
Entry<String, MQConsumerInner> entry = (Entry)var1.next();
MQConsumerInner impl = (MQConsumerInner)entry.getValue();
if (impl != null) {
try {
impl.doRebalance();
} catch (Throwable var5) {
this.log.error("doRebalance exception", var5);
}
}
}
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
- Get the subscription list
- Two modes, broadcasting and clustering: BROADCASTING and CLUSTERING
private void rebalanceByTopic(String topic, boolean isOrder) {
Set mqSet;
-----------------------------------------------------------------------------------------
Broadcasting mode:
switch(this.messageModel) {
case BROADCASTING:
mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
-----------------------------------------------------------------------------------------
Drop queues that are no longer needed (for this topic, remove every MessageQueue that is not in the set taken from topicSubscribeInfoTable)
boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
-----------------------------------------------------------------------------------------
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}", new Object[]{this.consumerGroup, topic, mqSet, mqSet});
}
} else {
log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
}
break;
- In clustering mode:
- Get the list of consumers in the same consumer group that subscribe to the same topic
Clustering mode:
case CLUSTERING:
mqSet = (Set)this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, this.consumerGroup);
if (null == mqSet && !topic.startsWith("%RETRY%")) {
log.warn("doRebalance, {}, but the topic[{}] not exist.", this.consumerGroup, topic);
}
if (null == cidAll) {
log.warn("doRebalance, {} {}, get consumer id list failed", this.consumerGroup, topic);
}
if (mqSet != null && cidAll != null) {
List<MessageQueue> mqAll = new ArrayList();
mqAll.addAll(mqSet);
Collections.sort(mqAll);
Collections.sort(cidAll);
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List allocateResult = null;
- allocateMessageQueueStrategy: the message queue allocation strategy
- Assigns MessageQueues to the current consumer
try {
allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
} catch (Throwable var10) {
log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), var10);
return;
}
Set<MessageQueue> allocateResultSet = new HashSet();
if (allocateResult != null) {
allocateResultSet.addAll(allocateResult);
}
boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
if (changed) {
log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", new Object[]{strategy.getName(), this.consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet});
this.messageQueueChanged(topic, mqSet, allocateResultSet);
}
}
}
}
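What strategy.allocate does with the sorted mqAll and cidAll lists can be illustrated with an averaging scheme: every consumer computes the same split locally and keeps only its own slice. This is a sketch of one possible strategy, not necessarily identical to the one shipped with the client; `AverageAllocateSketch` is a hypothetical class and queues are represented by any element type.

```java
import java.util.ArrayList;
import java.util.List;

class AverageAllocateSketch {
    /**
     * Returns the slice of mqAll owned by currentCid.
     * Both lists must be sorted the same way on every consumer so that all of them agree.
     */
    static <Q> List<Q> allocate(String currentCid, List<Q> mqAll, List<String> cidAll) {
        List<Q> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or nothing to assign
        }
        int base = mqAll.size() / cidAll.size();      // queues every consumer gets
        int remainder = mqAll.size() % cidAll.size(); // first 'remainder' consumers get one extra
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int i = 0; i < size; i++) {
            result.add(mqAll.get(start + i));
        }
        return result;
    }
}
```

With 8 queues and 3 consumers, the slices are queues 0-2, 3-5 and 6-7; with more consumers than queues, the surplus consumers receive nothing.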
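DefaultMQProducerImpl (on the consumer side it is started with start(false))
- The boolean is the startFactory flag: with false the producer does not start the MQClientInstance again; with true it would also call mQClientFactory.start()
- The producer configuration (class DefaultMQProducer extends ClientConfig) is passed in to obtain the client instance
- The producer is then registered in producerTable, the producer list (a ConcurrentMap underneath)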
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, this.rpcHook);
boolean registerOK = this.mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
ConcurrentMap<String, MQProducerInner> producerTable;
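Registration in a ConcurrentMap keyed by group name typically relies on putIfAbsent, so that a second producer with the same group is rejected rather than silently replacing the first. A minimal sketch under that assumption; `ProducerRegistrySketch` is hypothetical and uses Object in place of MQProducerInner.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ProducerRegistrySketch {
    // group name -> producer, mirroring the shape of producerTable
    private final ConcurrentMap<String, Object> producerTable = new ConcurrentHashMap<>();

    /** Returns true if the group was free and the producer is now registered. */
    boolean registerProducer(String group, Object producer) {
        if (group == null || producer == null) {
            return false;
        }
        // putIfAbsent is atomic: only the first registration for a group wins.
        return producerTable.putIfAbsent(group, producer) == null;
    }

    int size() {
        return producerTable.size();
    }
}
```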