RocketMq消息隊列使用


最近在看消息隊列框架 ,alibaba的RocketMQ單機支持1萬以上的持久化隊列,支持諸多特性,

目前RocketMQ在阿里集團被廣泛應用在訂單,交易,充值,流計算,消息推送,日志流式處理,binglog分發等場景

比kafka還是有過之無不及,其實kafka文檔很豐富

但RocketMQ網上的文章太少,找不到相關的操作教程

於是研究了下源碼 做個單機操作的教程,如果你也對此有興趣不妨共同研究

下載源碼的地址 https://github.com/alibaba/RocketMQ/releases

  • 首選通過在java項目里面Maven依賴方式引用RocketMQ Java SDK

    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.2.6</version>
    </dependency>

Downloads

在linux 下用wget 下載源碼然后解壓出來

在runserver.sh里面可以配置 jvm啟動的參數 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"  

可以 vi runserver.sh

分別給 mqnamesrv mqbroker play.sh 執行的權限

chmod +x  mqnamersrv 

chmod +x  mqbroker 

chmod +x  play.sh 

下面紅線框的這段 命令輸入錯誤了,忽略不用看

通過 nohup sh mqnamesrv& 啟動 RocketMq

目前沒看到結束的命令,也沒找到相關的介紹,

我這里用的 ps -ef|grep rocketmq  查到進程pid

然后kill pid號

或則pkill -9 java [慎用]

用jps -v 查看下java進程的參數

 rocketmq啟動后監聽 9876端口,這里還是在看源碼里面看到的,資料實在是太少了

在防火牆配置里面加上 9876端口,設置iptables對外開放

部署Broker 

nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties & 

這里ip換成本機的就是單機實例,如果配置主從 這里可以配其他的ip

 Master和Slave的配置文件參考conf目錄下的配置文件

 Master與Slave通過指定相同的brokerName參數來配對,Master的BrokerId必須是0,Slave的BrokerId必須是大於0的數

 一個Master下面可以掛載多個Slave,同一Master下的多個Slave通過指定不同的BrokerId來區分

 部署一Master一Slave,集群采用異步復制方式:

 Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &  

Slave:   nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &  

 

 

package com.pgsqlmybatis.common.rocketmq;/*
***************************************************************
* 公司名稱    :
* 系統名稱    :信用管家專業版
* 類 名 稱    :Ios渠道idfa統計,推廣統計用
* 功能描述    :
* 業務描述    :
* 作 者 名    :@Author Royal
* 開發日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改內容    :
***************************************************************
*/

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("Producer");
        producer.setNamesrvAddr("xxxxxxxxxx:9876");
        try {
            producer.start();

            String pushMsg="kafka activeMq rocketMq 消息隊列使用1";
            Message msg = new Message("PushTopic","push","1",
                    pushMsg.getBytes("UTF-8"));

            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            String pushMsg2="海量級消息記錄單機測試2";
            msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8"));

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());

            String pushMsg3="海量級消息記錄單機測試3";
            msg = new Message("PullTopic","pull","1",pushMsg3.getBytes());

            result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }
    }
}

  

啟動生成者

 

啟動消費者

package com.pgsqlmybatis.common.rocketmq;/*
***************************************************************
* 公司名稱    :
* 系統名稱    :信用管家專業版
* 類 名 稱    :Ios渠道idfa統計,推廣統計用
* 功能描述    :
* 業務描述    :
* 作 者 名    :@Author Royal
* 開發日期    :2016-05-15
* Created     :IntelliJ IDEA
***************************************************************
* 修改日期    :
* 修 改 者    :
* 修改內容    :
***************************************************************
*/

import java.io.UnsupportedEncodingException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args){
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("PushConsumer");
        consumer.setNamesrvAddr("xxxxxxxxxxxx:9876");
        try {
            consumer.subscribe("PushTopic", "push");
            /**
             * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
             * 如果非第一次啟動,那么按照上次消費的位置繼續消費
             */
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(
                    new MessageListenerConcurrently() {
                        public ConsumeConcurrentlyStatus consumeMessage(
                                List<MessageExt> list,
                                ConsumeConcurrentlyContext Context) {
                            Message msg = list.get(0);
                            System.out.println(msg.toString());
                            String recString= null;
                            try {
                                recString = new String(msg.getBody() ,"UTF-8");
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                            System.out.println(recString);
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                    }
            );
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

   

 

以上為單機實例配置

如果你遇到什么問題可以私信我,如果覺得此文對你很有幫助,點下贊推薦下額^_^ 

參考:http://blog.csdn.net/a19881029/article/details/34446629

        http://sofar.blog.51cto.com/353572/1540874

        http://blog.csdn.net/loongshawn/article/details/51086876

        RocketMq最佳實踐

       《RocketMQ原理簡介》

       分布式開放消息系統(RocketMQ)的原理與實踐      

       《RocketMQ用戶指南》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM