消息队列之RocketMq


一、RocketMQ 特点

RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,有点什么的就不多说了,就直接将怎么用。

 

 

 

RocketMq大概就是真么一种结构,具体的使用流程就是,消息生产者(Producer)将消息发布到消息中心,消费者(Consumer)启动监听,当监听到消息时去消息中心拿消息。而NameServer的作用我的理解就类似一个注册中心,将broker的信息提供给Produce和Consumer。

RocketMq天然支持集群,一个Producer集群拥有同一个groupId,同理,同一个Consumer集群也是有相同的GroupId,当消息的消费模式为集群模式时,同一个集群的消费者智能消费一次消息,即如果Consumer1消费了消息,这个集群中的其他消费者就不能再消费此消息,除非将消费模式设置为广播模式。

废话不多说,本地搭建一套RocketMq,实践出真知。

二、本地部署RocketMq

1、RocketMq安装包下载地址:http://rocketmq.apache.org/dowloading/releases/

下载后直接找到 mqnamesrv.cmd 和 mqbroker.cmd,推荐使用everything,非常好用的全局搜索软件。直接百度搜就行,很小。不爱下载的可以从以下路径找到:rocketMq->distribution->target->rocketMq->bin

2、启动NameServer

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

启动日志会打印在控制台,如下图。

 

 

3、启动broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

 同样,日志如下图

 

4、启动Consumer

写代码之前首先引入依赖:

 <dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version>4.2.0</version>
  </dependency>

 

 

如上图所示,这个依赖中已经包括slf4j和netty,所以说如果只为了练习,这一个依赖就够了。

接着创建一个类,启动main方法。

public class Consumer {

    public static void main(String[] args) throws Exception {
        //创建一个消息消费者,并设置一个消息消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
        //指定 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        //设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅指定 Topic 下的所有消息
        consumer.subscribe("topic_example_java", "*");
        //注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                //默认 list 里只有一条消息,可以通过设置参数来批量接收消息
                if (list != null) {
                    for (MessageExt ext : list) {
                        try {
                            System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 消费者对象在使用之前必须要调用 start 初始化
        consumer.start();
        System.out.println("消息消费者已启动");
    }
}

5、启动Producer

代码如下,记得引入包,然后启动main方法。

public class Producer {

    public static void main(String[] args) throws Exception {
        //创建一个消息生产者,并设置一个消息生产者组
        DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");

        //初始化 Producer,整个应用生命周期内只需要初始化一次
        producer.start();

        for (int i = 0; i < 100; i++) {
            //创建一条消息对象,指定其主题、标签和消息内容
            Message msg = new Message(
                    "topic_example_java" /* 消息主题名 */,
                    "TagA" /* 消息标签 */,
                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
            );

            //发送消息并返回结果
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
        }

        // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
        producer.shutdown();
    }
}

 

到此,部署并启动完成,我们看到控制台消费者已经收到了消息。使用起来真的是非常的简单,当然简单的练习还是很容易,不过真正在项目里用起来还是会有很多坑,包括重复消费、消息丢失等等很多,还是要多用才能长经验啊。

 

 

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM