一、Consumer 介紹
1.1 核心參數
* consumerGroup:消費者組名
* MessageModel:消息模型,定義了消息傳遞到消費者的方式,默認是 MessageModel.CLUSTERING
* MessageModel.BROADCASTING:廣播
* MessageModel.CLUSTERING:集群
* consumeFromWhere: 消費者開始消費的位置,默認值是 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:從隊列最后的位置開始消費
* ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:從隊列前面最開始消費
* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP: 從指定時間開始消費,之前的消息將會被忽略
* consumeTimestamp:
* allocateMessageQueueStrategy:消息分配策略
* subscription:訂閱關系
* offsetStore:存儲消息偏移量
* consumeThreadMin:線程池最小值,默認值是20
* consumeThreadMax:線程池最大值,默認值是20
* consumeConcurrentlyMaxSpan:單個隊列並行消費最大的跨度,默認2000
* pullThresholdForQueue:一個隊列最大的消費個數,默認1000
* pullInterval:消息拉取的時間間隔
* pullBatchSize:消息拉取的個數,默認32啊
* consumeMessageBatchMaxSize:批量消費量,默認1
* messageListener:消息監聽器,用來處理消息,它有兩個實現類
* MessageListenerOrderly:按順序一個個消費
* MessageListenerConcurrently:並行消費
二、消費模式
2.1 集群模式
* 同一個 consumerGroup 里,並且訂閱的 tag 也必須是一樣的,這樣的 consumer 實例才能組成 consumer 集群;
* 當 consumer 使用集群消費時,每條消息只會被 consumer 集群內的任意一個 consumer 實例消費一次;
* 默認的消費模式就是集群模式;
* 集群模式天然實現負載均衡機制
2.2 廣播模式
* 同一個 consumerGroup 里的 Consumer 會消費訂閱 Topic 的全部消息
* 通過 consumer.setMessageModel(MessageModel.BROADCASTING) 方法設置
三、Offset 介紹
3.1 Offset 是什么
* 在 RocketMQ 中,相同類型的消息會放到一個 Topic 里,為了可以並行操作,一個 Topic 會有多個 MessageQueue。
* Offset 是指某個 Topic 下的一條消息在某個 MessageQueue 里的位置;
* 通過 Offset 的值可以定位到這條消息
3.2 Offset 類結構
從類結構可以看出 Offset 分為本地文件類型和遠程文件類型。
3.2 消費模式采用的 Offset 類型
* 集群模式下因為每個 Consumer 消費所訂閱主題的一部分,所以采用遠程文件存儲 Offset;
* 廣播模式下,由於每個 Consumer 需要消費所有的消息,所以采用本地文件存儲 Offset。
3.3 Offset 文件存儲格式
OffseStore 使用 Json 格式存儲,例如:
{
"OffsetTable":{
1:{
"brokeName":"localhost",
"QueueId":1,
"Topic":"broker1"
},
2:{
"brokeName":"localhost",
"QueueId":2,
"Topic":"broker2"
}
}
}
四、不同類型的消費者
根據對讀取操作的控制情況,可以消費者分為兩種類型。一個是 DefaultMQPushConsumer,由系統控制讀取操作,收到消息后自動調用傳入的處理方法來處理;另一個是 DefaultMQPullConsumer ,讀取操作中的大部分功能由使用者自主控制。
4.1 DefaultMQPushConsumer
DefaultMQPushConsumer 只需要設置好各種參數和設置傳入處理消息的回調函數即可,系統收到消息后會自動調用處理函數來處理消息,而且加入新的 DefaultMQPushConsumer 后會自動做負載均衡。
4.1.1 實例
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 創建消費者對象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupName");
// 設置服務器地址
consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 訂閱指定主題
consumer.subscribe("topicTest","*");
// 注冊消息監聽事件
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("msg:"+msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消費者
consumer.start();
}
}
4.2 DefaultMQPullConsumer
4.2.1 消費步驟
1) 讀取 topic 的消息隊列 message queue 的信息;
2) 按隊列去拉取一定數目的消息;
3) 持久化message queue的消費進度 offset;
4) 根據不同的消息狀態做不同的處理
4.2.2 拉取結果狀態
public enum PullStatus {
// 拉取成功
FOUND,
// 沒有消息可以拉取
NO_NEW_MSG,
// 過濾結果不匹配
NO_MATCHED_MSG,
// 偏移量非法,太大或太小
OFFSET_ILLEGAL
}
4.2.3 實例
public class PullConsumer {
// 本地 offset 存儲容器,生產環境可以放到數據庫或 Redis 中
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("DefaultMQPullConsumer");
// 設置服務器地址
pullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 啟動消費者
pullConsumer.start();
// 從指定 topic 獲取所有的隊列
Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues("topicTest");
// 遍歷隊列,拉取消息
for (MessageQueue mq : messageQueues) {
System.out.printf("從隊列中消費: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
// 獲取 offset
Long offset = getMessageQueueOffset(mq);
// 拉取32個消息
PullResult pullResult =
pullConsumer.pullBlockIfNotFound(mq, null, offset, 32);
System.out.printf("%s%n", pullResult);
// 保存 offset
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
pullConsumer.shutdown();
}
// 保存上次消費的消息下標
private static void putMessageQueueOffset(MessageQueue mq,
long nextBeginOffset) {
OFFSE_TABLE.put(mq, nextBeginOffset);
}
// 獲取上次消費的消息的下標
private static Long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null) {
return offset;
}
return 0l;
}
}
4.3 DefaultLitePullConsumer
**DefaultMQPullConsumer ** 已經被標識為廢棄,替代的是 DefaultLitePullConsumer,下面我們就直接使用 DefaultLitePullConsumer 來操作。
public class LitePullConsumer {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception {
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
// 設置服務器地址
litePullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
// 關閉自動提交偏移量
litePullConsumer.setAutoCommit(false);
// 啟動消費者
litePullConsumer.start();
// 獲取隊列
Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("topicTest");
List<MessageQueue> list = new ArrayList<>(mqSet);
List<MessageQueue> assignList = new ArrayList<>();
for (int i = 0; i < list.size() / 2; i++) {
assignList.add(list.get(i));
}
litePullConsumer.assign(assignList);
litePullConsumer.seek(assignList.get(0), 10);
try {
while (running) {
List<MessageExt> messageExts = litePullConsumer.poll();
System.out.printf("%s %n", messageExts);
litePullConsumer.commitSync();
}
} finally {
litePullConsumer.shutdown();
}
}
}