摘自:碼友18年(www.mayou18.com)
what is rocketMQ?
RocketMQ作為一款分布式的消息中間件(阿里的說法是不遵循任何規范的,所以不能完全用JMS的那一套東西來看它),經歷了Metaq1.x、Metaq2.x的發展和淘寶雙十一的洗禮,在功能和性能上遠超ActiveMQ。RocketMQ 是一款分布式、隊列模型的消息中間件,具有以下特點:
專業&驗證
1、MQ是消息領域業內專業的消息中間件,多次在國內外獲獎
2、經過雙11阿里交易、商品、營銷等核心鏈路真實場景驗證
3、阿里集團內部1000+核心應用使用,每天流轉幾千億條消息,穩定可靠
技術體系
1、產品歷史超過7年,消息保證不丟,技術體系豐富成熟
2、阿里內部產品名:MetaQ、Notify
3、開源社區產品名:RocketMQ,無技術綁定風險
4、應用靈活,無任何強制綁定其他產品
獨立部署
1、支持專有雲獨立輸出,支持物理機和虛擬機,最小部署僅幾台機器
2、專有雲配套運維系統,方便運維人員實時監控系統狀態
3、專有雲配套mqadmin命令集和管理類open API,方便集成及統一運維
4、支持混合雲架構,包括VPC用戶
高可靠
1、一份消息多份落盤存儲,經過嚴格斷電測試,消息依然保證不丟失
2、支持消息軌跡,消息從生產到消費軌跡,可清晰排查
3、海量消息堆積,單個Topic可堆積100億+條消息,防止系統高流量崩潰
4、默認情況下消息落盤保留3天
高性能
1、同一網絡內,消息傳輸RT在10毫秒之內,性能測試下,網卡可被打滿
2、公有雲默認單Topic 發送消息為每秒5000條,最高可申請擴展至10W以上支持大量消息並發發送,超過5萬個隊列,性能依然卓越
3、支持消息海量堆積,單Topic可堆積100+億條消息
多協議
1、HTTP接入:支持跨網絡調用,HTTP公網接入支持跨網絡調用,HTTP Restful接口公網接入
2、MQTT接入:支持主動推送模,多級Topic模型支持一次觸達1000萬+終端可廣泛應用於物聯網和社交即時通信場景,可跟JStorm等實時計算完美結合
3、TCP接入(專業) :區別於HTTP簡單的接入方式,同時提供更為專業、可靠、穩定的TCP協議的SDK接入
RocketMQ 代碼實現
pom.xml
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
Producer.java
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws MQClientException,
InterruptedException {
/**
* 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例<br>
* 注意:ProducerGroupName需要由應用來保證唯一<br>
* ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
* 因為服務器會回查這個Group下的任意一個Producer
*/
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("192.168.0.123:9876");
/**
*默認情況下,一台服務器只能啟動一個Producer或Consumer實例,所以如果需要在一台服務器啟
*動多個實例,需要設置實例的名稱
*/
producer.setInstanceName("Producer");
producer.setSendMsgTimeout(3000);//發送消息超時
producer.setRetryTimesWhenSendFailed(2);//發送失敗后,重試幾次
/**
* Producer對象在使用之前必須要調用start初始化,初始化一次即可<br>
* 注意:切記不可以在每次發送消息時,都調用start方法
*/
producer.start();
/**
* 下面這段代碼表明一個Producer對象可以發送多個topic,多個tag的消息。
* 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br>
* 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br>
* 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。
*/
for (int i = 0; i < 100; i++) {
try {
{
Message msg = new Message("TopicTest1",// topic
"TagA",// tag
"OrderID001",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest2",// topic
"TagB",// tag
"OrderID0034",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
{
Message msg = new Message("TopicTest3",// topic
"TagC",// tag
"OrderID061",// key
("Hello MetaQ").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
TimeUnit.MILLISECONDS.sleep(1000);
}
/**
* 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從服務器上注銷自己
* 注意:我們建議應用在JBOSS、Tomcat等容器的退出鈎子里調用shutdown方法
*/
producer.shutdown();
}
}
PushConsumer.java
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class PushConsumer {
/**
* 內部是使用長輪詢Pull方式從MetaQ服務器拉消息,然后再回調用戶Listener方法<br>
*/
public static void main(String[] args) throws InterruptedException,
MQClientException {
/**
* 一個應用創建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例<br>
* 注意:ConsumerGroupName需要由應用來保證唯一。
*不同consumer group里的consumer即便是消費同一個topic下的同一個queue,
*那消費進度也是分開存儲的。也就是說,不同的consumer group內的consumer的消費
*完全隔離,彼此不受影響。
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
"ConsumerGroupName");
consumer.setNamesrvAddr("192.168.0.123:9876");
consumer.setInstanceName("Consumber");
//廣播消費是指一個consumer只要訂閱了某個topic的消息,那它就會收到該topic下的所有queue里的消息,
//而不管這個consumer的group是什么。所以對於廣播消費來說,consumer group沒什么實際意義。consumer可以在實例化時,我們可以指定是集群消費還是廣播消費。
//consumer.setMessageModel(MessageModel.BROADCASTING);
/**
* 訂閱指定topic下tags分別等於TagA或TagC或TagD
*/
consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
/**
* 訂閱指定topic下所有消息<br>
* 注意:一個consumer對象可以訂閱多個topic
*/
consumer.subscribe("TopicTest2", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName()
+ " Receive New Messages: " + msgs.size());
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("TopicTest1")) {
// 執行TopicTest1的消費邏輯
if (msg.getTags() != null && msg.getTags().equals("TagA")) {
// 執行TagA的消費
System.out.println(new String(msg.getBody()));
} else if (msg.getTags() != null
&& msg.getTags().equals("TagC")) {
// 執行TagC的消費
} else if (msg.getTags() != null
&& msg.getTags().equals("TagD")) {
// 執行TagD的消費
}
} else if (msg.getTopic().equals("TopicTest2")) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
