RocketMQ 筆記-轉


Astrotrain概述

Astrotrain是基於阿里巴巴開源項目RocketMQ進行封裝的分布式消息中間件系統,提供集群環境下的消息生產和消費功能。

RocketMQ介紹

RocketMQ的物理部署結構

  • Name Server 是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。所有的主題和broker節點信息都由Name Server進行維護。

  • Broker 是主要的功能單元,處理主題的存儲和消費邏輯,Broker會定時同步所有信息至Name Server。

  • 一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一致。
  • 一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。Consumer群組內多應用之間消息消費是競爭關系,Consumer群組之間是共享消費,這點非常重要。

RocketMQ存儲特點

  • Broker上綁定具體的Topic。
  • Topic下有多個物理存儲隊列(Queue),所有存儲隊列都會存儲消息,一個消息只會存儲一份。Topic可以分布在不同Broker上。
  • 存儲隊列的選擇決定了消費的特性,如果只讀寫一個隊列,那么消費就是順序的了,否則會是無序的。

消費者Push和Pull的區別

  • Push模式下的消息是由事件觸發,有消息到達時監聽器會被調用(MessageListener)。

  • Pull模式下的消息可以由事件觸發,也可以應用主動去拉取消息,沒有消息可拉取時返回空。

RocketMQ設計文檔

RocketMQ_design.pdf

Astrotrain介紹

Astrotrain-Client是對RocketMQ的Producer和Consumer的封裝,集中解決了RocketMQ諸多配置信息和使用特性,針對特定需求可以進行二次開發來進行擴展。

Astrotrain的整體結構

  • Basic Component是Astrotrain對RocketMQ的基礎封裝,里面包含的生產者和消費者處理消息的邏輯,同時處理了RocketMQ的很多配置信息。

  • JDBC Message Component是消息事務的功能模塊,依賴Basic Component的功能實現。

  • Astrotrain底層依賴RocketMQ提供消息服務。

  • 業務層只面對Astrotrain暴露的服務。

Astrotrain的邏輯結構

  • ATClient定義最基礎的功能,包括客戶端的啟動、初始化、關閉的定義。

  • Pipe是生產者和消費者公共部分的抽象,與具體的Topic進行綁定。

  • ATProducer是對生產者的封裝,提供基礎的消息發送服務。
  • ATConsumer是對消費者的封裝,提供基礎的消息消費服務(沒有具體的Pipe實現,因為消費是針對Listener的)。

  • ATMessage是對外提供的消息體,目前只提供StringMessage,ObjectMessage,后續再進行擴展。

Astrotrain使用示例

對於Astrotrain的使用主要是基於astrotrain-client來實現,目前有兩個版本可供使用,1.0和1.0.1,他們之間的區別在於后者提供了一個批量消費的接口,其余的相同,所以以1.0.1的版本為例說。

注意:下面列出的所有配置文件astrotrain-client會默認從ClassPath中進行讀取,不需要顯式指定。

 

版本信息

  1. astrotrain-client-1.0.2版本,2015/02/05
    1、開放了tag注冊,消費端新增 subscribe(topic,tags,listener) 方法
    2、去除了對配置文件的依賴(astrotrain-produce/astrotrain-consumer/astrotrain),使用Spring注入風格注入屬性,生產者對應DefaultATProducer,消費者對應DefaultATPushConsumer這兩個實體
    3、增加BytesMessage支持字節數據消息
    4、在消費端可以正常獲取發送端設置的key和tag熟悉,通過ATMessage.getProperty() 方法,鍵值與發送端相同

 

Maven依賴

 

maven
< dependency >
     < groupId >com.zj</ groupId >
     < artifactId >astrotrain-client</ artifactId >
     < version >1.0.1</ version >
</ dependency >

 

生產端(Producer)

  1. 准備資源配置文件astrotrain-producer.properties,生產者配置

    astrotrain-producer.properties
    #生產者群組名稱
    astrotrain.group.name=PleaseRename
    #應用實例名稱
    astrotrain.instance.name=ProducerAT
    #namesrv地址,多個之間以分號 ; 分隔
    astrotrain.namesrv.address= 10.10 . 110.51 : 9876
  2. 准備資源文件astrotrain.properties,應用配置

    astrotrain.properties
    #應用標志符
    astrotrain.appId=app1
  3. Java代碼,准備POJO。

    Order.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    package  com.zj.astrotrain.demo;
    import  java.io.Serializable;
    import  java.util.List;
    public  class  Order  implements  Serializable {
         /**
          *
          */
         private  static  final  long  serialVersionUID = 1L;
         
         private  long  id;
         private  String orderId;
         private  String cardNo;
         private  List<String> payments;
         
         public  long  getId() {
             return  id;
         }
         public  void  setId( long  id) {
             this .id = id;
         }
         public  String getOrderId() {
             return  orderId;
         }
         public  void  setOrderId(String orderId) {
             this .orderId = orderId;
         }
         public  String getCardNo() {
             return  cardNo;
         }
         public  void  setCardNo(String cardNo) {
             this .cardNo = cardNo;
         }
         
         public  List<String> getPayments() {
             return  payments;
         }
         public  void  setPayments(List<String> payments) {
             this .payments = payments;
         }
         
         public  String toString(){
             return  "Order [id="  this .id +  ",orderId="
                     this .orderId +  ",cardNo="  this .cardNo
                     ",payments="  this .payments ==  null  null  this .payments.size() +  "]" ;
         }
    }

     

     

  4. 生產者代碼

    ProduerDemon.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    package  com.zj.astrotrain.demo;
    import  java.text.DateFormat;
    import  java.text.SimpleDateFormat;
    import  java.util.ArrayList;
    import  java.util.Date;
    import  java.util.List;
    import  com.alibaba.fastjson.JSON;
    import  com.zj.astrotrain.client.ATMessage;
    import  com.zj.astrotrain.client.ATProducer;
    import  com.zj.astrotrain.client.exceptions.ATException;
    import  com.zj.astrotrain.client.message.ObjectMessage;
    import  com.zj.astrotrain.client.message.StringMessage;
    import  com.zj.astrotrain.client.producer.DefaultATProducer;
    /**
      * 生成者示例
      * <pre>需要在 ClassPath 路徑下准備 astrotrain-producer.properties 具體配置信息參考 src/main/resources下的配置</pre>
      *
      *
      */
    public  class  ProducerDemon {
         private  DefaultATProducer atProducer;
     
         public  ProducerDemon() {
     
         }
     
         public  void  setUp() {
             this .atProducer =  new  DefaultATProducer();
             try  {
                 this .atProducer.start();
             catch  (Exception e) {
                 e.printStackTrace();
             }
         }
     
         /**
          * 發送字符類型的消息
          */
         public  void  doStringMessage() {
             //獲取一個生成者通道,指定Topic
             ATProducer producer =  this .atProducer.createProducer( "demo" );
             DateFormat format =  new  SimpleDateFormat( "yyyyMMddHHmmss" );
             for ( int  i =  0 ; i <  100 ; i++){
                 Order order =  new  Order();
                 order.setId(i);
                 order.setOrderId(format.format( new  Date()) +  "_"  + i);
                 order.setCardNo( "6221202000111112222" );
                 //新建一個StringMessage
                 StringMessage msg =  new  StringMessage(JSON.toJSONString(order));
                 //為消息設置一個業務標識符,最好是唯一的,方便在調試程序時進行跟蹤,可選屬性.
                 msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId());
                 try  {
                     //進行消息發送,不拋異常的都是正常發送.除非服務端程序Crash,不然不會丟失消息
                     producer.send(msg);
                 catch  (ATException e) {
                     e.printStackTrace();
                 }
             }
         }
     
         /**
          * 發送對象類型的消息
          */
         public  void  doObjectMessage() {
             //使用新的主題創建生產者通道
             ATProducer producer =  this .atProducer.createProducer( "demo" );
             DateFormat format =  new  SimpleDateFormat( "yyyyMMddHHmmss" );
             for ( int  i =  0 ; i<  100 ; i++) {
                 //新建一個ObjectMessage
                 ObjectMessage msg =  new  ObjectMessage();
                 Order order =  new  Order();
                 order.setId(i);
                 order.setOrderId(format.format( new  Date()) +  "_"  + i);
                 order.setCardNo( "6221202000111112222"  + i);
                 List<String> payments =  new  ArrayList<String>();
                 payments.add( "payment"  + i);
                 order.setPayments(payments);
                 //設置對象
                 msg.putObject(order);
                 //為消息設置一個業務標識符,最好是唯一的,方便在調試程序時進行跟蹤,可選屬性.
                 msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId());
                 try  {
                     //進行消息發送,不拋異常的都是正常發送.除非服務端程序Crash,不然不會丟失消息
                     producer.send(msg);
                 catch  (ATException e) {
                     e.printStackTrace();
                 }
             }
         }
     
         public  void  shutdown() {
             if ( this .atProducer !=  null ){
                 this .atProducer.shutdown();
             }
         }
     
         public  static  void  main(String[] args) {
             ProducerDemon demon =  new  ProducerDemon();
             demon.setUp();
             demon.doStringMessage();
             demon.doObjectMessage();
             demon.shutdown();
         }
     
    }

消費端(Consumer)

  1. 准備資源文件astrotrain-consumer.properties,消費者配置。

    astrotrain-consumer.properties
    #消費者群組名稱,與生產者群組沒有關聯
    astrotrain.group.name=PleaseRename
    #消費者示例名稱
    astrotrain.instance.name=ConsumerATbatch
    #namesrv的地址,多個以分號 ; 分隔
    astrotrain.namesrv.address= 10.10 . 110.51 : 9876
    #消費模式,CLUSTERING and BROADCASTING,  default  is CLUSTERING
    astrotrain.consumer.messageModel=CLUSTERING
    #消費者啟動時從那個位置開始消費
    astrotrain.consumer.consumeFromWhere=CONSUME_FROM_FIRST_OFFSET
    #消費者線程最小數
    astrotrain.consumer.consumeThreadMin= 10
    #消費者線程最大數
    astrotrain.consumer.consumeThreadMax= 20
    #單次消費時一次性消費多少條消息,批量消費接口才有用,可選配置。
    #astrotrain.consumer.batchMaxSize= 30
    #消費者去broker拉取消息時,一次拉取多少條。可選配置。
    #astrotrain.consumer.pullBatchSize= 100
    #每次拉取消息的間隔,默認為 0 ,可選配置/
    #astrotrain.consumer.pullInterval= 1000
  2. 准備資源文件astrotrain.properties,應用配置

    astrotrain.properties
    #應用標志符
    astrotrain.appId=app2
  3. 消費者,單個消息消費

    ConsumerDemon.java
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    package  com.zj.astrotrain.demo;
    import  com.alibaba.fastjson.JSON;
    import  com.alibaba.rocketmq.client.exception.MQClientException;
    import  com.zj.astrotrain.client.ATMessage;
    import  com.zj.astrotrain.client.MessageListener;
    import  com.zj.astrotrain.client.consumer.DefaultATPushConsumer;
    import  com.zj.astrotrain.client.message.ObjectMessage;
    import  com.zj.astrotrain.client.message.StringMessage;
    /**
      * 單個消息消費
      *
      *
      */
    public  class  ConsumerDemon {
         private  DefaultATPushConsumer atPushConsumer;
         
         public  ConsumerDemon() {
             
         }
         
         public  void  setUp() {
             this .atPushConsumer =  new  DefaultATPushConsumer();
             try  {
                 //訂閱必須在start之前
                 this .atPushConsumer.subscribe( "demo" new  DemonMessageListener());
             catch  (MQClientException e) {
                 e.printStackTrace();
             }
         }
         
         public  void  start() {
             if ( this .atPushConsumer !=  null ) {
                 try  {
                     this .atPushConsumer.start();
                     System.in.read(); //按任意鍵退出
                 catch  (Exception e) {
                     e.printStackTrace();
                 }
             }
         }
         
         public  void  shutdown() {
             if ( this .atPushConsumer !=  null ){
                 this .atPushConsumer.shutdown();
             }
         }
         
         public  static  void  main(String[] args) {
             ConsumerDemon demon =  new  ConsumerDemon();
             demon.setUp();
             demon.start();
             demon.shutdown();
         }
         //單個消息監聽接口
         public  class  DemonMessageListener  implements  MessageListener {
             @Override
             public  void  onMessage(ATMessage message) {
                 try  {
                     if (message  instanceof  StringMessage){
                         StringMessage msg = (StringMessage) message;
                         Order order = JSON.parseObject(msg.getMsg(), Order. class );
                         System.out.println(order.getCardNo());
                     else  if (message  instanceof  ObjectMessage) {
                         ObjectMessage msg = (ObjectMessage) message;
                         Order order = (Order) msg.getObject();
                         System.out.println(order.getCardNo());
                     }
                 catch (Exception e) {
                     e.printStackTrace();
                 }
             }
             
         }
    }
  4. 消費者批量消費

    ConsumerBatchDemon
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    package  com.zj.astrotrain.demo;
    import  java.util.List;
    import  com.alibaba.fastjson.JSON;
    import  com.alibaba.rocketmq.client.exception.MQClientException;
    import  com.zj.astrotrain.client.ATMessage;
    import  com.zj.astrotrain.client.ConcurrentlyMessageListener;
    import  com.zj.astrotrain.client.consumer.DefaultATPushConsumer;
    import  com.zj.astrotrain.client.message.ObjectMessage;
    import  com.zj.astrotrain.client.message.StringMessage;
    /**
      * 消費者批量消費示例
      * <pre>需要在 ClassPath 路徑下准備 astrotrain-consumer.properties 具體配置信息參考 src/main/resources下的配置</pre>
      *
      *
      */
    public  class  ConsumerBatchDemon {
         private  DefaultATPushConsumer atPushConsumer;
         
         public  ConsumerBatchDemon() {
             
         }
         
         public  void  setUp() {
             this .atPushConsumer =  new  DefaultATPushConsumer();
             try  {
                 //訂閱必須在start之前
                 this .atPushConsumer.subscribe( "demo" new  DemonConcurrentlyMessageListener());
             catch  (MQClientException e) {
                 e.printStackTrace();
             }
         }
         
         public  void  start() {
             if ( this .atPushConsumer !=  null ) {
                 try  {
                     this .atPushConsumer.start();
                     System.in.read(); //按任意鍵退出
                 catch  (Exception e) {
                     e.printStackTrace();
                 }
             }
         }
         
         public  void  shutdown() {
             if ( this .atPushConsumer !=  null ){
                 this .atPushConsumer.shutdown();
             }
         }
         
         public  static  void  main(String[] args) {
             ConsumerBatchDemon demon =  new  ConsumerBatchDemon();
             demon.setUp();
             demon.start();
             demon.shutdown();
         }
         
         /**
          * 消息監聽
          *
          *
          */
         public  class  DemonConcurrentlyMessageListener  implements  ConcurrentlyMessageListener{
             @Override
             public  void  onMessage(List<ATMessage> msgs) {
                 for (ATMessage message : msgs) {
                     try  {
                         if (message  instanceof  StringMessage){
                             StringMessage msg = (StringMessage) message;
                             Order order = JSON.parseObject(msg.getMsg(), Order. class );
                             System.out.println(order.getCardNo());
                         else  if (message  instanceof  ObjectMessage) {
                             ObjectMessage msg = (ObjectMessage) message;
                             Order order = (Order) msg.getObject();
                             System.out.println(order.getCardNo());
                         }
                     catch (Exception e) {
                         e.printStackTrace();
                     }
                 }
             }
             
         }
         
    }

注意事項

    1. 訂閱時不要加上tag,以*注冊所有tag,然后再做篩選,訂閱時附上tag會導致先注冊的相同topic不能正常消費
    2. 在開發環境里Topic和SubGroup都是自動創建的,生產上是需要手動創建的,所以上線之前一定要檢查對應的主題和訂閱組是否已創建


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM