1、 先解壓
2、 maven編譯安裝(注意:虛擬機采用NAT網絡模式,需要聯網)
mvn -Prelease-all -DskipTests clean install -U
啟動nameserver節點
啟動broker
nohup sh bin/mqbroker -n localhost:9876 & tail -f namesrv.log
出錯,
修改內存配置
修改為
修改broker
這里我吃了大虧,主機對虛擬機中的端口訪問不通!!!
注意一定要關閉防火牆,或者開啟9876等需要使用的端口,不然無法遠程調用!
再次啟動
nohup bin/mqnamesrv > namesrv.log 2>&1 & tail -f namesrv.log
nohup bin/mqbroker -n 127.0.0.1:9876 > broker.log 2>&1 & tail -f broker.log
測試案例
這個是官網的,其實這個無所謂,等會使用代碼遠程發送訂單
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
停止服務,這個也無所謂,實在不行直接 kill -9 pid 把進程殺死也是可以的
來,開始發送第一單!!!
package tttt.mq; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.junit.Test; public class MqProductTest { @Test public void test1() { DefaultMQProducer producer = new DefaultMQProducer("xiaof_test"); producer.setNamesrvAddr("192.168.0.128:9876"); try { producer.start(); for (int i = 0; i < 2; i++) try { { Message msg = new Message("Topic1", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); // SendResult sendResult = producer.send(msg); // System.out.printf("%s%n", sendResult); producer.sendOneway(msg); } } catch (Exception e) { e.printStackTrace(); } } catch (MQClientException e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
接下來,我們消費掉這個
package tttt.mq;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Test;

/**
 * Demo push consumer: subscribes to "Topic1" / "TagA" in group "xiaof_test", prints
 * every received message body and cycles through several {@link ConsumeOrderlyStatus}
 * results depending on how many times the listener has been invoked.
 */
public class MqConsumeTest {

    /** Name server address the consumer connects to. */
    private static final String NAME_SERVER = "192.168.0.128:9876";

    /**
     * How long the test keeps the consumer running before shutting it down.
     * NOTE(review): 30 seconds is an arbitrary demo value - tune as needed.
     */
    private static final long CONSUME_WINDOW_SECONDS = 30;

    /**
     * Starts the consumer, leaves it running for {@link #CONSUME_WINDOW_SECONDS} so the
     * listener has a chance to be called, then shuts it down. Client start-up failures
     * are printed rather than rethrown.
     */
    @Test
    public void test1() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("xiaof_test");
        consumer.setNamesrvAddr(NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        try {
            consumer.subscribe("Topic1", "TagA");
            consumer.registerMessageListener(new MessageListenerOrderly() {

                /** Number of listener invocations so far. */
                private final AtomicLong consumeTimes = new AtomicLong(0);

                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeOrderlyContext context) {
                    // Original author's note: if this is false, the server keeps
                    // re-sending the same message.
                    context.setAutoCommit(true);

                    // Print every message in the batch, not just the first one; this
                    // also avoids an IndexOutOfBoundsException on an empty batch.
                    for (MessageExt msg : msgs) {
                        // Decode explicitly as UTF-8 instead of the platform default.
                        // NOTE(review): assumes the sender encoded the body as UTF-8
                        // (the tutorial's producer uses RemotingHelper.DEFAULT_CHARSET) - confirm.
                        String data = new String(msg.getBody(), StandardCharsets.UTF_8);
                        System.out.printf("%s 消費信息線程與數據: %s %n",
                                Thread.currentThread().getName(), data);
                    }

                    // Read the counter exactly once so every branch below sees the same
                    // value even if the listener is invoked from several threads.
                    long times = consumeTimes.incrementAndGet();

                    if (times % 2 == 0) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    }
                    if (times % 3 == 0) {
                        return ConsumeOrderlyStatus.ROLLBACK;
                    }
                    // A "% 4 -> COMMIT" branch used to sit here; it could never run
                    // because every multiple of 4 is already caught by the "% 2" check.
                    if (times % 5 == 0) {
                        context.setSuspendCurrentQueueTimeMillis(3000);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });

            consumer.start();
            System.out.printf("Consumer Started.%n");

            // start() returns immediately and messages are delivered to the listener
            // afterwards, so keep the test alive for a bounded time; otherwise the test
            // runner may finish before anything has been consumed.
            TimeUnit.SECONDS.sleep(CONSUME_WINDOW_SECONDS);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever is running the test.
            Thread.currentThread().interrupt();
        } finally {
            // Release client resources, mirroring what the producer test does.
            consumer.shutdown();
        }
    }
}
來一發效果:
這個是消費msg中的全部信息: