Astrotrain概述
Astrotrain是基於阿里巴巴開源項目RocketMQ進行封裝的分布式消息中間件系統,提供集群環境下的消息生產和消費功能。
RocketMQ介紹
RocketMQ的物理部署結構
-
Name Server 是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。所有的主題和broker節點信息都由Name Server進行維護。
-
Broker 是主要的功能單元,處理主題的存儲和消費邏輯,Broker會定時同步所有信息至Name Server。
- 一類Producer的集合名稱,這類Producer通常發送一類消息,且發送邏輯一致。
- 一類Consumer的集合名稱,這類Consumer通常消費一類消息,且消費邏輯一致。Consumer群組內多應用之間消息消費是競爭關系,Consumer群組之間是共享消費,這點非常重要。
RocketMQ存儲特點
- Broker上綁定具體的Topic。
- Topic下有多個物理存儲隊列(Queue),所有存儲隊列都會存儲消息,一個消息只會存儲一份。Topic可以分布在不同Broker上。
- 存儲隊列的選擇決定了消費的特性,如果只讀寫一個隊列,那么消費就是順序的了,否則會是無序的。
消費者Push和Pull的區別
-
Push模式下的消息是由事件觸發,有消息到達時監聽器會被調用(MessageListener)。
-
Pull模式下的消息可以由事件觸發,也可以應用主動去拉取消息,沒有消息可拉取時返回空。
RocketMQ設計文檔
Astrotrain介紹
Astrotrain-Client是對RocketMQ的Producer和Consumer的封裝,集中解決了RocketMQ諸多配置信息和使用特性,針對特定需求可以進行二次開發來進行擴展。
Astrotrain的整體結構
-
Basic Component是Astrotrain對RocketMQ的基礎封裝,里面包含的生產者和消費者處理消息的邏輯,同時處理了RocketMQ的很多配置信息。
-
JDBC Message Component是消息事務的功能模塊,依賴Basic Component的功能實現。
-
Astrotrain底層依賴RocketMQ提供消息服務。
-
業務層只面對Astrotrain暴露的服務。
Astrotrain的邏輯結構
-
ATClient定義最基礎的功能,包括客戶端的啟動、初始化、關閉的定義。
-
Pipe是生產者和消費者公共部分的抽象,與具體的Topic進行綁定。
- ATProducer是對生產者的封裝,提供基礎的消息發送服務。
-
ATConsumer是對消費者的封裝,提供基礎的消息消費服務(沒有具體的Pipe實現,因為消費是針對Listener的)。
- ATMessage是對外提供的消息體,目前只提供StringMessage,ObjectMessage,后續再進行擴展。
Astrotrain使用示例
對於Astrotrain的使用主要是基於astrotrain-client來實現,目前有兩個版本可供使用,1.0和1.0.1,他們之間的區別在於后者提供了一個批量消費的接口,其余的相同,所以以1.0.1的版本為例說。
注意:下面列出的所有配置文件astrotrain-client會默認從ClassPath中進行讀取,不需要顯式指定。
版本信息
- astrotrain-client-1.0.2版本,2015/02/05
1、開放了tag注冊,消費端新增 subscribe(topic,tags,listener) 方法
2、去除了對配置文件的依賴(astrotrain-produce/astrotrain-consumer/astrotrain),使用Spring注入風格注入屬性,生產者對應DefaultATProducer,消費者對應DefaultATPushConsumer這兩個實體
3、增加BytesMessage支持字節數據消息
4、在消費端可以正常獲取發送端設置的key和tag熟悉,通過ATMessage.getProperty() 方法,鍵值與發送端相同
Maven依賴
<
dependency
>
<
groupId
>com.zj</
groupId
>
<
artifactId
>astrotrain-client</
artifactId
>
<
version
>1.0.1</
version
>
</
dependency
>
|
生產端(Producer)
-
准備資源配置文件astrotrain-producer.properties,生產者配置
astrotrain-producer.properties#生產者群組名稱
astrotrain.group.name=PleaseRename
#應用實例名稱
astrotrain.instance.name=ProducerAT
#namesrv地址,多個之間以分號 ; 分隔
astrotrain.namesrv.address=
10.10
.
110.51
:
9876
-
准備資源文件astrotrain.properties,應用配置
astrotrain.properties#應用標志符
astrotrain.appId=app1
-
Java代碼,准備POJO。
Order.java12345678910111213141516171819202122232425262728293031323334353637383940414243444546package
com.zj.astrotrain.demo;
import
java.io.Serializable;
import
java.util.List;
public
class
Order
implements
Serializable {
/**
*
*/
private
static
final
long
serialVersionUID = 1L;
private
long
id;
private
String orderId;
private
String cardNo;
private
List<String> payments;
public
long
getId() {
return
id;
}
public
void
setId(
long
id) {
this
.id = id;
}
public
String getOrderId() {
return
orderId;
}
public
void
setOrderId(String orderId) {
this
.orderId = orderId;
}
public
String getCardNo() {
return
cardNo;
}
public
void
setCardNo(String cardNo) {
this
.cardNo = cardNo;
}
public
List<String> getPayments() {
return
payments;
}
public
void
setPayments(List<String> payments) {
this
.payments = payments;
}
public
String toString(){
return
"Order [id="
+
this
.id +
",orderId="
+
this
.orderId +
",cardNo="
+
this
.cardNo
+
",payments="
+
this
.payments ==
null
?
null
:
this
.payments.size() +
"]"
;
}
}
-
生產者代碼
ProduerDemon.java123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105package
com.zj.astrotrain.demo;
import
java.text.DateFormat;
import
java.text.SimpleDateFormat;
import
java.util.ArrayList;
import
java.util.Date;
import
java.util.List;
import
com.alibaba.fastjson.JSON;
import
com.zj.astrotrain.client.ATMessage;
import
com.zj.astrotrain.client.ATProducer;
import
com.zj.astrotrain.client.exceptions.ATException;
import
com.zj.astrotrain.client.message.ObjectMessage;
import
com.zj.astrotrain.client.message.StringMessage;
import
com.zj.astrotrain.client.producer.DefaultATProducer;
/**
* 生成者示例
* <pre>需要在 ClassPath 路徑下准備 astrotrain-producer.properties 具體配置信息參考 src/main/resources下的配置</pre>
*
*
*/
public
class
ProducerDemon {
private
DefaultATProducer atProducer;
public
ProducerDemon() {
}
public
void
setUp() {
this
.atProducer =
new
DefaultATProducer();
try
{
this
.atProducer.start();
}
catch
(Exception e) {
e.printStackTrace();
}
}
/**
* 發送字符類型的消息
*/
public
void
doStringMessage() {
//獲取一個生成者通道,指定Topic
ATProducer producer =
this
.atProducer.createProducer(
"demo"
);
DateFormat format =
new
SimpleDateFormat(
"yyyyMMddHHmmss"
);
for
(
int
i =
0
; i <
100
; i++){
Order order =
new
Order();
order.setId(i);
order.setOrderId(format.format(
new
Date()) +
"_"
+ i);
order.setCardNo(
"6221202000111112222"
);
//新建一個StringMessage
StringMessage msg =
new
StringMessage(JSON.toJSONString(order));
//為消息設置一個業務標識符,最好是唯一的,方便在調試程序時進行跟蹤,可選屬性.
msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId());
try
{
//進行消息發送,不拋異常的都是正常發送.除非服務端程序Crash,不然不會丟失消息
producer.send(msg);
}
catch
(ATException e) {
e.printStackTrace();
}
}
}
/**
* 發送對象類型的消息
*/
public
void
doObjectMessage() {
//使用新的主題創建生產者通道
ATProducer producer =
this
.atProducer.createProducer(
"demo"
);
DateFormat format =
new
SimpleDateFormat(
"yyyyMMddHHmmss"
);
for
(
int
i =
0
; i<
100
; i++) {
//新建一個ObjectMessage
ObjectMessage msg =
new
ObjectMessage();
Order order =
new
Order();
order.setId(i);
order.setOrderId(format.format(
new
Date()) +
"_"
+ i);
order.setCardNo(
"6221202000111112222"
+ i);
List<String> payments =
new
ArrayList<String>();
payments.add(
"payment"
+ i);
order.setPayments(payments);
//設置對象
msg.putObject(order);
//為消息設置一個業務標識符,最好是唯一的,方便在調試程序時進行跟蹤,可選屬性.
msg.setProperty(ATMessage.MSG_KEYS, order.getOrderId());
try
{
//進行消息發送,不拋異常的都是正常發送.除非服務端程序Crash,不然不會丟失消息
producer.send(msg);
}
catch
(ATException e) {
e.printStackTrace();
}
}
}
public
void
shutdown() {
if
(
this
.atProducer !=
null
){
this
.atProducer.shutdown();
}
}
public
static
void
main(String[] args) {
ProducerDemon demon =
new
ProducerDemon();
demon.setUp();
demon.doStringMessage();
demon.doObjectMessage();
demon.shutdown();
}
}
消費端(Consumer)
-
准備資源文件astrotrain-consumer.properties,消費者配置。
astrotrain-consumer.properties#消費者群組名稱,與生產者群組沒有關聯
astrotrain.group.name=PleaseRename
#消費者示例名稱
astrotrain.instance.name=ConsumerATbatch
#namesrv的地址,多個以分號 ; 分隔
astrotrain.namesrv.address=
10.10
.
110.51
:
9876
#消費模式,CLUSTERING and BROADCASTING,
default
is CLUSTERING
astrotrain.consumer.messageModel=CLUSTERING
#消費者啟動時從那個位置開始消費
astrotrain.consumer.consumeFromWhere=CONSUME_FROM_FIRST_OFFSET
#消費者線程最小數
astrotrain.consumer.consumeThreadMin=
10
#消費者線程最大數
astrotrain.consumer.consumeThreadMax=
20
#單次消費時一次性消費多少條消息,批量消費接口才有用,可選配置。
#astrotrain.consumer.batchMaxSize=
30
#消費者去broker拉取消息時,一次拉取多少條。可選配置。
#astrotrain.consumer.pullBatchSize=
100
#每次拉取消息的間隔,默認為
0
,可選配置/
#astrotrain.consumer.pullInterval=
1000
-
准備資源文件astrotrain.properties,應用配置
astrotrain.properties#應用標志符
astrotrain.appId=app2
-
消費者,單個消息消費
ConsumerDemon.java1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374package
com.zj.astrotrain.demo;
import
com.alibaba.fastjson.JSON;
import
com.alibaba.rocketmq.client.exception.MQClientException;
import
com.zj.astrotrain.client.ATMessage;
import
com.zj.astrotrain.client.MessageListener;
import
com.zj.astrotrain.client.consumer.DefaultATPushConsumer;
import
com.zj.astrotrain.client.message.ObjectMessage;
import
com.zj.astrotrain.client.message.StringMessage;
/**
* 單個消息消費
*
*
*/
public
class
ConsumerDemon {
private
DefaultATPushConsumer atPushConsumer;
public
ConsumerDemon() {
}
public
void
setUp() {
this
.atPushConsumer =
new
DefaultATPushConsumer();
try
{
//訂閱必須在start之前
this
.atPushConsumer.subscribe(
"demo"
,
new
DemonMessageListener());
}
catch
(MQClientException e) {
e.printStackTrace();
}
}
public
void
start() {
if
(
this
.atPushConsumer !=
null
) {
try
{
this
.atPushConsumer.start();
System.in.read();
//按任意鍵退出
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
public
void
shutdown() {
if
(
this
.atPushConsumer !=
null
){
this
.atPushConsumer.shutdown();
}
}
public
static
void
main(String[] args) {
ConsumerDemon demon =
new
ConsumerDemon();
demon.setUp();
demon.start();
demon.shutdown();
}
//單個消息監聽接口
public
class
DemonMessageListener
implements
MessageListener {
@Override
public
void
onMessage(ATMessage message) {
try
{
if
(message
instanceof
StringMessage){
StringMessage msg = (StringMessage) message;
Order order = JSON.parseObject(msg.getMsg(), Order.
class
);
System.out.println(order.getCardNo());
}
else
if
(message
instanceof
ObjectMessage) {
ObjectMessage msg = (ObjectMessage) message;
Order order = (Order) msg.getObject();
System.out.println(order.getCardNo());
}
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
}
-
消費者批量消費
ConsumerBatchDemon123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384package
com.zj.astrotrain.demo;
import
java.util.List;
import
com.alibaba.fastjson.JSON;
import
com.alibaba.rocketmq.client.exception.MQClientException;
import
com.zj.astrotrain.client.ATMessage;
import
com.zj.astrotrain.client.ConcurrentlyMessageListener;
import
com.zj.astrotrain.client.consumer.DefaultATPushConsumer;
import
com.zj.astrotrain.client.message.ObjectMessage;
import
com.zj.astrotrain.client.message.StringMessage;
/**
* 消費者批量消費示例
* <pre>需要在 ClassPath 路徑下准備 astrotrain-consumer.properties 具體配置信息參考 src/main/resources下的配置</pre>
*
*
*/
public
class
ConsumerBatchDemon {
private
DefaultATPushConsumer atPushConsumer;
public
ConsumerBatchDemon() {
}
public
void
setUp() {
this
.atPushConsumer =
new
DefaultATPushConsumer();
try
{
//訂閱必須在start之前
this
.atPushConsumer.subscribe(
"demo"
,
new
DemonConcurrentlyMessageListener());
}
catch
(MQClientException e) {
e.printStackTrace();
}
}
public
void
start() {
if
(
this
.atPushConsumer !=
null
) {
try
{
this
.atPushConsumer.start();
System.in.read();
//按任意鍵退出
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
public
void
shutdown() {
if
(
this
.atPushConsumer !=
null
){
this
.atPushConsumer.shutdown();
}
}
public
static
void
main(String[] args) {
ConsumerBatchDemon demon =
new
ConsumerBatchDemon();
demon.setUp();
demon.start();
demon.shutdown();
}
/**
* 消息監聽
*
*
*/
public
class
DemonConcurrentlyMessageListener
implements
ConcurrentlyMessageListener{
@Override
public
void
onMessage(List<ATMessage> msgs) {
for
(ATMessage message : msgs) {
try
{
if
(message
instanceof
StringMessage){
StringMessage msg = (StringMessage) message;
Order order = JSON.parseObject(msg.getMsg(), Order.
class
);
System.out.println(order.getCardNo());
}
else
if
(message
instanceof
ObjectMessage) {
ObjectMessage msg = (ObjectMessage) message;
Order order = (Order) msg.getObject();
System.out.println(order.getCardNo());
}
}
catch
(Exception e) {
e.printStackTrace();
}
}
}
}
}
注意事項
- 訂閱時不要加上tag,以*注冊所有tag,然后再做篩選,訂閱時附上tag會導致先注冊的相同topic不能正常消費
- 在開發環境里Topic和SubGroup都是自動創建的,生產上是需要手動創建的,所以上線之前一定要檢查對應的主題和訂閱組是否已創建