Astrotrain概述
Astrotrain是基於阿里巴巴開源項目RocketMQ進行封裝的分布式消息中間件系統,提供集群環境下的消息生產和消費功能。
RocketMQ介紹
RocketMQ的物理部署結構

-
Name Server 是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。所有的主題和broker節點信息都由Name Server進行維護。
-
Broker 是主要的功能單元,處理主題的存儲和消費邏輯,Broker會定時同步所有信息至Name Server。
- 一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一致。
- 一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。Consumer群組內多應用之間消息消費是競爭關系,Consumer群組之間是共享消費,這點非常重要。
RocketMQ存儲特點

- Broker上綁定具體的Topic。
- Topic下有多個物理存儲隊列(Queue),所有存儲隊列都會存儲消息,一個消息只會存儲一份。Topic可以分布在不同Broker上。
- 存儲隊列的選擇決定了消費的特性,如果只讀寫一個隊列,那么消費就是順序的了,否則會是無序的。
消費者Push和Pull的區別
-
Push模式下的消息是由事件觸發,有消息到達時監聽器會被調用(MessageListener)。
-
Pull模式下的消息可以由事件觸發,也可以應用主動去拉取消息,沒有消息可拉取時返回空。
RocketMQ設計文檔
Astrotrain介紹
Astrotrain-Client是對RocketMQ的Producer和Consumer的封裝,集中解決了RocketMQ諸多配置信息和使用特性,針對特定需求可以進行二次開發來進行擴展。
Astrotrain的整體結構

-
Basic Component是Astrotrain對RocketMQ的基礎封裝,里面包含的生產者和消費者處理消息的邏輯,同時處理了RocketMQ的很多配置信息。
-
JDBC Message Component是消息事務的功能模塊,依賴Basic Component的功能實現。
-
Astrotrain底層依賴RocketMQ提供消息服務。
-
業務層只面對Astrotrain暴露的服務。
Astrotrain的邏輯結構

-
ATClient定義最基礎的功能,包括客戶端的啟動、初始化、關閉的定義。
-
Pipe是生產者和消費者公共部分的抽象,與具體的Topic進行綁定。
- ATProducer是對生產者的封裝,提供基礎的消息發送服務。
-
ATConsumer是對消費者的封裝,提供基礎的消息消費服務(沒有具體的Pipe實現,因為消費是針對Listener的)。
- ATMessage是對外提供的消息體,目前只提供StringMessage,ObjectMessage,后續再進行擴展。
Astrotrain使用示例
對於Astrotrain的使用主要是基於astrotrain-client來實現,目前有兩個版本可供使用,1.0和1.0.1,他們之間的區別在於后者提供了一個批量消費的接口,其余的相同,所以以1.0.1的版本為例說。
注意:下面列出的所有配置文件astrotrain-client會默認從ClassPath中進行讀取,不需要顯式指定。
版本信息
- astrotrain-client-1.0.2版本,2015/02/05
1、開放了tag注冊,消費端新增 subscribe(topic,tags,listener) 方法
2、去除了對配置文件的依賴(astrotrain-produce/astrotrain-consumer/astrotrain),使用Spring注入風格注入屬性,生產者對應DefaultATProducer,消費者對應DefaultATPushConsumer這兩個實體
3、增加BytesMessage支持字節數據消息
4、在消費端可以正常獲取發送端設置的key和tag熟悉,通過ATMessage.getProperty() 方法,鍵值與發送端相同
Maven依賴
<
dependency
>
<
groupId
>com.zj</
groupId
>
<
artifactId
>astrotrain-client</
artifactId
>
<
version
>1.0.1</
version
>
</
dependency
>
|
生產端(Producer)
-
准備資源配置文件astrotrain-producer.properties,生產者配置
astrotrain-producer.properties#生產者群組名稱astrotrain.group.name=PleaseRename#應用實例名稱astrotrain.instance.name=ProducerAT#namesrv地址,多個之間以分號 ; 分隔astrotrain.namesrv.address=10.10.110.51:9876 -
准備資源文件astrotrain.properties,應用配置
astrotrain.properties#應用標志符astrotrain.appId=app1 -
Java代碼,准備POJO。
Order.java12345678910111213141516171819202122232425262728293031323334353637383940414243444546packagecom.zj.astrotrain.demo;importjava.io.Serializable;importjava.util.List;publicclassOrderimplementsSerializable {/****/privatestaticfinallongserialVersionUID = 1L;privatelongid;privateString orderId;privateString cardNo;privateList<String> payments;publiclonggetId() {returnid;}publicvoidsetId(longid) {this.id = id;}publicString getOrderId() {returnorderId;}publicvoidsetOrderId(String orderId) {this.orderId = orderId;}publicString getCardNo() {returncardNo;}publicvoidsetCardNo(String cardNo) {this.cardNo = cardNo;}publicList<String> getPayments() {returnpayments;}publicvoidsetPayments(List<String> payments) {this.payments = payments;}publicString toString(){return"Order [id="+this.id +",orderId="+this.orderId +",cardNo="+this.cardNo+",payments="+this.payments ==null?null:this.payments.size() +"]";}} -
生產者代碼
ProduerDemon.java123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105packagecom.zj.astrotrain.demo;importjava.text.DateFormat;importjava.text.SimpleDateFormat;importjava.util.ArrayList;importjava.util.Date;importjava.util.List;importcom.alibaba.fastjson.JSON;importcom.zj.astrotrain.client.ATMessage;importcom.zj.astrotrain.client.ATProducer;importcom.zj.astrotrain.client.exceptions.ATException;importcom.zj.astrotrain.client.message.ObjectMessage;importcom.zj.astrotrain.client.message.StringMessage;importcom.zj.astrotrain.client.producer.DefaultATProducer;/*** 生成者示例* <pre>需要在 ClassPath 路徑下准備 astrotrain-producer.properties 具體配置信息參考 src/main/resources下的配置</pre>***/publicclassProducerDemon {privateDefaultATProducer atProducer;publicProducerDemon() {}publicvoidsetUp() {this.atProducer =newDefaultATProducer();try{this.atProducer.start();}catch(Exception e) {e.printStackTrace();}}/*** 發送字符類型的消息*/publicvoiddoStringMessage() {//獲取一個生成者通道,指定TopicATProducer producer =this.atProducer.createProducer("demo");DateFormat format =newSimpleDateFormat("yyyyMMddHHmmss");for(inti =0; i <100; i++){Order order =newOrder();order.setId(i);order.setOrderId(format.format(newDate()) +"_"+ i);order.setCardNo("6221202000111112222");//新建一個StringMessageStringMessage msg =newStringMessage(JSON.toJSONString(order));//為消息設置一個業務標識符,最好是唯一的,方便在調試程序時進行跟蹤,可選屬性.msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId());try{//進行消息發送,不拋異常的都是正常發送.除非服務端程序Crash,不然不會丟失消息producer.send(msg);}catch(ATException e) {e.printStackTrace();}}}/*** 發送對象類型的消息*/publicvoiddoObjectMessage() {//使用新的主題創建生產者通道ATProducer producer =this.atProducer.createProducer("demo");DateFormat format =newSimpleDateFormat("yyyyMMddHHmmss");for(inti =0; i<100; i++) {//新建一個ObjectMessageObjectMessage msg =newObjectMessage();Order order =newOrder();order.setId(i);order.setOrderId(format.format(newDate()) +"_"+ i);order.setCardNo("6221202000111112222"+ i);List<String> payments =newArrayList<String>();payments.add("payment"+ i);order.setPayments(payments);//設置對象msg.putObject(order);//為消息設置一個業務標識符,最好是唯一的,方便在調試程序時進行跟蹤,可選屬性.msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId());try{//進行消息發送,不拋異常的都是正常發送.除非服務端程序Crash,不然不會丟失消息producer.send(msg);}catch(ATException e) {e.printStackTrace();}}}publicvoidshutdown() {if(this.atProducer !=null){this.atProducer.shutdown();}}publicstaticvoidmain(String[] args) {ProducerDemon demon =newProducerDemon();demon.setUp();demon.doStringMessage();demon.doObjectMessage();demon.shutdown();}}
消費端(Consumer)
-
准備資源文件astrotrain-consumer.properties,消費者配置。
astrotrain-consumer.properties#消費者群組名稱,與生產者群組沒有關聯astrotrain.group.name=PleaseRename#消費者示例名稱astrotrain.instance.name=ConsumerATbatch#namesrv的地址,多個以分號 ; 分隔astrotrain.namesrv.address=10.10.110.51:9876#消費模式,CLUSTERING and BROADCASTING,defaultis CLUSTERINGastrotrain.consumer.messageModel=CLUSTERING#消費者啟動時從那個位置開始消費astrotrain.consumer.consumeFromWhere=CONSUME_FROM_FIRST_OFFSET#消費者線程最小數astrotrain.consumer.consumeThreadMin=10#消費者線程最大數astrotrain.consumer.consumeThreadMax=20#單次消費時一次性消費多少條消息,批量消費接口才有用,可選配置。#astrotrain.consumer.batchMaxSize=30#消費者去broker拉取消息時,一次拉取多少條。可選配置。#astrotrain.consumer.pullBatchSize=100#每次拉取消息的間隔,默認為0,可選配置/#astrotrain.consumer.pullInterval=1000 -
准備資源文件astrotrain.properties,應用配置
astrotrain.properties#應用標志符astrotrain.appId=app2 -
消費者,單個消息消費
ConsumerDemon.java1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374packagecom.zj.astrotrain.demo;importcom.alibaba.fastjson.JSON;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.zj.astrotrain.client.ATMessage;importcom.zj.astrotrain.client.MessageListener;importcom.zj.astrotrain.client.consumer.DefaultATPushConsumer;importcom.zj.astrotrain.client.message.ObjectMessage;importcom.zj.astrotrain.client.message.StringMessage;/*** 單個消息消費***/publicclassConsumerDemon {privateDefaultATPushConsumer atPushConsumer;publicConsumerDemon() {}publicvoidsetUp() {this.atPushConsumer =newDefaultATPushConsumer();try{//訂閱必須在start之前this.atPushConsumer.subscribe("demo",newDemonMessageListener());}catch(MQClientException e) {e.printStackTrace();}}publicvoidstart() {if(this.atPushConsumer !=null) {try{this.atPushConsumer.start();System.in.read();//按任意鍵退出}catch(Exception e) {e.printStackTrace();}}}publicvoidshutdown() {if(this.atPushConsumer !=null){this.atPushConsumer.shutdown();}}publicstaticvoidmain(String[] args) {ConsumerDemon demon =newConsumerDemon();demon.setUp();demon.start();demon.shutdown();}//單個消息監聽接口publicclassDemonMessageListenerimplementsMessageListener {@OverridepublicvoidonMessage(ATMessage message) {try{if(messageinstanceofStringMessage){StringMessage msg = (StringMessage) message;Order order = JSON.parseObject(msg.getMsg(), Order.class);System.out.println(order.getCardNo());}elseif(messageinstanceofObjectMessage) {ObjectMessage msg = (ObjectMessage) message;Order order = (Order) msg.getObject();System.out.println(order.getCardNo());}}catch(Exception e) {e.printStackTrace();}}}} -
消費者批量消費
ConsumerBatchDemon123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384packagecom.zj.astrotrain.demo;importjava.util.List;importcom.alibaba.fastjson.JSON;importcom.alibaba.rocketmq.client.exception.MQClientException;importcom.zj.astrotrain.client.ATMessage;importcom.zj.astrotrain.client.ConcurrentlyMessageListener;importcom.zj.astrotrain.client.consumer.DefaultATPushConsumer;importcom.zj.astrotrain.client.message.ObjectMessage;importcom.zj.astrotrain.client.message.StringMessage;/*** 消費者批量消費示例* <pre>需要在 ClassPath 路徑下准備 astrotrain-consumer.properties 具體配置信息參考 src/main/resources下的配置</pre>***/publicclassConsumerBatchDemon {privateDefaultATPushConsumer atPushConsumer;publicConsumerBatchDemon() {}publicvoidsetUp() {this.atPushConsumer =newDefaultATPushConsumer();try{//訂閱必須在start之前this.atPushConsumer.subscribe("demo",newDemonConcurrentlyMessageListener());}catch(MQClientException e) {e.printStackTrace();}}publicvoidstart() {if(this.atPushConsumer !=null) {try{this.atPushConsumer.start();System.in.read();//按任意鍵退出}catch(Exception e) {e.printStackTrace();}}}publicvoidshutdown() {if(this.atPushConsumer !=null){this.atPushConsumer.shutdown();}}publicstaticvoidmain(String[] args) {ConsumerBatchDemon demon =newConsumerBatchDemon();demon.setUp();demon.start();demon.shutdown();}/*** 消息監聽***/publicclassDemonConcurrentlyMessageListenerimplementsConcurrentlyMessageListener{@OverridepublicvoidonMessage(List<ATMessage> msgs) {for(ATMessage message : msgs) {try{if(messageinstanceofStringMessage){StringMessage msg = (StringMessage) message;Order order = JSON.parseObject(msg.getMsg(), Order.class);System.out.println(order.getCardNo());}elseif(messageinstanceofObjectMessage) {ObjectMessage msg = (ObjectMessage) message;Order order = (Order) msg.getObject();System.out.println(order.getCardNo());}}catch(Exception e) {e.printStackTrace();}}}}}
注意事項
- 訂閱時不要加上tag,以*注冊所有tag,然后再做篩選,訂閱時附上tag會導致先注冊的相同topic不能正常消費
- 在開發環境里Topic和SubGroup都是自動創建的,生產上是需要手動創建的,所以上線之前一定要檢查對應的主題和訂閱組是否已創建
