Alibaba(阿里) RocketMQ入門實例


摘自:碼友18年(www.mayou18.com)

what is rocketMQ?

RocketMQ作為一款分布式的消息中間件(阿里的說法是不遵循任何規范的,所以不能完全用JMS的那一套東西來看它),經歷了Metaq1.x、Metaq2.x的發展和淘寶雙十一的洗禮,在功能和性能上遠超ActiveMQ。RocketMQ 是一款分布式、隊列模型的消息中間件,具有以下特點:

專業&驗證

1、MQ是消息領域業內專業的消息中間件,多次在國內外獲獎

2、經過雙11阿里交易、商品、營銷等核心鏈路真實場景驗證

3、阿里集團內部1000+核心應用使用,每天流轉幾千億條消息,穩定可靠

技術體系

1、產品歷史超過7年,消息保證不丟,技術體系豐富成熟

2、阿里內部產品名:MetaQ、Notify

3、開源社區產品名:RocketMQ,無技術綁定風險

4、應用靈活,無任何強制綁定其他產品

獨立部署

1、支持專有雲獨立輸出,支持物理機和虛擬機,最小部署僅幾台機器

2、專有雲配套運維系統,方便運維人員實時監控系統狀態

3、專有雲配套mqadmin命令集和管理類open API,方便集成及統一運維

4、支持混合雲架構,包括VPC用戶

高可靠

1、一份消息多份落盤存儲,經過嚴格斷電測試,消息依然保證不丟失

2、支持消息軌跡,消息從生產到消費軌跡,可清晰排查

3、海量消息堆積,單個Topic可堆積100億+條消息,防止系統高流量崩潰

4、默認情況下消息落盤保留3天

高性能

1、同一網絡內,消息傳輸RT在10毫秒之內,性能測試下,網卡可被打滿

2、公有雲默認單Topic 發送消息為每秒5000條,最高可申請擴展至10W以上支持大量消息並發發送,超過5萬個隊列,性能依然卓越

3、支持消息海量堆積,單Topic可堆積100+億條消息

多協議

1、HTTP接入:支持跨網絡調用,HTTP公網接入支持跨網絡調用,HTTP Restful接口公網接入

2、MQTT接入:支持主動推送模,多級Topic模型支持一次觸達1000萬+終端可廣泛應用於物聯網和社交即時通信場景,可跟JStorm等實時計算完美結合

3、TCP接入(專業) :區別於HTTP簡單的接入方式,同時提供更為專業、可靠、穩定的TCP協議的SDK接入

RocketMQ 代碼實現

pom.xml

<dependency>
  <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.2.6</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
</dependency>

Producer.java

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
public class Producer {
  public static void main(String[] args) throws MQClientException,
      InterruptedException {
    /**
     * 一個應用創建一個Producer,由應用來維護此對象,可以設置為全局對象或者單例<br>
     * 注意:ProducerGroupName需要由應用來保證唯一<br>
     * ProducerGroup這個概念發送普通的消息時,作用不大,但是發送分布式事務消息時,比較關鍵,
     * 因為服務器會回查這個Group下的任意一個Producer
     */
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    producer.setNamesrvAddr("192.168.0.123:9876");
    /**
     *默認情況下,一台服務器只能啟動一個Producer或Consumer實例,所以如果需要在一台服務器啟
     *動多個實例,需要設置實例的名稱
     */
    producer.setInstanceName("Producer");
    producer.setSendMsgTimeout(3000);//發送消息超時
    producer.setRetryTimesWhenSendFailed(2);//發送失敗后,重試幾次
    /**
     * Producer對象在使用之前必須要調用start初始化,初始化一次即可<br>
     * 注意:切記不可以在每次發送消息時,都調用start方法
     */
    producer.start();
    /**
     * 下面這段代碼表明一個Producer對象可以發送多個topic,多個tag的消息。
     * 注意:send方法是同步調用,只要不拋異常就標識成功。但是發送成功也可會有多種狀態,<br>
     * 例如消息寫入Master成功,但是Slave不成功,這種情況消息屬於成功,但是對於個別應用如果對消息可靠性要求極高,<br>
     * 需要對這種情況做處理。另外,消息可能會存在發送失敗的情況,失敗重試由應用來處理。
     */
    for (int i = 0; i < 100; i++) {
      try {
        {
          Message msg = new Message("TopicTest1",// topic
              "TagA",// tag
              "OrderID001",// key
              ("Hello MetaQ").getBytes());// body
          SendResult sendResult = producer.send(msg);
          System.out.println(sendResult);
        }
        {
          Message msg = new Message("TopicTest2",// topic
              "TagB",// tag
              "OrderID0034",// key
              ("Hello MetaQ").getBytes());// body
          SendResult sendResult = producer.send(msg);
          System.out.println(sendResult);
        }
        {
          Message msg = new Message("TopicTest3",// topic
              "TagC",// tag
              "OrderID061",// key
              ("Hello MetaQ").getBytes());// body
          SendResult sendResult = producer.send(msg);
          System.out.println(sendResult);
        }
      } catch (Exception e) {
        e.printStackTrace();
      }
      TimeUnit.MILLISECONDS.sleep(1000);
    }
    /**
     * 應用退出時,要調用shutdown來清理資源,關閉網絡連接,從服務器上注銷自己
     * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鈎子里調用shutdown方法
     */
    producer.shutdown();
  }
}

PushConsumer.java

import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
public class PushConsumer {
  /**
   * 內部是使用長輪詢Pull方式從MetaQ服務器拉消息,然后再回調用戶Listener方法<br>
   */
  public static void main(String[] args) throws InterruptedException,
      MQClientException {
    /**
     * 一個應用創建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例<br>
     * 注意:ConsumerGroupName需要由應用來保證唯一。
     *不同consumer group里的consumer即便是消費同一個topic下的同一個queue,
     *那消費進度也是分開存儲的。也就是說,不同的consumer group內的consumer的消費
     *完全隔離,彼此不受影響。
     */
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
        "ConsumerGroupName");
    consumer.setNamesrvAddr("192.168.0.123:9876");
    consumer.setInstanceName("Consumber");
    //廣播消費是指一個consumer只要訂閱了某個topic的消息,那它就會收到該topic下的所有queue里的消息,
    //而不管這個consumer的group是什么。所以對於廣播消費來說,consumer group沒什么實際意義。consumer可以在實例化時,我們可以指定是集群消費還是廣播消費。
    //consumer.setMessageModel(MessageModel.BROADCASTING);
    /**
     * 訂閱指定topic下tags分別等於TagA或TagC或TagD
     */
    consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
    /**
     * 訂閱指定topic下所有消息<br>
     * 注意:一個consumer對象可以訂閱多個topic
     */
    consumer.subscribe("TopicTest2", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
      /**
       * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息
       */
      @Override
      public ConsumeConcurrentlyStatus consumeMessage(
          List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.println(Thread.currentThread().getName()
            + " Receive New Messages: " + msgs.size());
        MessageExt msg = msgs.get(0);
        if (msg.getTopic().equals("TopicTest1")) {
          // 執行TopicTest1的消費邏輯
          if (msg.getTags() != null && msg.getTags().equals("TagA")) {
            // 執行TagA的消費
            System.out.println(new String(msg.getBody()));
          } else if (msg.getTags() != null
              && msg.getTags().equals("TagC")) {
            // 執行TagC的消費
          } else if (msg.getTags() != null
              && msg.getTags().equals("TagD")) {
            // 執行TagD的消費
          }
        } else if (msg.getTopic().equals("TopicTest2")) {
          System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      }
    });
    /**
     * Consumer對象在使用之前必須要調用start初始化,初始化一次即可<br>
     */
    consumer.start();
    System.out.println("Consumer Started.");
  }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM