最近在看消息隊列框架 ,alibaba的RocketMQ單機支持1萬以上的持久化隊列,支持諸多特性,
目前RocketMQ在阿里集團被廣泛應用在訂單,交易,充值,流計算,消息推送,日志流式處理,binglog分發等場景
比kafka還是有過之無不及,其實kafka文檔很豐富
但RocketMQ網上的文章太少,找不到相關的操作教程
於是研究了下源碼 做個單機操作的教程,如果你也對此有興趣不妨共同研究
下載源碼的地址 https://github.com/alibaba/RocketMQ/releases
-
首選通過在java項目里面Maven依賴方式引用RocketMQ Java SDK
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency>
Downloads
- 11.3 MBalibaba-rocketmq-3.2.6.tar.gz
- 2.46 MBalibaba-rocketmq-client-java-3.2.6.tar.gz
- Source code (zip)
- Source code (tar.gz)
在linux 下用wget 下載源碼然后解壓出來
在runserver.sh里面可以配置 jvm啟動的參數 JAVA_OPT_1="-server -Xms4g -Xmx4g -Xmn2g -XX:PermSize=128m -XX:MaxPermSize=320m"
可以 vi runserver.sh
分別給 mqnamesrv mqbroker play.sh 執行的權限
chmod +x mqnamersrv
chmod +x mqbroker
chmod +x play.sh
下面紅線框的這段 命令輸入錯誤了,忽略不用看
通過 nohup sh mqnamesrv& 啟動 RocketMq
目前沒看到結束的命令,也沒找到相關的介紹,
我這里用的 ps -ef|grep rocketmq 查到進程pid
然后kill pid號
或則pkill -9 java [慎用]
用jps -v 查看下java進程的參數
rocketmq啟動后監聽 9876端口,這里還是在看源碼里面看到的,資料實在是太少了
在防火牆配置里面加上 9876端口,設置iptables對外開放
部署Broker
nohup sh mqbroker -n "127.0.0.1:9876" -c ../conf/2m-2s-async/broker-a.properties &
這里ip換成本機的就是單機實例,如果配置主從 這里可以配其他的ip
Master和Slave的配置文件參考conf目錄下的配置文件
Master與Slave通過指定相同的brokerName參數來配對,Master的BrokerId必須是0,Slave的BrokerId必須是大於0的數
一個Master下面可以掛載多個Slave,同一Master下的多個Slave通過指定不同的BrokerId來區分
部署一Master一Slave,集群采用異步復制方式:
Master: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a.properties &
Slave: nohup sh mqbroker -n "192.168.1.23:9876" -c ../conf/2m-2s-async/broker-a-s.properties &
package com.pgsqlmybatis.common.rocketmq;/* *************************************************************** * 公司名稱 : * 系統名稱 :信用管家專業版 * 類 名 稱 :Ios渠道idfa統計,推廣統計用 * 功能描述 : * 業務描述 : * 作 者 名 :@Author Royal * 開發日期 :2016-05-15 * Created :IntelliJ IDEA *************************************************************** * 修改日期 : * 修 改 者 : * 修改內容 : *************************************************************** */ import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) { DefaultMQProducer producer = new DefaultMQProducer("Producer"); producer.setNamesrvAddr("xxxxxxxxxx:9876"); try { producer.start(); String pushMsg="kafka activeMq rocketMq 消息隊列使用1"; Message msg = new Message("PushTopic","push","1", pushMsg.getBytes("UTF-8")); SendResult result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); String pushMsg2="海量級消息記錄單機測試2"; msg = new Message("PushTopic","push","2",pushMsg2.getBytes("UTF-8")); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); String pushMsg3="海量級消息記錄單機測試3"; msg = new Message("PullTopic","pull","1",pushMsg3.getBytes()); result = producer.send(msg); System.out.println("id:" + result.getMsgId() + " result:" + result.getSendStatus()); } catch (Exception e) { e.printStackTrace(); } finally { producer.shutdown(); } } }
啟動生成者
啟動消費者
package com.pgsqlmybatis.common.rocketmq;/* *************************************************************** * 公司名稱 : * 系統名稱 :信用管家專業版 * 類 名 稱 :Ios渠道idfa統計,推廣統計用 * 功能描述 : * 業務描述 : * 作 者 名 :@Author Royal * 開發日期 :2016-05-15 * Created :IntelliJ IDEA *************************************************************** * 修改日期 : * 修 改 者 : * 修改內容 : *************************************************************** */ import java.io.UnsupportedEncodingException; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args){ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumer"); consumer.setNamesrvAddr("xxxxxxxxxxxx:9876"); try { consumer.subscribe("PushTopic", "push"); /** * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br> * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener( new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> list, ConsumeConcurrentlyContext Context) { Message msg = list.get(0); System.out.println(msg.toString()); String recString= null; try { recString = new String(msg.getBody() ,"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } System.out.println(recString); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } ); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }
以上為單機實例配置
如果你遇到什么問題可以私信我,如果覺得此文對你很有幫助,點下贊推薦下額^_^
參考:http://blog.csdn.net/a19881029/article/details/34446629
http://sofar.blog.51cto.com/353572/1540874
http://blog.csdn.net/loongshawn/article/details/51086876