《RocketMQ 安裝和使用》


安裝Maven

安裝步驟:《Maven的安裝、配置及使用入門》

  http://www.cnblogs.com/dcba1112/archive/2011/05/01/2033805.html

  http://maven.apache.org/download.cgi (apache-maven-3.3.3-bin.zip)

  Path環境變量,當我們在cmd中輸入命令時,Windows首先會在當前目錄中尋找可執行文件或腳本,如果沒有找到,Windows會接着遍歷環境變量Path中定義的路徑。由於我們將%M2_HOME%\bin添加到了Path中,而這里%M2_HOME%實際上是引用了我們前面定義的另一個變量,其值是Maven的安裝目錄。因此,Windows會在執行命令時搜索目錄D:\bin\apache-maven-3.0\bin,而mvn執行腳本的位置就是這里。

 

安裝RocketMQ

源碼地址:(2015-07-18 3.2.6版本)

  E:\RocketMQ\RocketMQ-master

  E:\RocketMQ\RocketMQ_Workspace

  E:\RocketMQ\RocketMQ-master\target\alibaba-rocketmq-3.2.6-alibaba-rocketmq\alibaba-rocketmq\bin

 

執行下邊的命令或者執行install.bat(在這個bat文件中的命令如下)對maven熟悉的一眼就知道是執行clean package install assembly等操作。

  mvn -Dmaven.test.skip=true clean packageinstall assembly:assembly –U

 

將RocketMQ-master導入到eclipse中

    

  進入剛生成的target文件夾下的bin目錄,在命令行中執行:start mqnamesrv.exe,會彈出一個信息窗口,顯示The name Server boot success 說明啟動成功了,接着啟動borker,在命令行中執行:start mqbroker.exe -n 127.0.0.1:9876 同樣的彈出一個窗口,看到success表示成功了。

  start mqnamesrv >E:\RocketMQ\logs\alibaba-rocketmq/mqnamesrv.log

【遇到問題】無法啟動mqnamesrv。顯示軟件不兼容。

  

 

【如何在windows下使用linux的shell腳本】
  http://www.cygwin.com/。

  E:\cygwin64。

【當前目錄調出CMD】

  在桌面上先按住Shift鍵,然后鼠標右鍵,出現選項“在此處打開命令窗口(W)”也可以打開命令行。

生產者

 1 public class Producer{
 2     public static void main(String[] args) throws MQClientException,InterruptedException{
 3         //一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例
 4         final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
 5         producer.setNamesevAddr("127.0.0.1:9876");
 6         producer.setInstanceName("Producer");
 7         
 8         producer.start();
 9         
10         //一個Producer對象可以發送多個topic,多個tag的消息
11         for(int i=0; i<10; i++){
12             try{
13                 {
14                     Message msg = new Message("TopicTest1","TagA","OrderID001",("Hello MetaQ").getBytes());
15                     SendResult sendResult = pro .send(msg);
16                     System.out.println(sendResult);
17                 }
18                 {
19                     Message msg = new Message("TopicTest2", "TagB","OrderID0034",("Hello MetaQB").getBytes());
20                     SendResult sendResult = producer.send(msg);    
21                     System.out.println(sendResult);  
22                 }
23                 {    
24                     Message msg = new Message("TopicTest3",// topic    
25                         "TagC",// tag    
26                         "OrderID061",// key    
27                         ("Hello MetaQC").getBytes());// body    
28                     SendResult sendResult = producer.send(msg);    
29                     System.out.println(sendResult);    
30                 }   
31             }catch(Exception e){
32                 e.printStackTrace();
33             }
34             TimeUnit.MILLSECONDS.sleep(1000);
35         }
36         
37       /**  
38    * 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從MetaQ服務器上注銷自己  
39    * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鈎子里調用shutdown方法  
40    */    
41 //producer.shutdown();    
42   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {    
43      public void run() {    
44         producer.shutdown();    
45      }    
46   }));    
47   System.exit(0);      
48     }
49 
50 }

消費者

 1 public class Consumer {    
 2      /**   
 3      * 當前例子是PushConsumer用法,使用方式給用戶感覺是消息從RocketMQ服務器推到了應用客戶端。<br>   
 4      * 但是實際PushConsumer內部是使用長輪詢Pull方式從MetaQ服務器拉消息,然后再回調用戶Listener方法<br>   
 5      */      
 6     public static void main(String[] args) throws InterruptedException,      
 7                        MQClientException{      
 8               /**   
 9                * 一個應用創建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例<br>   
10                * 注意:ConsumerGroupName需要由應用來保證唯一   
11                */      
12               DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(      
13                                 "ConsumerGroupName");      
14               consumer.setNamesrvAddr("127.0.0.1:9876");      
15               consumer.setInstanceName("Consumber");      
16     
17               /**   
18                * 訂閱指定topic下tags分別等於TagA或TagC或TagD   
19                */      
20               consumer.subscribe("TopicTest1","TagA || TagC || TagD");      
21               /**   
22                * 訂閱指定topic下所有消息<br>   
23                * 注意:一個consumer對象可以訂閱多個topic   
24                */      
25               consumer.subscribe("TopicTest2","*");      
26     
27               consumer.registerMessageListener(new MessageListenerConcurrently() {      
28     
29                        public ConsumeConcurrentlyStatus consumeMessage(      
30                                           List<MessageExt>msgs, ConsumeConcurrentlyContext context) {      
31     
32                                 System.out.println(Thread.currentThread().getName()      
33                                                    +" Receive New Messages: " + msgs.size());      
34     
35                                 MessageExt msg = msgs.get(0);      
36                                 if(msg.getTopic().equals("TopicTest1")) {      
37                                           //執行TopicTest1的消費邏輯      
38                                           if(msg.getTags() != null && msg.getTags().equals("TagA")) {      
39                                                    //執行TagA的消費      
40                                                    System.out.println(new String(msg.getBody()));      
41                                           }else if (msg.getTags() != null      
42                                                             &&msg.getTags().equals("TagC")) {      
43                                                    //執行TagC的消費      
44                                                    System.out.println(new String(msg.getBody()));      
45                                           }else if (msg.getTags() != null      
46                                                             &&msg.getTags().equals("TagD")) {      
47                                                    //執行TagD的消費      
48                                                    System.out.println(new String(msg.getBody()));      
49                                           }      
50                                 }else if (msg.getTopic().equals("TopicTest2")) {      
51                                           System.out.println(new String(msg.getBody()));      
52                                 }      
53     
54                                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;      
55     
56                        }      
57               });      
58     
59               /**   
60                * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>   
61                */      
62               consumer.start();      
63     
64               System.out.println("ConsumerStarted.");      
65     }      
66 }    

 

參考文章

l  《RocketMQ在windows上安裝和開發使用》http://blog.csdn.net/ruishenh/article/details/22390809

l  《RocketMQ在Windows平台下環境搭建》http://www.cnblogs.com/marcotan/p/4256859.html

l  《RocketMQ隨筆分類》http://www.cnblogs.com/marcotan/category/655319.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM