摘自:碼友18年(www.mayou18.com)
what is rocketMQ?
RocketMQ作為一款分布式的消息中間件(阿里的說法是不遵循任何規范的,所以不能完全用JMS的那一套東西來看它),經歷了Metaq1.x、Metaq2.x的發展和淘寶雙十一的洗禮,在功能和性能上遠超ActiveMQ。RocketMQ 是一款分布式、隊列模型的消息中間件,具有以下特點:
專業&驗證
1、MQ是消息領域業內專業的消息中間件,多次在國內外獲獎
2、經過雙11阿里交易、商品、營銷等核心鏈路真實場景驗證
3、阿里集團內部1000+核心應用使用,每天流轉幾千億條消息,穩定可靠
技術體系
1、產品歷史超過7年,消息保證不丟,技術體系豐富成熟
2、阿里內部產品名:MetaQ、Notify
3、開源社區產品名:RocketMQ,無技術綁定風險
4、應用靈活,無任何強制綁定其他產品
獨立部署
1、支持專有雲獨立輸出,支持物理機和虛擬機,最小部署僅幾台機器
2、專有雲配套運維系統,方便運維人員實時監控系統狀態
3、專有雲配套mqadmin命令集和管理類open API,方便集成及統一運維
4、支持混合雲架構,包括VPC用戶
高可靠
1、一份消息多份落盤存儲,經過嚴格斷電測試,消息依然保證不丟失
2、支持消息軌跡,消息從生產到消費軌跡,可清晰排查
3、海量消息堆積,單個Topic可堆積100億+條消息,防止系統高流量崩潰
4、默認情況下消息落盤保留3天
高性能
1、同一網絡內,消息傳輸RT在10毫秒之內,性能測試下,網卡可被打滿
2、公有雲默認單Topic 發送消息為每秒5000條,最高可申請擴展至10W以上支持大量消息並發發送,超過5萬個隊列,性能依然卓越
3、支持消息海量堆積,單Topic可堆積100+億條消息
多協議
1、HTTP接入:支持跨網絡調用,HTTP公網接入支持跨網絡調用,HTTP Restful接口公網接入
2、MQTT接入:支持主動推送模,多級Topic模型支持一次觸達1000萬+終端可廣泛應用於物聯網和社交即時通信場景,可跟JStorm等實時計算完美結合
3、TCP接入(專業) :區別於HTTP簡單的接入方式,同時提供更為專業、可靠、穩定的TCP協議的SDK接入
RocketMQ 代碼實現
pom.xml
<dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency>
Producer.java
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例<br> * 注意:ProducerGroupName需要由應用來保證唯一<br> * ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵, * 因為服務器會回查這個Group下的任意一個Producer */ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.0.123:9876"); /** *默認情況下,一台服務器只能啟動一個Producer或Consumer實例,所以如果需要在一台服務器啟 *動多個實例,需要設置實例的名稱 */ producer.setInstanceName("Producer"); producer.setSendMsgTimeout(3000);//發送消息超時 producer.setRetryTimesWhenSendFailed(2);//發送失敗后,重試幾次 /** * Producer對象在使用之前必須要調用start初始化,初始化一次即可<br> * 注意:切記不可以在每次發送消息時,都調用start方法 */ producer.start(); /** * 下面這段代碼表明一個Producer對象可以發送多個topic,多個tag的消息。 * 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br> * 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br> * 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。 */ for (int i = 0; i < 100; i++) { try { { Message msg = new Message("TopicTest1",// topic "TagA",// tag "OrderID001",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2",// topic "TagB",// tag "OrderID0034",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3",// topic "TagC",// tag "OrderID061",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從服務器上注銷自己 * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鈎子里調用shutdown方法 */ producer.shutdown(); } }
PushConsumer.java
import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; public class PushConsumer { /** * 內部是使用長輪詢Pull方式從MetaQ服務器拉消息,然后再回調用戶Listener方法<br> */ public static void main(String[] args) throws InterruptedException, MQClientException { /** * 一個應用創建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例<br> * 注意:ConsumerGroupName需要由應用來保證唯一。 *不同consumer group里的consumer即便是消費同一個topic下的同一個queue, *那消費進度也是分開存儲的。也就是說,不同的consumer group內的consumer的消費 *完全隔離,彼此不受影響。 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ConsumerGroupName"); consumer.setNamesrvAddr("192.168.0.123:9876"); consumer.setInstanceName("Consumber"); //廣播消費是指一個consumer只要訂閱了某個topic的消息,那它就會收到該topic下的所有queue里的消息, //而不管這個consumer的group是什么。所以對於廣播消費來說,consumer group沒什么實際意義。consumer可以在實例化時,我們可以指定是集群消費還是廣播消費。 //consumer.setMessageModel(MessageModel.BROADCASTING); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 訂閱指定topic下所有消息<br> * 注意:一個consumer對象可以訂閱多個topic */ consumer.subscribe("TopicTest2", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息 */ @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 執行TopicTest1的消費邏輯 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 執行TagA的消費 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 執行TagC的消費 } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 執行TagD的消費 } } else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br> */ consumer.start(); System.out.println("Consumer Started."); } }