1. PushConsumer
推,Broker主動向Consumer推消息,它Consumer的一種,應用通常向對象注冊一個Listener接口,一旦接收到消息,Consumer對象立刻回調Linstener接口方法。Push方式里,consumer把輪詢過程封裝了,並注冊MessageListener監聽器,取到消息后,喚醒MessageListener的consumeMessage()來消費,對用戶而言,感覺消息是被推送過來的。
缺點:
慢消費無疑是Push模式最大的致命傷,如果消費者的速度比發送者的速度慢很多,勢必造成消息在broker的堆積。假設這些消息都是有用的無法丟棄的,消息就要一直在broker端保存。當然這還不是最致命的,最致命的是broker給consumer推送一堆consumer無法處理的消息,consumer不是reject就是error,然后來回踢皮球。所以push適合於沒有慢消費情況的場景下
前面使用的所有示例中的Consuemr都是采用的push方式,所以這里就不在具體寫示例代碼了。
2. PullConsumer
拉,Consumer主動的從Broker拉取消息,主動權由應用控制,可以實現批量的消費消息。Pull方式里,取消息的過程需要用戶自己寫,首先通過打算消費的Topic拿到MessageQueue的集合,遍歷MessageQueue集合,然后針對每個MessageQueue批量取消息,一次取完后,記錄該隊列下一次要取的開始offset,直到取完了,再換另一個MessageQueue。
反觀Pull模式,consumer可以按需消費,不用擔心自己處理不了的消息來騷擾自己,而broker堆積消息也會相對簡單,無需記錄每一個要發送消息的狀態,只需要維護所有消息的隊列和偏移量就可以了。所以對於慢消費,消息量有限且到來的速度不均勻的情況,pull模式比較合適消息延遲與忙等。
這是Pull模式最大的短板。由於主動權在消費方,消費方無法准確地決定何時去拉取最新的消息。如果一次Pull取到消息了還可以繼續去Pull,如果沒有Pull取到則需要等待一段時間重新Pull,在阿里中的解決是長輪詢 Pull,消費者如果嘗試拉取失敗,不是直接return,而是把連接掛在那里wait,服務端如果有新的消息到來,把連接notify起來,這也是不錯的思路。但海量的長連接block對系統的開銷還是不容小覷的,還是要合理的評估時間間隔,給wait加一個時間上限比較好。
使用PullConsumser的示例代碼如下:
package com.wangx.rocketmq.quickstart; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; /** * */ public class PullConsumer { //保存上一次消費的消息位置 private static final Map offsetTable = new HashMap (); public static void main(String[] args) throws MQClientException { //實例化pullConsumer DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_consumer_group_name_5"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.start(); //拉取PullConsumerTopicTest topic下的所有消息隊列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("PullConsumerTopicTest"); //遍歷消息隊列 for (MessageQueue mq : mqs) { System.err.println("Consume from the queue: " + mq); SINGLE_MQ: // while (true) { try { //設置上次消費消息下標 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); System.out.println(pullResult); putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { //根據結果狀態,如果找到消息,批量消費消息 case FOUND: List<MessageExt> messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.println(new String(m.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } consumer.shutdown(); } //保存上次消費的消息下標,這里使用了一個全局HashMap來保存 private static void putMessageQueueOffset(MessageQueue mq, long offset) { offsetTable.put(mq, offset); } //獲取上次消費的消息的下表 private static long getMessageQueueOffset(MessageQueue mq) { Long offset = (Long) offsetTable.get(mq); if (offset != null) { return offset; } return 0; } }
Producer跟普通或其他消息發送方式一樣即可。