RocketMq的单机安装(一):https://www.cnblogs.com/simplefuer/p/12192984.html
完成RocketMq的安装后,需要对rocketMq进行本地测试,java代码如下:
Producer(生产者):
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /*** * 需要一个producerGroup 名字作为构造方法的参数,可自定义 */ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName1"); producer.setNamesrvAddr("*****:9876");//外网地址 producer.start(); for (int i = 0; i < 128; i++) try { { Message msg = new Message("TopicTest",//主题 "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET))//发送的内容; SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } } catch (Exception e) { e.printStackTrace(); } producer.shutdown(); } }
运行结果:
Connected to the target VM, address: '127.0.0.1:64374', transport: 'socket' 16:17:20.502 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework SendResult [sendStatus=SEND_OK, msgId=C0A8014221A818B4AAC24B97D4C70000, offsetMsgId=79296EE100002A9F000000000002320C, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=32] SendResult [sendStatus=SEND_OK, msgId=C0A8014221A818B4AAC24B97D5150001, offsetMsgId=79296EE100002A9F00000000000232E1, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=0], queueOffset=32] SendResult [sendStatus=SEND_OK, msgId=C0A8014221A818B4AAC24B97D5370002, offsetMsgId=79296EE100002A9F00000000000233B6, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=1], queueOffset=32] SendResult [sendStatus=SEND_OK, msgId=C0A8014221A818B4AAC24B97D5600003, offsetMsgId=79296EE100002A9F000000000002348B, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=2], queueOffset=32] SendResult [sendStatus=SEND_OK, msgId=C0A8014221A818B4AAC24B97D5810004, offsetMsgId=79296EE100002A9F0000000000023560, messageQueue=MessageQueue [topic=TopicTest1, brokerName=broker-a, queueId=3], queueOffset=33]
Consumer(消费者):
import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) { /** * Instantiate with specified consumer group name. */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerGroupName"); consumer.setNamesrvAddr("121.41.110.225:9876"); /** * 指定name server的地址. * <p/> * * 或者你可以通过环境变量指定name server地址: NAMESRV_ADDR * <pre> * {@code * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876"); * } * </pre> */
/** * 指定从哪里开始,以防指定的消费群体是一个全新的消费群体. */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); /** * 订阅主题来消费. */
try { consumer.subscribe("TopicTest", "*"); } catch (MQClientException e) { e.printStackTrace(); } /** * Register callback to execute on arrival of messages fetched from brokers. */ consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) throws InterruptedException { for (MessageExt ext : msgs) { String result = new String(ext.getBody()); // Thread.sleep(10000);
System.out.println(result); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }) ; /** * 启动消费者实例. */
try { consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } System.out.printf("Consumer Started.%n"); } }
运行结果:
Connected to the target VM, address: '127.0.0.1:64630', transport: 'socket' 16:32:39.716 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework Consumer Started. Hello world Hello world Hello world Hello world Hello world Hello world Hello world
注意:Producer和Consumer的topic需要一致。