RocketMQ安裝部署及整合Springboot


消息中間件的功能:

  通過學習ActiveMq,kafka,rabbitMq這些消息中間件,我們大致能為消息中間件的功能做一下以下定義:可以先從基本的需求開始思考

  • 最基本的是要能支持消息的發送和接收,需要涉及到網絡通信就一定會涉及到NIO
  • 消息中心的消息存儲(持久化/非持久化)
  • 消息的序列化和反序列化
  • 是否跨語言
  • 消息的確認機制,如何避免消息重發

  高級功能:

  • 消息的有序性
  • 是否支持事務消息
  • 消息收發的性能,對高並發大數據量的支持
  • 是否支持集群
  • 消息的可靠性存儲
  • 是否支持多協議

MQ消息存儲選擇:

  從主流的幾種MQ消息隊列采用的存儲方式來看,主要會有三種

  1. 分布式KV存儲,比如ActiveMQ中采用的levelDB、Redis, 這種存儲方式對於消息讀寫能力要求不高的情況下可以使用
  2. 文件系統存儲,常見的比如kafka、RocketMQ、RabbitMQ都是采用消息刷盤到所部署的機器上的文件系統來做持久化,這種方案適合對於有高吞吐量要求的消息中間件,因為消息刷盤是一種高效率,高可靠、高性能的持久化方式,除非磁盤出現故障,否則一般是不會出現無法持久化的問題
  3. 關系型數據庫,比如ActiveMQ可以采用mysql作為消息存儲,關系型數據庫在單表數據量達到千萬級的情況下IO性能會出現瓶頸,所以ActiveMQ並不適合於高吞吐量的消息隊列場景。

  總的來說,對於存儲效率,文件系統要優於分布式KV存儲,分布式KV存儲要優於關系型數據庫.

RocketMQ的發展歷史:

  RocketMq是一個由阿里巴巴開源的消息中間件, 2012年開源,2017年成為apache頂級項目。它的核心設計借鑒了Kafka,所以我們在了解RocketMQ的時候,會發現很多和kafka相同的特性。同時呢,Rocket在某些功能上和kafka又有較大的差異,接下來我們就去了解RocketMQ

  • 支持集群模型、負載均衡、水平擴展能力
  • 億級別消息堆積能力
  • 采用零拷貝的原理,順序寫盤,隨機讀
  • 底層通信框架采用Netty NIO
  • NameServer代替Zookeeper,實現服務尋址和服務協調
  • 消息失敗重試機制、消息可查詢
  • 強調集群無單點,可擴展,任意一點高可用,水平可擴展
  • 經過多次雙十一的考驗

RocketMQ的架構:

  集群本身沒有什么特殊之處,和kafka的整體架構類似,其中zookeeper替換成了NameServer。在rocketmq的早版本(2.x)的時候,是沒有namesrv組件的,用的是zookeeper做分布式協調和服務發現,但是后期阿里數據根據實際業務需求進行改進和優化,自主研發了輕量級的namesrv,用於注冊Client服務與Broker的請求路由工作,namesrv上不做任何消息的位置存儲,頻繁操作zookeeper的位置存儲數據會影響整體集群性能.

RocketMQ由四部分組成:

  1. Name Server 可集群部署,節點之間無任何信息同步。提供輕量級的服務發現和路由
  2. Broker(消息中轉角色,負責存儲消息,轉發消息) 部署相對復雜,Broker 分為Master 與Slave,一個Master 可以對應多個Slave,但是一個Slave 只能對應一個Master,Master 與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。Master 也可以部署多個。
  3. Producer,生產者,擁有相同 Producer Group 的 Producer 組成一個集群, 與Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,並向提供Topic服務的Master 建立長連接,且定時向Master 發送心跳。Producer 完全無狀態,可集群部署。
  4. Consumer,消費者,接收消息進行消費的實例,擁有相同 Consumer Group 的 Consumer 組成一個集群,與Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server 取Topic 路由信息,並向提供Topic 服務的Master、Slave 建立長連接,且定時向Master、Slave 發送心跳。Consumer既可以從Master 訂閱消息,也可以從Slave 訂閱消息,訂閱規則由Broker 配置決定。

  要使用rocketmq,至少需要啟動兩個進程,nameserver、broker,前者是各種topic注冊中心,后者是真正的broker。

單機環境RocketMQ的安裝(單master):

  下載 rocketmq的安裝文件: http://rocketmq.apache.org

  解壓 unzip rocketmq-all-4.4.0-bin-release.zip

啟動 nameserver:

  進入rocketMQ解壓目錄下的bin文件夾,啟動namesrv服務:nohup sh mqnamesrv &  tail -f ~/logs/rocketmqlogs/namesrv.log 查看啟動日志

  停止 nameserver : sh bin/mqshutdown namesrv . 停止服務的時候需要注意,要先停止broker,其次停止nameserver。

  默認情況下,nameserver監聽的是 9876 端口。查看日志內容出現如下信息即啟動成功:

啟動 broker:

  nohup sh bin/mqbroker -n ${namesrvIp}:9876 -c /conf/broker.conf &   其中[-c可以指定broker.conf配置文件]。默認情況下會加載conf/broker.conf

  停止broker :sh bin/mqshutdown broker

  • nohup sh mqbroker -n localhost:9876 &  啟動broker,其中-n表示指定當前broker對應的命名服務地址: 默認情況下,broker監聽的是10911端口 。
  • 輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志
  • 如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件,則打開當前目錄下的 nohup.out日志文件查看,出現如下日志表示啟動失敗,提示內存無法分配

內存不足的問題:

  這是因為bin 目錄下啟動 nameserv 與 broker 的 runbroker.sh 和 runserver.sh 文件中默認分配的內存太大,rocketmq比較耗內存,所以默認分配的內存比較大,而系統實際內存卻太小導致啟動失敗,通常像虛擬機上安裝的 CentOS 服務器內存可能是沒有高的,只能調小。實際中應該根據服務器內存情況,配置一個合適的值 ,我這里設置成1g。

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m"
Xms:是指設定程序啟動時占用內存大小。一般來講,大點,程序會啟動的快一點,但是也可能會導致機器暫時間變慢。
Xmx:是指設定程序運行期間最大可占用的內存大小。如果程序運行需要占用更多的內存,超出了這個設置值,就會拋出OutOfMemory異常。
xmn:年輕代的heap大小,一般設置為Xmx的3、4分之一

  修改后重新啟動,輸入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志:

  在這里我們會發現,這個broker所監聽的IP地址似乎不是我localhost,這個會導致后續我們會連接不上broker,我們需要修改配置文件 broker.conf 增加一行配置 brokerIP1=192.168.1.101,然后重新啟動 nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &,再查看日志:

broker.conf 文件 基本配置:

  • namesrvAddr  :nameserver地址
  • brokerClusterName = DefaultCluster:Cluster名稱,如果集群機器數比較多,可以分成多個cluster,每個cluster提供給不同的業務場景使用
  • brokerName = broker-a:broker名稱,如果配置主從模式,master和slave需要配置相同的名稱來表明關系
  • brokerId = 0:在主從模式中,一個master broker可以有多個slave,0表示master,大於0表示不同slave的id
  • deleteWhen = 04:刪除文件時間點,默認是凌晨4點
  • fileReservedTime = 48:文件保留時間,默認48小時
  • brokerRole = ASYNC_MASTER: SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示slave和master消息同步完成后再返回信息給客戶端
  • flushDiskType = ASYNC_FLUSH:刷盤方式
  • autoCreateTopicEnable = true : topic不存在的情況下自動創建
  • brokerIP1         ip           ip設置外網ip,不需要連接外網的話,可以在參數前面加#注釋掉
  • listenPort         port        port可自由設置,一般設置10911
  • brokerPermission      0x4|0x2       broker讀寫權限
  • defaultTopicQueueNums     8      默認topic讀寫隊列數
  • clusterTopicEnable            true     是否啟用集群topic
  • brokerTopicEnable            true     是否啟用brokertopic
  • autoCreateSubscriptionGroup     TRUE      是否允許Broker 自動創建訂閱組,建議線下開啟,線上關閉
  • sendMessageThreadPoolNums     1        發送消息線程池數量
  • storePathConsumeQueue        $HOME/store/consumequeue      消費隊列存儲路徑
  • storePathIndex                         $HOME/store/index         消息索引存儲路徑
  • storeCheckpoint            $HOME/store/checkpoint          checkpoint 文件存儲路徑
  • abortFile                  $HOME/store/abort             abort 文件存儲路徑

消息發送和接收基本應用:

1.添加 pom 依賴:

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.5.2</version>
</dependency>

2.生產者 producer:

 

public class RocketMqProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        /*
         *生產者組,簡單來說就是多個發送同一類消息的生產者稱之為一個生產者組
         *rocketmq支持事務消息,在發送事務消息時,如果事務消息異常(producer掛了),broker端會來回查
         *事務的狀態,這個時候會根據group名稱來查找對應的producer來執行相應的回查邏輯。相當於實現了producer的高可用
         */
        DefaultMQProducer producer = new DefaultMQProducer("unique_producer_group_name");
     producer.setDefaultTopicQueueNums(3);//設置默認的queue數量
//指定namesrv服務地址,獲取broker相關信息 producer.setNamesrvAddr("192.168.1.101:9876"); producer.start(); for (int i = 0; i < 10; i++) { try { //創建一個消息實例,指定指定topic、tag、消息內容 Message msg = new Message("testTopic", "testTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); //發送消息並且獲取發送結果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }

  SendResult中,有一個sendStatus狀態,表示消息的發送狀態。一共有四種狀態:

  1. FLUSH_DISK_TIMEOUT : 表示沒有在規定時間內完成刷盤(需要Broker 的刷盤策Ill創立設置成SYNC_FLUSH 才會報這個錯誤) 。
  2. FLUSH_SLAVE_TIMEOUT :表示在主備方式下,並且Broker 被設置成SYNC_MASTER 方式,沒有在設定時間內完成主從同步。
  3. SLAVE_NOT_AVAILABLE : 這個狀態產生的場景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方式下,並且Broker 被設置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。
  4. SEND_OK :表示發送成功,發送成功的具體含義,比如消息是否已經被存儲到磁盤?消息是否被同步到了Slave 上?消息在Slave 上是否被寫入磁盤?需要結合所配置的刷盤策略、主從策略來定。這個狀態還可以簡單理解為,沒有發生上面列出的三個問題狀態就是SEND OK

3.消費者consumer:

public class RocketMqConsumer {
    public static void main(String[] args) throws MQClientException {
        //消費者的組名,這個和kafka是一樣,這里需要注意的是,
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_consumer_group_name");
        //指定NameServer地址,多個地址以 ; 隔開
        consumer.setNamesrvAddr("192.168.1.101:9876");
        //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
        //如果非第一次啟動,那么按照上次消費的位置繼續消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱PushTopic下Tag為push的消息
        consumer.subscribe("testTopic", "*"); //*表示不過濾,可以通過tag來過濾,比如:”tagA”
        /*
         * 注冊消息監聽回調這里有兩種監聽,MessageListenerConcurrently以及MessageListenerOrderly
         * 前者是普通監聽,后者是順序監聽。這塊在后續單獨說明
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
                                                                    msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消息消費狀態
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");

    }

  Rocketmq中支持廣播消息,就意味着同一個group中的消費者可以消費同一個消息。

  consumerGroup:位於同一個consumerGroup中的consumer實例和producerGroup中的各個produer實例承擔的角色類似;同一個group中可以配置多個consumer,可以提高消費端的並發消費能力以及容災,和kafka一樣,多個consumer會對消息做負載均衡,意味着同一個topic下的不同messageQueue會分發給同一個group中的不同consumer。同時,如果我們希望消息能夠達到廣播的目的,那么只需要把consumer加入到不同的group就行。

  RocketMQ提供了兩種消息消費模型,一種是pull主動拉去,另一種是push,被動接收。但實際上RocketMQ都是pull模式,只是push在pull模式上做了一層封裝,也就是pull到消息以后觸發業務消費者注冊到這里的callback. RocketMQ是基於長輪訓來實現消息的pull。

  nameServer的地址:name server地址,用於獲取broker、topic信息。

SpringBoot整合RocketMq:

1.pom.xml:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
</dependency>

2.application.yml :

rocketmq:
  # 生產者配置
  producer:
    isOnOff: on
    # 發送同一類消息的設置為同一個group,保證唯一
    groupName: unique_producer_group_name
    # 服務地址
    namesrvAddr: 192.168.1.101:9876
    # 消息最大長度 默認1024*4(4M)
    maxMessageSize: 4096
    # 發送消息超時時間,默認3000
    sendMsgTimeout: 3000
    # 發送消息失敗重試次數,默認2
    retryTimesWhenSendFailed: 2
  # 消費者配置
  consumer:
    isOnOff: on
    # 官方建議:確保同一組中的每個消費者訂閱相同的主題。
    groupName: unique_consumer_group_name
    # 服務地址
    namesrvAddr: 192.168.1.101:9876
    # 接收該 Topic 下所有 Tag
    topics: testTopic~*;
    consumeThreadMin: 20
    consumeThreadMax: 64
    # 設置一次消費消息的條數,默認為1條
    consumeMessageBatchMaxSize: 1

# 配置 Group  Topic  Tag
plat:
  plat-group: unique_group_name
  plat-topic: testTopic
  plat-tag: testTag

3. ProducerConfig 生產者配置:

@Configuration
public class ProducerConfig {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ;
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.producer.maxMessageSize}")
    private Integer maxMessageSize ;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private Integer sendMsgTimeout;
    @Value("${rocketmq.producer.retryTimesWhenSendFailed}")
    private Integer retryTimesWhenSendFailed;
    @Bean
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(this.groupName);
        producer.setNamesrvAddr(this.namesrvAddr);
        //如果需要同一個jvm中不同的producer往不同的mq集群發送消息,需要設置不同的instanceName
        if(this.maxMessageSize!=null){
            producer.setMaxMessageSize(this.maxMessageSize);
        }
        if(this.sendMsgTimeout!=null){
            producer.setSendMsgTimeout(this.sendMsgTimeout);
        }
        //如果發送消息失敗,設置重試次數,默認為2次
        if(this.retryTimesWhenSendFailed!=null){
            producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        }
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return producer;
    }
}

4.ConsumerConfig 消費者配置:

@Configuration
public class ConsumerConfig {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerConfig.class) ;
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.topics}")
    private String topics;
    @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}")
    private int consumeMessageBatchMaxSize;
    @Resource
    private RocketMsgListener msgListener;
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer(){
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(msgListener);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        try {
            String[] topicTagsArr = topics.split(";");
            for (String topicTags : topicTagsArr) {
                String[] topicTag = topicTags.split("~");
                consumer.subscribe(topicTag[0],topicTag[1]);
            }
            consumer.start();
        }catch (MQClientException e){
            e.printStackTrace();
        }
        return consumer;
    }
}

5.RocketMsgListener 監聽器:

@Component
public class RocketMsgListener implements MessageListenerConcurrently {
    private static final Logger LOG = LoggerFactory.getLogger(RocketMsgListener.class) ;
    @Resource
    private ParamConfigService paramConfigService ;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(list)){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt messageExt = list.get(0);
        LOG.info("接受到的消息為:"+new String(messageExt.getBody()));
        int reConsume = messageExt.getReconsumeTimes();
        // 消息已經重試了3次,如果不需要再次消費,則返回成功
        if(reConsume ==3){
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        if(messageExt.getTopic().equals(paramConfigService.platTopic)){
            String tags = messageExt.getTags() ;
            switch (tags){
                case "testTag":
                    LOG.info("匹配到testTag"+tags);
                    break ;
                default:
                    LOG.info("未匹配到Tag == >>"+tags);
                    break;
            }
        }
        // 消息消費成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

6.參數配置類:

@Service
public class ParamConfigService {

    @Value("${plat.plat-group}")
    public String platGroup ;
    @Value("${plat.plat-topic}")
    public String platTopic ;
    @Value("${plat.plat-tag}")
    public String accountTag ;
   //省略 get  set
}

7.測試類:

@RestController
public class TestController {

    @Autowired
    private DefaultMQProducer defaultMQProducer;

    @Autowired
    private ParamConfigService paramConfigService;

    @RequestMapping(value = "/testStringQueue.json", method = {RequestMethod.GET})
    public SendResult testStringQueue() {
        // 可以不使用Config中的Group
        defaultMQProducer.setProducerGroup(paramConfigService.platGroup);
        SendResult sendResult = null;
        String msgInfo = "rocketmq  message 1";
        try {
            Message sendMsg = new Message(paramConfigService.platTopic,
                    paramConfigService.accountTag, msgInfo.getBytes());
            sendResult = defaultMQProducer.send(sendMsg);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return sendResult;
    }
}

  啟動項目訪問接口就可以看到效果。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM