安裝Maven
安裝步驟:《Maven的安裝、配置及使用入門》
http://www.cnblogs.com/dcba1112/archive/2011/05/01/2033805.html
http://maven.apache.org/download.cgi (apache-maven-3.3.3-bin.zip)
Path環境變量,當我們在cmd中輸入命令時,Windows首先會在當前目錄中尋找可執行文件或腳本,如果沒有找到,Windows會接着遍歷環境變量Path中定義的路徑。由於我們將%M2_HOME%\bin添加到了Path中,而這里%M2_HOME%實際上是引用了我們前面定義的另一個變量,其值是Maven的安裝目錄。因此,Windows會在執行命令時搜索目錄D:\bin\apache-maven-3.0\bin,而mvn執行腳本的位置就是這里。
安裝RocketMQ
源碼地址:(2015-07-18 3.2.6版本)
E:\RocketMQ\RocketMQ-master
E:\RocketMQ\RocketMQ_Workspace
E:\RocketMQ\RocketMQ-master\target\alibaba-rocketmq-3.2.6-alibaba-rocketmq\alibaba-rocketmq\bin
執行下邊的命令或者執行install.bat(在這個bat文件中的命令如下)對maven熟悉的一眼就知道是執行clean package install assembly等操作。
mvn -Dmaven.test.skip=true clean packageinstall assembly:assembly –U
將RocketMQ-master導入到eclipse中
進入剛生成的target文件夾下的bin目錄,在命令行中執行:start mqnamesrv.exe,會彈出一個信息窗口,顯示The name Server boot success 說明啟動成功了,接着啟動borker,在命令行中執行:start mqbroker.exe -n 127.0.0.1:9876 同樣的彈出一個窗口,看到success表示成功了。
start mqnamesrv >E:\RocketMQ\logs\alibaba-rocketmq/mqnamesrv.log
【遇到問題】無法啟動mqnamesrv。顯示軟件不兼容。
【如何在windows下使用linux的shell腳本】
http://www.cygwin.com/。
E:\cygwin64。
【當前目錄調出CMD】
在桌面上先按住Shift鍵,然后鼠標右鍵,出現選項“在此處打開命令窗口(W)”也可以打開命令行。
生產者
1 public class Producer{ 2 public static void main(String[] args) throws MQClientException,InterruptedException{ 3 //一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例 4 final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); 5 producer.setNamesevAddr("127.0.0.1:9876"); 6 producer.setInstanceName("Producer"); 7 8 producer.start(); 9 10 //一個Producer對象可以發送多個topic,多個tag的消息 11 for(int i=0; i<10; i++){ 12 try{ 13 { 14 Message msg = new Message("TopicTest1","TagA","OrderID001",("Hello MetaQ").getBytes()); 15 SendResult sendResult = pro .send(msg); 16 System.out.println(sendResult); 17 } 18 { 19 Message msg = new Message("TopicTest2", "TagB","OrderID0034",("Hello MetaQB").getBytes()); 20 SendResult sendResult = producer.send(msg); 21 System.out.println(sendResult); 22 } 23 { 24 Message msg = new Message("TopicTest3",// topic 25 "TagC",// tag 26 "OrderID061",// key 27 ("Hello MetaQC").getBytes());// body 28 SendResult sendResult = producer.send(msg); 29 System.out.println(sendResult); 30 } 31 }catch(Exception e){ 32 e.printStackTrace(); 33 } 34 TimeUnit.MILLSECONDS.sleep(1000); 35 } 36 37 /** 38 * 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從MetaQ服務器上注銷自己 39 * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鈎子里調用shutdown方法 40 */ 41 //producer.shutdown(); 42 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 43 public void run() { 44 producer.shutdown(); 45 } 46 })); 47 System.exit(0); 48 } 49 50 }
消費者
1 public class Consumer { 2 /** 3 * 當前例子是PushConsumer用法,使用方式給用戶感覺是消息從RocketMQ服務器推到了應用客戶端。<br> 4 * 但是實際PushConsumer內部是使用長輪詢Pull方式從MetaQ服務器拉消息,然后再回調用戶Listener方法<br> 5 */ 6 public static void main(String[] args) throws InterruptedException, 7 MQClientException{ 8 /** 9 * 一個應用創建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例<br> 10 * 注意:ConsumerGroupName需要由應用來保證唯一 11 */ 12 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( 13 "ConsumerGroupName"); 14 consumer.setNamesrvAddr("127.0.0.1:9876"); 15 consumer.setInstanceName("Consumber"); 16 17 /** 18 * 訂閱指定topic下tags分別等於TagA或TagC或TagD 19 */ 20 consumer.subscribe("TopicTest1","TagA || TagC || TagD"); 21 /** 22 * 訂閱指定topic下所有消息<br> 23 * 注意:一個consumer對象可以訂閱多個topic 24 */ 25 consumer.subscribe("TopicTest2","*"); 26 27 consumer.registerMessageListener(new MessageListenerConcurrently() { 28 29 public ConsumeConcurrentlyStatus consumeMessage( 30 List<MessageExt>msgs, ConsumeConcurrentlyContext context) { 31 32 System.out.println(Thread.currentThread().getName() 33 +" Receive New Messages: " + msgs.size()); 34 35 MessageExt msg = msgs.get(0); 36 if(msg.getTopic().equals("TopicTest1")) { 37 //執行TopicTest1的消費邏輯 38 if(msg.getTags() != null && msg.getTags().equals("TagA")) { 39 //執行TagA的消費 40 System.out.println(new String(msg.getBody())); 41 }else if (msg.getTags() != null 42 &&msg.getTags().equals("TagC")) { 43 //執行TagC的消費 44 System.out.println(new String(msg.getBody())); 45 }else if (msg.getTags() != null 46 &&msg.getTags().equals("TagD")) { 47 //執行TagD的消費 48 System.out.println(new String(msg.getBody())); 49 } 50 }else if (msg.getTopic().equals("TopicTest2")) { 51 System.out.println(new String(msg.getBody())); 52 } 53 54 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 55 56 } 57 }); 58 59 /** 60 * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br> 61 */ 62 consumer.start(); 63 64 System.out.println("ConsumerStarted."); 65 } 66 }
參考文章
l 《RocketMQ在windows上安裝和開發使用》http://blog.csdn.net/ruishenh/article/details/22390809
l 《RocketMQ在Windows平台下環境搭建》http://www.cnblogs.com/marcotan/p/4256859.html
l 《RocketMQ隨筆分類》http://www.cnblogs.com/marcotan/category/655319.html