一、RocketMQ 特点
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,有点什么的就不多说了,就直接将怎么用。
RocketMq大概就是真么一种结构,具体的使用流程就是,消息生产者(Producer)将消息发布到消息中心,消费者(Consumer)启动监听,当监听到消息时去消息中心拿消息。而NameServer的作用我的理解就类似一个注册中心,将broker的信息提供给Produce和Consumer。
RocketMq天然支持集群,一个Producer集群拥有同一个groupId,同理,同一个Consumer集群也是有相同的GroupId,当消息的消费模式为集群模式时,同一个集群的消费者智能消费一次消息,即如果Consumer1消费了消息,这个集群中的其他消费者就不能再消费此消息,除非将消费模式设置为广播模式。
废话不多说,本地搭建一套RocketMq,实践出真知。
二、本地部署RocketMq
1、RocketMq安装包下载地址:http://rocketmq.apache.org/dowloading/releases/
下载后直接找到 mqnamesrv.cmd 和 mqbroker.cmd,推荐使用everything,非常好用的全局搜索软件。直接百度搜就行,很小。不爱下载的可以从以下路径找到:rocketMq->distribution->target->rocketMq->bin
2、启动NameServer
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
启动日志会打印在控制台,如下图。
3、启动broker
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
同样,日志如下图
4、启动Consumer
写代码之前首先引入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency>
如上图所示,这个依赖中已经包括slf4j和netty,所以说如果只为了练习,这一个依赖就够了。
接着创建一个类,启动main方法。
public class Consumer { public static void main(String[] args) throws Exception { //创建一个消息消费者,并设置一个消息消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group"); //指定 NameServer 地址 consumer.setNamesrvAddr("localhost:9876"); //设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //订阅指定 Topic 下的所有消息 consumer.subscribe("topic_example_java", "*"); //注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { //默认 list 里只有一条消息,可以通过设置参数来批量接收消息 if (list != null) { for (MessageExt ext : list) { try { System.out.println(new Date() + new String(ext.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 消费者对象在使用之前必须要调用 start 初始化 consumer.start(); System.out.println("消息消费者已启动"); } }
5、启动Producer
代码如下,记得引入包,然后启动main方法。
public class Producer { public static void main(String[] args) throws Exception { //创建一个消息生产者,并设置一个消息生产者组 DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group"); //指定 NameServer 地址 producer.setNamesrvAddr("localhost:9876"); //初始化 Producer,整个应用生命周期内只需要初始化一次 producer.start(); for (int i = 0; i < 100; i++) { //创建一条消息对象,指定其主题、标签和消息内容 Message msg = new Message( "topic_example_java" /* 消息主题名 */, "TagA" /* 消息标签 */, ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */ ); //发送消息并返回结果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等 producer.shutdown(); } }
到此,部署并启动完成,我们看到控制台消费者已经收到了消息。使用起来真的是非常的简单,当然简单的练习还是很容易,不过真正在项目里用起来还是会有很多坑,包括重复消费、消息丢失等等很多,还是要多用才能长经验啊。