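==Background==
I bought three Alibaba Cloud ECS servers and deployed RocketMQ on them as the messaging middleware between the business backend and the platform.
The deployment itself went surprisingly smoothly, but when I wrote programs locally to test producing and consuming messages, I ran into some problems.
It took me almost a full day to solve them, so I am recording the troubleshooting process here.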
==Environment==
Linux: CentOS 8 (Alibaba Cloud ECS)
RocketMQ: 4.6.1
==Cluster==
Number of nodes: 3
Node 1: broker-a (master)
Node 2: broker-a (slave), broker-b (master)
Node 3: broker-b (slave)
The configuration files are as follows (IP addresses masked):
broker-a.properties
```properties
brokerClusterName=rexel
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=xx.xx.xx.01
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-m
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-m
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-m
storePathIndex=/home/radmin/data/rocketmq/index-a-m
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-m
```
broker-a-s.properties
```properties
brokerClusterName=rexel
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=xx.xx.xx.02
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-s
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-s
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-s
storePathIndex=/home/radmin/data/rocketmq/index-a-s
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-s
```
broker-b.properties
```properties
brokerClusterName=rexel
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=xx.xx.xx.02
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-m
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-m
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-m
storePathIndex=/home/radmin/data/rocketmq/index-b-m
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-m
```
broker-b-s.properties
```properties
brokerClusterName=rexel
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=xx.xx.xx.03
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-s
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-s
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-s
storePathIndex=/home/radmin/data/rocketmq/index-b-s
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-s
```
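Because node 2 runs two brokers (the broker-a slave and the broker-b master), their ports and storage directories must not overlap. Below is a minimal sketch of a pre-flight check over these property files. It assumes the RocketMQ 4.x defaults in which a broker also binds listenPort - 2 (the "VIP" channel) and listenPort + 1 (HA replication); `BrokerConfigCheck` and its methods are hypothetical names, and only a handful of keys are checked.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

final class BrokerConfigCheck {
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalArgumentException(e);
        }
        return p;
    }

    // Returns human-readable problems; an empty list means nothing was flagged.
    static List<String> check(List<Properties> brokers) {
        List<String> problems = new ArrayList<>();
        Map<String, Integer> masters = new HashMap<>(); // brokerName -> number of masters
        Map<String, String> owners = new HashMap<>();   // "host:port" or "host:dir" -> broker
        for (Properties p : brokers) {
            String name = p.getProperty("brokerName");
            String id = p.getProperty("brokerId");
            String host = p.getProperty("brokerIP1");
            String who = name + "/" + id;
            boolean isMaster = "0".equals(id);
            boolean slaveRole = "SLAVE".equals(p.getProperty("brokerRole"));
            if (isMaster == slaveRole) {
                problems.add(who + ": brokerId and brokerRole disagree");
            }
            masters.merge(name, isMaster ? 1 : 0, Integer::sum);
            // Assumed 4.x layout: VIP channel on listenPort - 2, HA on listenPort + 1.
            int port = Integer.parseInt(p.getProperty("listenPort"));
            List<String> claimed = new ArrayList<>();
            for (int used : new int[] {port - 2, port, port + 1}) {
                claimed.add(host + ":" + used);
            }
            claimed.add(host + ":" + p.getProperty("storePathRootDir"));
            for (String r : claimed) {
                String prev = owners.putIfAbsent(r, who);
                if (prev != null) {
                    problems.add(who + " clashes with " + prev + " on " + r);
                }
            }
        }
        masters.forEach((name, count) -> {
            if (count != 1) {
                problems.add(name + ": expected 1 master, found " + count);
            }
        });
        return problems;
    }
}
```

Fed the four files above (with real addresses in brokerIP1), it should report nothing: on node 2, the broker-b master would use 10919/10921/10922 and the broker-a slave 10929/10931/10932.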
==Final Code==
RocketUtils.java
```java
package com.rexel.stream.common.utils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketUtils implements Serializable {
    private static RocketUtils rocketUtils = null;
    // Started producers, cached per name server address
    private static Map<String, DefaultMQProducer> nameSrvMap = null;

    private RocketUtils() {
    }

    public static synchronized RocketUtils getInstance() {
        if (rocketUtils == null) {
            rocketUtils = new RocketUtils();
            // Create the cache only once: re-creating it on every call would
            // drop producers that were already started.
            nameSrvMap = new HashMap<>();
        }
        return rocketUtils;
    }

    public DefaultMQPushConsumer createConsumer(String namesrvAddr, String topic, String group) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setInstanceName(UUID.randomUUID().toString());
        // Only applies when the group has no stored offset yet
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setVipChannelEnabled(false);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        try {
            consumer.subscribe(topic, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }
        return consumer;
    }

    public synchronized DefaultMQProducer createProducer(String nameSrvAddr, String group) {
        if (nameSrvMap == null) {
            return null;
        }
        if (nameSrvMap.containsKey(nameSrvAddr)) {
            return nameSrvMap.get(nameSrvAddr);
        }
        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr(nameSrvAddr);
        producer.setSendMessageWithVIPChannel(false);
        producer.setSendMsgTimeout(5000);
        producer.setInstanceName(UUID.randomUUID().toString());
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }
        nameSrvMap.put(nameSrvAddr, producer);
        return producer;
    }

    public boolean sendOr(DefaultMQProducer producer, Message msg, boolean async) {
        if (async) {
            return sendAsync(producer, msg);
        } else {
            return send(producer, msg);
        }
    }

    public boolean sendAsync(DefaultMQProducer producer, Message msg) {
        try {
            // Returns immediately; the outcome is reported later to CallBack
            producer.send(msg, new CallBack());
            return true;
        } catch (MQClientException | RemotingException | InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }

    public boolean send(DefaultMQProducer producer, Message msg) {
        try {
            producer.send(msg);
            return true;
        } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
            e.printStackTrace();
            return false;
        }
    }

    private static class CallBack implements SendCallback, Serializable {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("[------]onSuccess");
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("[------]onException. " + throwable.getMessage());
        }
    }
}
```
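One thing to watch in RocketUtils: the producer cache is keyed only by the name server address, so a later call with a different group on the same cluster gets back the first group's producer. A minimal sketch of a cache keyed by both values, using `ConcurrentHashMap.computeIfAbsent`; `ProducerCache` is a hypothetical helper, and its factory stands in for the body of createProducer.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical helper: one producer per (name server address, producer group).
final class ProducerCache<P> {
    private final Map<String, P> cache = new ConcurrentHashMap<>();
    private final BiFunction<String, String, P> factory;

    ProducerCache(BiFunction<String, String, P> factory) {
        this.factory = factory; // e.g. creates and starts a DefaultMQProducer
    }

    P get(String nameSrvAddr, String group) {
        // Keying on both values keeps two groups on the same cluster apart.
        // If the factory returns null (start failed), nothing is cached.
        return cache.computeIfAbsent(nameSrvAddr + "|" + group,
            key -> factory.apply(nameSrvAddr, group));
    }
}
```

With RocketMQ, the factory would build a DefaultMQProducer, call start(), and return null on MQClientException; because computeIfAbsent does not store null results, a failed start is simply retried on the next call.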
RmqProducer.java
```java
package com.rexel.stream.tools;

import com.alibaba.fastjson.JSONObject;
import com.rexel.stream.common.utils.RocketUtils;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class RmqProducer {
    public static void main(String[] args) {
        System.out.println("[------]start.");
        RocketUtils rocketUtils = RocketUtils.getInstance();
        DefaultMQProducer producer = rocketUtils.createProducer(
            "xx.xx.xx.01:9876;xx.xx.xx.02:9876", "pro_test3");
        if (producer == null) {
            System.out.println("[------]producer failed to start.");
            return;
        }
        for (int i = 0; i < 10; i++) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name", "VA_2YC_VAL");
            jsonObject.put("judge", "≥");
            jsonObject.put("value", "100");
            rocketUtils.sendAsync(producer, new Message(
                "app_notice", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
        }
        // With asynchronous sending, do not shut down here: the pending
        // sends and their callbacks have not finished yet.
        // producer.shutdown();
        System.out.println("[------]end.");
    }
}
```
RmqConsumer.java
```java
package com.rexel.stream.tools;

import com.rexel.stream.common.utils.RocketUtils;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;

public class RmqConsumer {
    public static void main(String[] args) throws MQClientException {
        System.out.println("[------]start.");
        RocketUtils rocketUtils = RocketUtils.getInstance();
        DefaultMQPushConsumer consumer = rocketUtils.createConsumer(
            "xx.xx.xx.01:9876;xx.xx.xx.02:9876", "app_notice", "rexel_stream3");
        if (consumer == null) {
            System.out.println("[------]consumer could not subscribe.");
            return;
        }
        consumer.registerMessageListener(
            (MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
                for (Message msg : list) {
                    try {
                        byte[] body = msg.getBody();
                        String message = new String(body, StandardCharsets.UTF_8);
                        System.out.println("[------]rmq message= " + message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        // start() does not block; messages are delivered on the consumer's own threads
        consumer.start();
        System.out.println("[------]end.");
    }
}
```
==Problem 1==
After the configuration was done, I wrote producer code on the client side, and producing messages failed with:
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [xxxx] failed
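Attempt 1:
Added setSendMessageWithVIPChannel(false) to the producer code, and the equivalent setVipChannelEnabled(false) to the consumer code.
Result: still failing, with the same error.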
Attempt 2:
Added brokerIP1=xx.xx.xx.xx to the broker configuration files.
Result: still failing, with the same error.
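Attempt 3:
Posts online said this was a firewall problem. The firewall on the servers themselves had been disabled long ago, so I turned to the port rules of the Alibaba Cloud ECS product (the security group) instead,
and opened the whole 10900/10999 port range in one go.
Result: synchronous producing now works.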
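Opening the whole range works, but it helps to know which ports the brokers actually listen on. A minimal sketch, assuming RocketMQ 4.x defaults in which a broker with listenPort P also listens on P - 2 for the VIP channel and on P + 1 for master/slave HA replication; `SecurityGroupPorts` is a hypothetical name.

```java
import java.util.Set;
import java.util.TreeSet;

final class SecurityGroupPorts {
    // Assumed 4.x defaults: listenPort P, VIP channel P - 2, HA P + 1.
    static Set<Integer> portsFor(int nameServerPort, int... brokerListenPorts) {
        Set<Integer> ports = new TreeSet<>();
        ports.add(nameServerPort);
        for (int p : brokerListenPorts) {
            ports.add(p - 2);
            ports.add(p);
            ports.add(p + 1);
        }
        return ports;
    }

    public static void main(String[] args) {
        // listenPort values taken from the property files above
        System.out.println(portsFor(9876, 10921, 10931));
    }
}
```

For this cluster it prints [9876, 10919, 10921, 10922, 10929, 10931, 10932]; everything except the name server port 9876 falls inside 10900/10999.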
Attempt 4:
Tested asynchronous producing by calling the sendAsync method in RocketUtils. It failed with:
```
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
```
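The cause: with asynchronous producing, my program called the producer's shutdown method right after the send loop,
so the pending asynchronous sends could no longer complete. After commenting out the shutdown call, asynchronous producing worked.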
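Instead of never shutting down, one option is to count outstanding callbacks with a `CountDownLatch` and shut down only after all of them have fired, or after a timeout. The sketch below runs without a broker: `AsyncSender` is a hypothetical stand-in for the producer, not a RocketMQ type, and the latch pattern is the point.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class AsyncSendThenShutdown {
    // Hypothetical stand-in for DefaultMQProducer: send() returns at once and
    // reports completion later on another thread (null Throwable = success).
    interface AsyncSender {
        void send(String msg, Consumer<Throwable> onDone);

        void shutdown();
    }

    // Sends count messages, waits (bounded) for every callback, then shuts down.
    // Returns how many sends were reported successful before shutdown.
    static int sendAll(AsyncSender sender, int count, long timeoutMs) {
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger succeeded = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            sender.send("msg-" + i, error -> {
                if (error == null) {
                    succeeded.incrementAndGet();
                }
                pending.countDown(); // on success and on failure alike
            });
        }
        try {
            pending.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the flag, still shut down below
        }
        sender.shutdown();
        return succeeded.get();
    }
}
```

With the real client, the same shape means calling `countDown()` in both `SendCallback.onSuccess` and `onException`, then calling `producer.shutdown()` after the await.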
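Conclusion:
If you hit the connect to [xxxx] failed error, it usually comes down to the following fixes:
1. In code: disable the VIP channel, with setSendMessageWithVIPChannel(false) on the producer and setVipChannelEnabled(false) on the consumer.
2. Configuration files: on Alibaba Cloud ECS, use the public (external) addresses for these two settings:
brokerIP1=xx.xx.xx.01
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
3. Firewall: disable the firewall on the server itself.
4. Security group: open the required ports in the Alibaba Cloud security group of the servers.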
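Before changing code or configuration, it can save time to confirm from the client machine which ports are reachable at all; that separates firewall and security-group problems (items 3 and 4) from configuration problems (items 1 and 2). A minimal sketch using plain `java.net.Socket` with a connect timeout; `PortProbe` is a hypothetical name, and the masked addresses in the target list must be replaced with the real public IPs.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe {
    // True if a TCP connection to host:port succeeds within timeoutMs.
    static boolean reachable(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, timed out, or host unresolvable
        }
    }

    public static void main(String[] args) {
        // Name servers and broker listenPorts from the configuration above (masked).
        String[] targets = {
            "xx.xx.xx.01:9876", "xx.xx.xx.02:9876",
            "xx.xx.xx.01:10921", "xx.xx.xx.02:10921",
            "xx.xx.xx.02:10931", "xx.xx.xx.03:10931"
        };
        for (String t : targets) {
            int i = t.lastIndexOf(':');
            String host = t.substring(0, i);
            int port = Integer.parseInt(t.substring(i + 1));
            System.out.println(t + " -> " + (reachable(host, port, 3000) ? "open" : "unreachable"));
        }
    }
}
```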
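==Problem 2==
The producer was working now, but the consumer never received any messages. The program reported no errors at all; it simply received nothing.
I found some blog posts online, and this one pointed me in a useful direction: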
http://www.jiangxinlingdu.com/rocketmq/2019/08/06/noconsumer.html
My initial suspicion was that the consumer offsets were wrong.
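The offset suspicion can be checked before deleting anything: the mqadmin tool's consumerProgress command (for example `mqadmin consumerProgress -n xx.xx.xx.01:9876 -g rexel_stream3`) lists, per queue, the broker offset and the consumer offset. Keep in mind that CONSUME_FROM_FIRST_OFFSET only applies to a group with no stored offset; an existing group resumes from its committed offset. Below is a rough sketch of how to read the two numbers; the three categories are my own heuristic, not RocketMQ logic, and `QueueLag` is a hypothetical name.

```java
final class QueueLag {
    enum State { AHEAD, BACKLOG, CAUGHT_UP }

    // Heuristic reading of one consumerProgress row (not RocketMQ's own logic).
    static State classify(long brokerOffset, long consumerOffset) {
        long diff = brokerOffset - consumerOffset;
        if (diff < 0) {
            return State.AHEAD;     // committed offset beyond the queue's max: suspicious metadata
        }
        if (diff > 0) {
            return State.BACKLOG;   // messages are waiting: check subscription and client side
        }
        return State.CAUGHT_UP;     // nothing new since the last commit
    }

    public static void main(String[] args) {
        // Hypothetical numbers, only to show the three cases.
        long[][] rows = {{120, 150}, {120, 80}, {120, 120}};
        for (long[] r : rows) {
            System.out.println("broker=" + r[0] + " consumer=" + r[1] + " -> " + classify(r[0], r[1]));
        }
    }
}
```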
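Solution:
Since this is a freshly built environment and not yet in production, I simply did the following:
1. Stopped the RocketMQ cluster
2. Deleted all RocketMQ data files
3. Restarted the cluster
4. Recreated the topic (autoCreateTopicEnable=false, so it has to be created manually)
After this brute-force sequence, the consumer could indeed receive messages.
Although I never pinned down the real cause, it is fairly certain that RocketMQ's metadata had become inconsistent,
probably because I kept tweaking the configuration files, switching between internal and public addresses, and restarting.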
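If the reset is ever needed again, note that the property files configure the commit log, consume queue, index and checkpoint as directories separate from storePathRootDir, so deleting only the root directory would leave some of them behind. A minimal dry-run sketch that lists every configured store path from one broker's properties without deleting anything. It assumes the 4.x layout in which the topic, subscription-group and consumer-offset metadata (topics.json, subscriptionGroup.json, consumerOffset.json) sit under `<storePathRootDir>/config`; `StorePaths` is a hypothetical helper.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

final class StorePaths {
    // The path keys used in the broker-*.properties files of this post.
    private static final String[] PATH_KEYS = {
        "storePathRootDir", "storePathCommitLog", "storePathConsumerQueue",
        "storePathIndex", "storeCheckpoint"
    };

    // Dry run: returns every configured store path; nothing is deleted.
    static Set<Path> pathsToWipe(String propertiesText) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Set<Path> paths = new TreeSet<>();
        for (String key : PATH_KEYS) {
            String value = p.getProperty(key);
            if (value != null) {
                paths.add(Paths.get(value));
            }
        }
        return paths;
    }

    // Assumed 4.x layout: broker metadata files under <storePathRootDir>/config.
    static List<Path> metadataFiles(String storePathRootDir) {
        Path config = Paths.get(storePathRootDir, "config");
        return List.of(
            config.resolve("topics.json"),
            config.resolve("subscriptionGroup.json"),
            config.resolve("consumerOffset.json"));
    }
}
```

Running it on each of the four files gives the directories to remove on each node once the brokers are stopped; the metadata files are where topic definitions and committed consumer offsets are kept, which is why wiping them also resets the offsets suspected earlier.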
--END--