java原生整合rocketmq


一,引入依賴

1
2
3
4
5
6
7
8
9
10
11
<dependency>
     <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-client</artifactId>
     <version> 4.6 . 0 </version>
     <exclusions>
         <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </exclusion>
     </exclusions>
</dependency>

 

 

二, 使用demo:

1,simple(簡單消息生產)

/**
  * 以單向模式發送消息
  * 這種方式主要用在不特別關心發送結果的場景,例如日志發送
  */
public static void OnewayProducer() throws Exception {
     // 實例化消息生產者Producer
     DefaultMQProducer producer = new DefaultMQProducer( "simple_producerGroup_test2" );
     // 設置NameServer的地址
     producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
     // 啟動Producer實例
     producer.start();
     for ( int i = 0 ; i < 10 ; i++) {
         // 創建消息,並指定Topic,Tag和消息體
         Message msg = new Message( "simple_topic_test2" /* Topic */ ,
                 "TagA" /* Tag */ ,
                 ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 發送單向消息,沒有任何返回結果
         producer.sendOneway(msg);
 
     }
     // 如果不再發送消息,關閉Producer實例。
     producer.shutdown();
}
 
/**
  * 同步發送消息
  * 在重要的通知消息,SMS通知,SMS營銷系統等廣泛的場景中使用可靠的同步傳輸。
  */
public static void SyncProducer() throws Exception {
     // 實例化消息生產者Producer
     DefaultMQProducer producer = new DefaultMQProducer( "simple_producerGroup_test2" );
     // 設置NameServer的地址
     producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
     // 啟動Producer實例
     producer.start();
     for ( int i = 0 ; i < 100 ; i++) {
         // 創建消息,並指定Topic,Tag和消息體
         Message msg = new Message( "simple_topic_test2" /* Topic */ ,
                 "TagA" /* Tag */ ,
                 ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
         );
         // 發送消息到一個Broker
         SendResult sendResult = producer.send(msg);
         // 通過sendResult返回消息是否成功送達
         System.out.printf( "%s%n" , sendResult);
     }
     // 如果不再發送消息,關閉Producer實例。
     producer.shutdown();
}
 
/**
  * 異步發送消息
  * 異步傳輸通常用於對時間敏感的業務場景中。
  */
public static void AsyncProducer () throws Exception {
     // 實例化消息生產者Producer
     DefaultMQProducer producer = new DefaultMQProducer( "simple_producerGroup_test3" );
     // 設置NameServer的地址
     producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
     // 啟動Producer實例
     producer.start();
     producer.setRetryTimesWhenSendAsyncFailed( 0 );
 
     int messageCount = 100 ;
     // 根據消息數量實例化倒計時計算器
     final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
     for ( int i = 0 ; i < messageCount; i++) {
         final int index = i;
         // 創建消息,並指定Topic,Tag和消息體
         Message msg = new Message( "simple_topic_test3" ,
                 "TagA" ,
                 "OrderID188" ,
                 "Hello world" .getBytes(RemotingHelper.DEFAULT_CHARSET));
         // SendCallback接收異步返回結果的回調
         producer.send(msg, new SendCallback() {
             @Override
             public void onSuccess(SendResult sendResult) {
                 System.out.printf( "%-10d OK %s %n" , index,
                         sendResult.getMsgId());
             }
             @Override
             public void onException(Throwable e) {
                 System.out.printf( "%-10d Exception %s %n" , index, e);
                 e.printStackTrace();
             }
         });
     }
     // 等待5s
     countDownLatch.await( 5 , TimeUnit.SECONDS);
     // 如果不再發送消息,關閉Producer實例。
     producer.shutdown();
}
 
 

2,filter(過濾消息)

 

過濾消息有兩種方式:Tag 和 SQL 表達式

SQL 基本語法:

RocketMQ 只定義了一些基本語法來支持這個特性,支持擴展。

  • 數值比較,比如 >、>=、<、<=、BETWEEN、=;
  • 字符比較,比如 =、<>、IN;
  • IS NULL 或者 IS NOT NULL;
  • 邏輯符號 AND、OR、NOT。

常量支持的額類型為:

  • 數值,比如 123、3.1415;
  • 字符,比如 ‘abc’,必須用單引號包裹起來;
  • NULL,特殊的常量;
  • 布爾值,TRUE 或 FALSE。

只有使用 push 模式的消費者才能使用 SQL92 標准的 SQL 語句

注:消費者sql的使用需要rocketmq開啟broker配置項的enablePropertyFilter=true,

默認沒開啟,如需開啟使用前提前找對應開通人處理

生產者代碼:

         // 實例化消息生產者Producer
         DefaultMQProducer producer = new DefaultMQProducer( "filter_producerGroup_test1" );
         // 設置NameServer的地址
         producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
         // 啟動Producer實例
         producer.start();
         for ( int i= 0 ;i< 10 ;i++) {
             Message msg = new Message( "filter_topic_test1" ,
                     "tagA" ,
                     ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
             );
             // 設置一些屬性
             msg.putUserProperty( "a" , String.valueOf(i));
             SendResult sendResult = producer.send(msg);
//            System.out.println(sendResult.getMessageQueue());
             System.out.println(String.format( "SendResult status:%s,brokerName:%s, queueId:%d, queueOffset:%s" ,
                     sendResult.getSendStatus(),
                     sendResult.getMessageQueue().getBrokerName(),sendResult.getMessageQueue().getQueueId(),
                     sendResult.getQueueOffset()));
         }
 
         // 如果不再發送消息,關閉Producer實例。
         producer.shutdown();

消費者代碼:

// 實例化消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "filter_consumerGroup_test1" );
// 設置NameServer的地址
consumer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
// 訂閱一個或者多個Topic,以及Tag來過濾需要消費的消息
// 需要配置broker的enablePropertyFilter=true
consumer.subscribe( "filter_topic_test1" , MessageSelector.bySql( "a between 0 and 3" ));
// 注冊回調實現類來處理從broker拉取回來的消息
consumer.registerMessageListener( new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
         System.out.printf( "%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs);
         // 標記該消息已經被成功消費
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
});
// 啟動消費者實例
consumer.start();
System.out.printf( "Consumer Started.%n" );

 

3,batch(批量發送消息)

批量發送消息能顯著提高傳遞小消息的性能,限制是這些批量消息應該有相同的 Topic,相同的 waitStoreMsgOK,而且不能是延時消息,此外,這一批消息的總大小不應超過 4MB。

 

// 實例化消息生產者Producer
DefaultMQProducer producer = new DefaultMQProducer( "batch_producerGroup_test2" );
// 設置NameServer的地址
producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
// 啟動Producer實例
producer.start();
String topic = "batch_topic_test2" ;
//批量處理1--每次批處理不超過4M
List<Message> messages = new ArrayList<>();
messages.add( new Message(topic, "TagA" , "OrderID001" , "Hello world 0" .getBytes()));
messages.add( new Message(topic, "TagA" , "OrderID002" , "Hello world 1" .getBytes()));
messages.add( new Message(topic, "TagA" , "OrderID003" , "Hello world 2" .getBytes()));
//批量處理2
//把大的消息分裂成若干個小的消息
List<Message> messages = new ArrayList<>();
for ( int i= 0 ;i< 1000 ;i++){
     //如果每條消息是1kb字節數據 ,限制4M相當於400條數據處理一次,1000條需要處理三次
     String msg= "--10kb字節的數據--" ;
     messages.add( new Message(topic, "TagA" , msg, msg.getBytes()));
}
ListSplitter splitter = new ListSplitter(messages);
int j= 0 ;
while (splitter.hasNext()) {
     try {
         List<Message>  listItem = splitter.next();
         producer.send(listItem);
         System.out.println( "---------------------------" + "批處理第" +j+ "次" + "---------------------------" );
         j++;
     } catch (Exception e) {
         e.printStackTrace();
         //處理error
     }
}
try {
     producer.send(messages);
} catch (Exception e) {
     e.printStackTrace();
     //處理error
}
producer.shutdown();

消息分割工具類:

public class ListSplitter implements Iterator<List<Message>> {
     private final int SIZE_LIMIT = 4 * 1024 * 1024 //4M
     private final List<Message> messages;
     private int currIndex;
 
     public ListSplitter(List<Message> messages) {
         this .messages = messages;
     }
 
     @Override
     public boolean hasNext() {
         return currIndex < messages.size();
     }
 
     @Override
     public List<Message> next() {
         int startIndex = getStartIndex();
         int nextIndex = startIndex;
         int totalSize = 0 ;
         for (; nextIndex < messages.size(); nextIndex++) {
             Message message = messages.get(nextIndex);
             int tmpSize = calcMessageSize(message);
             if (tmpSize + totalSize > SIZE_LIMIT) {
                 break ;
             } else {
                 totalSize += tmpSize;
             }
         }
         List<Message> subList = messages.subList(startIndex, nextIndex);
         currIndex = nextIndex;
         return subList;
     }
 
     @Override
     public void remove() {
 
     }
 
     private int getStartIndex() {
         Message currMessage = messages.get(currIndex);
         int tmpSize = calcMessageSize(currMessage);
         if (tmpSize > SIZE_LIMIT){
             //發送的數據過大,SIZE_LIMIT需要設置合理,不能小於發送的消息里的單條數據
             return currIndex;
         }
         while (tmpSize > SIZE_LIMIT) {
             currIndex += 1 ;
             Message message = messages.get(currIndex);
             tmpSize = calcMessageSize(message);
         }
         return currIndex;
     }
 
     private int calcMessageSize(Message message) {
         int tmpSize = message.getTopic().length() + message.getBody().length;
         Map<String, String> properties = message.getProperties();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             tmpSize += entry.getKey().length() + entry.getValue().length();
         }
         tmpSize = tmpSize + 20 ; // 增加⽇日志的開銷20字節
         return tmpSize;
     }
}

 

4,order(有序消息)

 

訂單號相同的消息會被先后發送到同一個broker的同一個隊列中,保證順序性

生產代碼:

// 訂單流程
String[] msgs = new String[] { "創建消息" , "付款消息" , "推送消息" , "完成消息" };
for ( int i = 0 ; i < msgs.length; i++) {
     // 創建消息, 指定Topic、Tag、key和消息體
     Message msg = new Message( "OrderTopic" , "Order" , "i" + i, msgs[i].getBytes( "UTF-8" ));
     // 發送消息到一個Broker, 參數二: 消息隊列的選擇器, 參數三: 選擇的業務標識
     SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
         @Override
         public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
             long orderId = ( long ) arg;
             long index = orderId % mqs.size(); // 取模
             return mqs.get(( int ) index);
         }
     }, orderId); // 動態傳入訂單id
}

消費代碼:

// 訂閱Topic、Tag
  consumer.subscribe( "OrderTopic" , "*" );
// 設置回調函數, 處理消息
  consumer.registerMessageListener( new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
          for (MessageExt msg : msgs) {
              // 消息
              String message = new String(msg.getBody());
          }
          return ConsumeOrderlyStatus.SUCCESS;
      }
  });

 

5,scheduled(延時消息)

應用場景:

比如電商場景,提交了一個訂單就可以發送一個延時消息,1h 后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。

默認時延配置:

 

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";對應1~18,18個等級,1是1s,2是5s...,可通過配置修改時延的時長,可在申請開通時找對應開通人開通

生產者代碼:

// 實例化一個生產者來產生延時消息
DefaultMQProducer producer = new DefaultMQProducer( "scheduled_producerGroup_test1" );
producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
// 啟動生產者
producer.start();
int totalMessagesToSend = 100 ;
for ( int i = 0 ; i < totalMessagesToSend; i++) {
     Message message = new Message( "scheduled_topic_test1" , ( "Hello scheduled message " + i).getBytes());
     // 設置延時等級3,這個消息將在10s之后發送(現在只支持固定的幾個時間,詳看delayTimeLevel)
     message.setDelayTimeLevel( 3 );
     // 發送消息
     producer.send(message);
}
// 關閉生產者
producer.shutdown();

 

6,transaction(事務消息)

  1. 事務消息不支持延時消息和批量消息。
  2. 為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的 transactionCheckMax參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N = transactionCheckMax ) 則 Broker 將丟棄此消息,並在默認情況下同時打印錯誤日志。用戶可以通過重寫 AbstractTransactionalMessageCheckListener 類來修改這個行為。
  3. 事務消息將在 Broker 配置文件中的參數 transactionTimeout 這樣的特定時間長度之后被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先於 transactionTimeout 參數。
  4. 事務性消息可能不止一次被檢查或消費。
  5. 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、並且事務完整性得到保證,建議使用同步的雙重寫入機制。
  6. 事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。

生產者代碼:

// 使用 TransactionMQProducer類創建生產者,並指定唯一的 ProducerGroup,設置自定義線程池來處理這些檢查請求。
// 執行本地事務后、需要根據執行結果對消息隊列進行回復。
// 回傳的事務狀態:
// TransactionStatus.CommitTransaction:   提交事務,它允許消費者消費此消息。
// TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
// TransactionStatus.Unknown:             中間狀態,它代表需要檢查消息隊列來確定狀態。
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer( "transaction_producerGroup_test1" );
ExecutorService executorService = new ThreadPoolExecutor( 2 , 5 , 100 , TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( 2000 ), new ThreadFactory() {
     @Override
     public Thread newThread(Runnable r) {
         Thread thread = new Thread(r);
         thread.setName( "client-transaction-msg-check-thread" );
         return thread;
     }
});
producer.setExecutorService(executorService);
//
producer.setTransactionListener(transactionListener);
 
producer.setNamesrvAddr( "10.131.236.179:19300;10.131.236.180:19301" );
 
producer.start();
String[] tags = new String[] { "TagA" , "TagB" , "TagC" , "TagD" , "TagE" };
for ( int i = 0 ; i < 10 ; i++) {
     try {
         Message msg =
                 new Message( "transaction_topic_test1" , tags[i % tags.length], "KEY" + i,
                         ( "Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
         SendResult sendResult = producer.sendMessageInTransaction(msg, null );
         System.out.printf( "%s%n" , sendResult);
         Thread.sleep( 10 );
     } catch (MQClientException | UnsupportedEncodingException e) {
         e.printStackTrace();
     }
}
for ( int i = 0 ; i < 1000 ; i++) {
     Thread.sleep( 1000 );
}
producer.shutdown();

事務消息監聽處理邏輯 :

public class TransactionListenerImpl implements TransactionListener {
     // 當發送半消息成功時,我們使用 executeLocalTransaction 方法來執行本地事務。
     // 返回三個事務狀態之一。
     // checkLocalTransaction 方法用於檢查本地事務狀態,並回應消息隊列的檢查請求。
     // 也是返回三個事務狀態之一。
     private AtomicInteger transactionIndex = new AtomicInteger( 0 );
     private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
     @Override
     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
         System.out.printf( "#### executeLocalTransaction is executed, msgTransactionId=%s %n" ,
                 msg.getTransactionId());
 
         int value = transactionIndex.getAndIncrement();
         int status = value % 3 ;
         localTrans.put(msg.getTransactionId(), status);
         return LocalTransactionState.UNKNOW;
     }
     @Override
     public LocalTransactionState checkLocalTransaction(MessageExt msg) {
         Integer status = localTrans.get(msg.getTransactionId());
         if ( null != status) {
             switch (status) {
                 case 0 :
                     System.out.printf( "    # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n" , msg.getMsgId());
                     return LocalTransactionState.UNKNOW;
                 case 1 :
                     System.out.printf( "    # ROLLBACK # Simulating %s related local transaction exec failed! %n" , msg.getMsgId());
                     return LocalTransactionState.COMMIT_MESSAGE;
                 case 2 :
                     System.out.printf( "    # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n" );
                     return LocalTransactionState.ROLLBACK_MESSAGE;
             }
         }
         return LocalTransactionState.COMMIT_MESSAGE;
     }
}

 

7,開啟消費端的廣播模式(默認集群模式)

使用廣播模式有兩個條件:

1, 創建消費者組的時候開啟consumerBroadcastEnable=true

2, 代碼里加上

consumer.setMessageModel(MessageModel.BROADCASTING);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM