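This post walks through installing RocketMQ on CentOS and running a simple example. The steps below are the ones that worked for me; following Alibaba's official instructions alone did not get me a working setup.
Installation
Official guide: https://rocketmq.apache.org/docs/quick-start/
My installation steps were as follows:
# Download RocketMQ (4.4.0, the latest release at the time of writing)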
wget http://apache-mirror.8birdsvideo.com/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
# Unzip the archive
unzip rocketmq-all-4.4.0-bin-release.zip
# Change into the RocketMQ directory
cd rocketmq-all-4.4.0-bin-release
# Start the name server
nohup ./bin/mqnamesrv -n 111.231.XX.XX:9876 &
# Start the broker. Include "-c conf/broker.conf autoCreateTopicEnable=true"; otherwise topics have to be created manually
nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &
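Configuration: change into RocketMQ's bin directory
cd rocketmq-all-4.4.0-bin-release/bin
By default the RocketMQ startup scripts request at least 4 GB of heap. If the machine does not have that much memory, open runserver.sh and runbroker.sh and change the JAVA_OPT line as follows: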
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
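Before restarting the servers, it can help to confirm that your JVM accepts these options and to see how much heap it actually ends up with. The following is only a minimal sketch under that assumption; `HeapCheck` and `maxHeapMiB` are hypothetical names made up for illustration and are not part of RocketMQ.

```java
class HeapCheck {
    /** Returns the maximum heap the running JVM will try to use, in MiB. */
    static long maxHeapMiB() {
        return Runtime.getRuntime().maxMemory() / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // Launch this class with the same flags you put into JAVA_OPT.
        System.out.printf("max heap: %d MiB%n", maxHeapMiB());
    }
}
```

Compile it with javac HeapCheck.java and run it with the flags from the line above, for example java -Xms256m -Xmx256m -Xmn125m HeapCheck. If the JVM refuses to start, one of the options is not supported by your Java version. Depending on the garbage collector, the reported value can be slightly below the -Xmx setting.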
Running
Running the official demo produced the following error:
21:20:22.249 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:640)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)
at org.apache.rocketmq.example.simple.Producer.main(Producer.java:40)
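Run the following commands to inspect the broker configuration and set the machine's public IP address:
# Show the broker configuration
sh ./bin/mqbroker -m
# Stop the broker
sh bin/mqshutdown broker
# Append this machine's public IP to the configuration file that the broker loads with -c
printf '\nbrokerIP1=111.231.XX.XX\n' >> conf/broker.conf
# Restart the broker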
nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &
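A send timeout like the one above usually means the client cannot reach the address the broker advertises, so it is worth checking connectivity from the client machine after the restart. The class below is a rough sketch, not an official tool: `PortProbe` and `isReachable` are hypothetical names, 9876 is the name server port used in this post, and 10911 is assumed to be the broker port because it is the default listen port when the configuration does not override it.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortProbe {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat all as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the server's public IP as the first argument.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        // Assumed ports: name server 9876, broker 10911 (default).
        int[] ports = {9876, 10911};
        for (int port : ports) {
            System.out.printf("%s:%d reachable=%b%n", host, port, isReachable(host, port, 3000));
        }
    }
}
```

If 9876 is reachable but 10911 is not, the problem is more likely a firewall or security-group rule than the brokerIP1 setting.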
Installing the management console
Git repository: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
git clone git@github.com:apache/rocketmq-externals.git
cd rocketmq-externals/rocketmq-console/
mvn clean package -Dmaven.test.skip=true
After packaging, run the following command from the target directory, where Maven places the jar:
java -jar rocketmq-console-ng-1.0.1.jar --server.port=12181 --rocketmq.config.namesrvAddr=111.231.XX.XX:9876
Open http://localhost:12181 to access the console.
Querying on the Producer page raises the following exception:
java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1 DESC: the producer group[] not exist
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
at com.google.common.base.Throwables.propagate(Throwables.java:160)
at org.apache.rocketmq.console.service.impl.ProducerServiceImpl.getProducerConnection(ProducerServiceImpl.java:38)
at org.apache.rocketmq.console.controller.ProducerController.producerConnection(ProducerController.java:39)
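The console can only find a producer group while one of its producers is still connected, so comment out the producer.shutdown() call in the code while testing. In production, keep the call.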
//producer.shutdown();
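Instead of deleting the call outright, one option is to leave the producer running while you look at the console and still release it when the process is stopped. The sketch below uses a plain JVM shutdown hook to do that; `ShutdownOnExit` and `register` are hypothetical names, and the lambda is only a stand-in for what would be producer::shutdown in the real example.

```java
class ShutdownOnExit {
    /** Registers cleanup to run when the JVM exits and returns the hook thread. */
    static Thread register(Runnable cleanup) {
        Thread hook = new Thread(cleanup, "shutdown-on-exit");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        // Stand-in for producer::shutdown; replace with the real producer in your code.
        register(() -> System.out.println("producer shut down"));
        System.out.println("producer running");
        // Real code would keep sending messages here; the hook fires on normal exit or Ctrl+C.
    }
}
```

With this in place the producer stays visible in the console for as long as the process lives, and the cleanup still runs when you stop it with Ctrl+C or a normal kill.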
Code examples (official)
Producer
package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        // Name server address; replace with your own server's IP.
        producer.setNamesrvAddr("111.231.XX.XX:9876");
        producer.start();

        // Send ten messages; a failed send is logged and the loop continues.
        for (int i = 0; i < 10; i++) {
            try {
                // Arguments: topic, tag, key, body.
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        // Left commented out so the producer stays visible in the console.
        //producer.shutdown();
    }
}
Consumer
package org.apache.rocketmq.example.simple;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        // Subscribe to every tag of TopicTest.
        consumer.subscribe("TopicTest", "*");
        // A new consumer group starts from the earliest available message.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        //consumer.setConsumeTimestamp("20181109221800");
        // Name server address; replace with your own server's IP.
        consumer.setNamesrvAddr("111.231.XX.XX:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}