Notes on deploying and verifying RocketMQ on Alibaba Cloud ECS servers


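==Background==

I bought three Alibaba Cloud ECS servers and deployed RocketMQ on them as the messaging middleware between the business backend and the platform.

The deployment itself went remarkably smoothly, but problems appeared once I wrote local programs to test producing and consuming messages.

It took nearly a full day to resolve them, so this post records the troubleshooting process.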

 

==Environment==

Linux: CentOS 8 (Alibaba Cloud ECS servers)

RocketMQ: 4.6.1

 

==Cluster==

Number of nodes: 3

Node 1: broker-a (master)

Node 2: broker-a (slave), broker-b (master)

Node 3: broker-b (slave)

 

The configuration files are as follows (IP addresses omitted):

broker-a.properties

brokerClusterName=rexel
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=xx.xx.xx.01
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-m
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-m
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-m
storePathIndex=/home/radmin/data/rocketmq/index-a-m
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-m

 

broker-a-s.properties

brokerClusterName=rexel
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=xx.xx.xx.02
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-s
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-s
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-s
storePathIndex=/home/radmin/data/rocketmq/index-a-s
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-s

 

broker-b.properties

brokerClusterName=rexel
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=xx.xx.xx.02
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-m
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-m
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-m
storePathIndex=/home/radmin/data/rocketmq/index-b-m
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-m

 

broker-b-s.properties

brokerClusterName=rexel
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=xx.xx.xx.03
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-s
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-s
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-s
storePathIndex=/home/radmin/data/rocketmq/index-b-s
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-s

 

==Final code==

RocketUtils.java

package com.rexel.stream.common.utils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketUtils implements Serializable {
    private static RocketUtils rocketUtils = null;
    // Cache of started producers, keyed by the NameServer address list.
    // Created once, so cached producers survive repeated getInstance() calls.
    private static final Map<String, DefaultMQProducer> nameSrvMap = new HashMap<>();

    private RocketUtils() {

    }

    // The method-level lock is sufficient; no nested synchronized block is needed.
    public static synchronized RocketUtils getInstance() {
        if (rocketUtils == null) {
            rocketUtils = new RocketUtils();
        }
        return rocketUtils;
    }

    public DefaultMQPushConsumer createConsumer(String namesrvAddr, String topic, String group) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setVipChannelEnabled(false);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        try {
            consumer.subscribe(topic, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }
        return consumer;
    }

    public DefaultMQProducer createProducer(String nameSrvAddr, String group) {
        if (nameSrvMap == null) {
            return null;
        }

        if (nameSrvMap.containsKey(nameSrvAddr)) {
            return nameSrvMap.get(nameSrvAddr);
        }

        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr(nameSrvAddr);
        producer.setSendMessageWithVIPChannel(false);
        producer.setSendMsgTimeout(5000);
        producer.setInstanceName(UUID.randomUUID().toString());
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }

        nameSrvMap.put(nameSrvAddr, producer);
        return producer;
    }

    public boolean sendOr(DefaultMQProducer producer, Message msg, boolean async) {
        if (async) {
            return sendAsync(producer, msg);
        } else {
            return send(producer, msg);
        }
    }

    public boolean sendAsync(DefaultMQProducer producer, Message msg) {
        try {
            producer.send(msg, new CallBack());
            return true;
        } catch (MQClientException | RemotingException | InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }

    public boolean send(DefaultMQProducer producer, Message msg) {
        try {
            producer.send(msg);
            return true;
        } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
            e.printStackTrace();
            return false;
        }
    }

    private class CallBack implements SendCallback,Serializable{
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("[------]onSuccess");
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("[------]onException. " + throwable.getMessage());
        }
    }
}

 

RmqProducer.java

package com.rexel.stream.tools;

import com.alibaba.fastjson.JSONObject;
import com.rexel.stream.common.utils.RocketUtils;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class RmqProducer {
    public static void main(String[] args) {
        System.out.println("[------]start.");
        RocketUtils rocketUtils = RocketUtils.getInstance();
        DefaultMQProducer producer =
            rocketUtils.createProducer("xx.xx.xx.01:9876;xx.xx.xx.02:9876", "pro_test3");

        for (int i = 0; i < 10; i++) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name", "VA_2YC_VAL");
            jsonObject.put("judge", "≥");
            jsonObject.put("value", "100");
            rocketUtils.sendAsync(producer, new Message(
                "app_notice",
                jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
        }

        // When sending asynchronously, do not call shutdown here: it would stop sends that are still in flight.
//        producer.shutdown();
        System.out.println("[------]end.");
    }
}

 

RmqConsumer.java

package com.rexel.stream.tools;

import com.rexel.stream.common.utils.RocketUtils;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;

public class RmqConsumer {
    public static void main(String[] args) throws MQClientException {
        System.out.println("[------]start.");
        RocketUtils rocketUtils = RocketUtils.getInstance();
        DefaultMQPushConsumer consumer = rocketUtils.createConsumer(
            "xx.xx.xx.01:9876;xx.xx.xx.02:9876",
            "app_notice",
            "rexel_stream3");
        consumer.registerMessageListener(
            (MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
                for (Message msg : list) {
                    try {
                        byte[] body = msg.getBody();
                        String message = new String(body, StandardCharsets.UTF_8);
                        System.out.println("[------]rmq message= " + message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        consumer.start();

        System.out.println("[------]end.");
    }
}

 

==Problem 1==

With the configuration done, I wrote producer code on the client side, and sending messages failed with this error:

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [xxxx] failed

 

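Attempt 1:

Added setSendMessageWithVIPChannel(false) to the producer code (and setVipChannelEnabled(false) to the consumer code).

Result: still failing, with the same error.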

 

Attempt 2:

Added brokerIP1=xx.xx.xx.xx to the broker configuration files.

Result: still failing, with the same error.

 

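Attempt 3:

Posts online blame the firewall, but the firewall on the servers themselves had been disabled long ago. So I tried configuring the ports on the Alibaba Cloud ECS side instead (the security group rules).

I opened the whole port range 10900/10999 in one go.

Result: synchronous sending works.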
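Before changing any RocketMQ settings, it can save time to check from the client machine whether the ports are reachable at all. Below is a minimal sketch that uses only the JDK; the class name PortProbe and the 3-second timeout are my own choices, not part of RocketMQ. A failed probe only tells you that something between the client and the port is blocking or refusing the connection, not which layer is responsible.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, filtered by a firewall/security group, or timed out.
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical usage: java PortProbe <host> <port> [<port> ...]
        if (args.length < 2) {
            System.out.println("usage: java PortProbe <host> <port> [<port> ...]");
            return;
        }
        String host = args[0];
        for (int i = 1; i < args.length; i++) {
            int port = Integer.parseInt(args[i]);
            String state = canConnect(host, port, 3000) ? "reachable" : "NOT reachable";
            System.out.println(host + ":" + port + " -> " + state);
        }
    }
}
```

For this cluster I would probe 9876 on the NameServer hosts and the listenPort values (10921, 10931) on the broker hosts.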

 

Attempt 4:

Tested asynchronous sending by calling the sendAsync method in RocketUtils. It failed with:

[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed

 

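The cause: my program called shutdown right after issuing the asynchronous sends, so the producer was already shut down while the asynchronous send threads were still working and they could not complete. After commenting out the shutdown call, asynchronous sending worked.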
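Commenting out shutdown works for a test, but it leaves the producer running forever. A common alternative is to count down a latch in every callback and call shutdown only after all callbacks have fired. The sketch below shows the pattern with the JDK only: an ExecutorService stands in for the producer's asynchronous send threads (an assumption for illustration, not RocketMQ code). In the real program, countDown() would go into both onSuccess and onException of the SendCallback, and producer.shutdown() would follow the await.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AwaitCallbacksDemo {
    /** Fires messageCount fake async sends, waits for every callback, then shuts down. */
    public static int sendAllThenShutdown(int messageCount) throws InterruptedException {
        // Stand-in for the producer's internal async threads (illustrative only).
        ExecutorService fakeProducer = Executors.newFixedThreadPool(2);
        CountDownLatch pending = new CountDownLatch(messageCount);
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < messageCount; i++) {
            fakeProducer.submit(() -> {
                try {
                    Thread.sleep(20);             // simulated network round trip
                    completed.incrementAndGet();  // corresponds to onSuccess(...)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // corresponds to onException(...)
                } finally {
                    pending.countDown();          // count down on success AND on failure
                }
            });
        }

        // Block until every callback has fired, or give up after 5 seconds.
        boolean allDone = pending.await(5, TimeUnit.SECONDS);
        fakeProducer.shutdown();                  // only now is shutting down safe
        if (!allDone) {
            System.out.println("timed out waiting for callbacks");
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed sends: " + sendAllThenShutdown(10));
    }
}
```

The timeout on await matters: if a callback never fires, the program still exits instead of hanging.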

 

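Conclusion:

If you hit "connect to [xxxx] failed", the fix is almost always one of the following:

1. In code: call setSendMessageWithVIPChannel(false) on the producer and setVipChannelEnabled(false) on the consumer.

2. Configuration: on Alibaba Cloud ECS servers, use the public (external) addresses for these two settings: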

brokerIP1=xx.xx.xx.01
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876

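3. Firewall: disable the firewall on the server itself.

4. Security group: open the required ports in the network security group of the Alibaba Cloud instances.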
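Opening 10900/10999 wholesale worked, but a narrower rule is possible. As far as I know, a RocketMQ 4.x broker derives its ports from listenPort: listenPort itself for normal traffic, listenPort - 2 for the VIP channel, and listenPort + 1 for master/slave HA replication unless haListenPort is set explicitly. Treat these offsets as an assumption and verify them against your own version. The helper below (the BrokerPorts name is hypothetical) just does the arithmetic for the listenPort values used in this cluster.

```java
import java.util.ArrayList;
import java.util.List;

public class BrokerPorts {
    /** Ports to allow for one broker, under the assumed 4.x port offsets. */
    public static List<Integer> portsFor(int listenPort) {
        List<Integer> ports = new ArrayList<>();
        ports.add(listenPort - 2); // VIP channel (assumed offset)
        ports.add(listenPort);     // normal client traffic
        ports.add(listenPort + 1); // HA replication between master and slave (assumed default)
        return ports;
    }

    public static void main(String[] args) {
        int[] listenPorts = {10921, 10931}; // listenPort values from the broker configs
        for (int p : listenPorts) {
            System.out.println("listenPort=" + p + " -> open " + portsFor(p));
        }
        System.out.println("NameServer -> open [9876]");
    }
}
```

With the VIP channel disabled on the clients, the listenPort - 2 entry should not be needed for client traffic, and the HA port only has to be reachable between the broker nodes.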

 

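==Problem 2==

The producer was now fine, but the consumer never received any messages. The program reported no errors at all; it simply consumed nothing.

I found a few blog posts online, and this one pointed me in a useful direction:

http://www.jiangxinlingdu.com/rocketmq/2019/08/06/noconsumer.html

My initial suspicion was that the consumer offset was wrong.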

 

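Solution:

Since this is a freshly built environment and not yet in production, I took the direct route:

1. Stop the RocketMQ cluster

2. Delete all RocketMQ data files

3. Restart the cluster

4. Recreate the topic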

 

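After this brute-force combo, the consumer could indeed consume messages.

Although I never found the true root cause, it seems fairly safe to say that RocketMQ's metadata had gotten into a bad state,

most likely because I had been repeatedly editing the configuration files, switching between internal and external addresses, and restarting the brokers.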

 

--END--

