RocketMQ學習筆記(11)----RocketMQ的PushConsumer和PullConsumer


1. PushConsumer

  推,Broker主動向Consumer推消息,它Consumer的一種,應用通常向對象注冊一個Listener接口,一旦接收到消息,Consumer對象立刻回調Linstener接口方法。Push方式里,consumer把輪詢過程封裝了,並注冊MessageListener監聽器,取到消息后,喚醒MessageListener的consumeMessage()來消費,對用戶而言,感覺消息是被推送過來的。

  缺點:

  慢消費無疑是Push模式最大的致命傷,如果消費者的速度比發送者的速度慢很多,勢必造成消息在broker的堆積。假設這些消息都是有用的無法丟棄的,消息就要一直在broker端保存。當然這還不是最致命的,最致命的是broker給consumer推送一堆consumer無法處理的消息,consumer不是reject就是error,然后來回踢皮球。所以push適合於沒有慢消費情況的場景下

  前面使用的所有示例中的Consuemr都是采用的push方式,所以這里就不在具體寫示例代碼了。

2. PullConsumer

  拉,Consumer主動的從Broker拉取消息,主動權由應用控制,可以實現批量的消費消息。Pull方式里,取消息的過程需要用戶自己寫,首先通過打算消費的Topic拿到MessageQueue的集合,遍歷MessageQueue集合,然后針對每個MessageQueue批量取消息,一次取完后,記錄該隊列下一次要取的開始offset,直到取完了,再換另一個MessageQueue。

  反觀Pull模式,consumer可以按需消費,不用擔心自己處理不了的消息來騷擾自己,而broker堆積消息也會相對簡單,無需記錄每一個要發送消息的狀態,只需要維護所有消息的隊列和偏移量就可以了。所以對於慢消費,消息量有限且到來的速度不均勻的情況,pull模式比較合適消息延遲與忙等。

  這是Pull模式最大的短板。由於主動權在消費方,消費方無法准確地決定何時去拉取最新的消息。如果一次Pull取到消息了還可以繼續去Pull,如果沒有Pull取到則需要等待一段時間重新Pull,在阿里中的解決是長輪詢 Pull,消費者如果嘗試拉取失敗,不是直接return,而是把連接掛在那里wait,服務端如果有新的消息到來,把連接notify起來,這也是不錯的思路。但海量的長連接block對系統的開銷還是不容小覷的,還是要合理的評估時間間隔,給wait加一個時間上限比較好。

  使用PullConsumser的示例代碼如下:

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * 
 */
public class PullConsumer {
    //保存上一次消費的消息位置
    private static final Map offsetTable = new HashMap ();
    public static void main(String[] args) throws MQClientException {

        //實例化pullConsumer
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_consumer_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        //拉取PullConsumerTopicTest topic下的所有消息隊列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("PullConsumerTopicTest");

        //遍歷消息隊列
        for (MessageQueue mq : mqs) {
            System.err.println("Consume from the queue: " + mq);
            SINGLE_MQ:
            //
            while (true) {
                try {
                    //設置上次消費消息下標
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        //根據結果狀態,如果找到消息,批量消費消息
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }
    //保存上次消費的消息下標,這里使用了一個全局HashMap來保存
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offsetTable.put(mq, offset);
    }

    //獲取上次消費的消息的下表
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = (Long) offsetTable.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }
}

  Producer跟普通或其他消息發送方式一樣即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM