6-rocketmq-springboot整合


官方手冊

https://github.com/apache/rocketmq-spring/wiki/用戶手冊

引包

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

注意:

請將上述示例配置中的127.0.0.1:9876替換成真實RocketMQ的NameServer地址與端口

編寫代碼

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
      	//send message synchronously
        rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
      	//send spring message
        rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
        //send messgae asynchronously
      	rocketMQTemplate.asyncSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
            @Override
            public void onSuccess(SendResult var1) {
                System.out.printf("async onSucess SendResult=%s %n", var1);
            }

            @Override
            public void onException(Throwable var1) {
                System.out.printf("async onException Throwable=%s %n", var1);
            }

        });
      	//Send messages orderly
      	rocketMQTemplate.syncSendOrderly("orderly_topic",MessageBuilder.withPayload("Hello, World").build(),"hashkey")
        
        //rocketMQTemplate.destroy(); // notes:  once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
    }
    
    @Data
    @AllArgsConstructor
    public class OrderPaidEvent implements Serializable{
        private String orderId;
        
        private BigDecimal paidMoney;
    }
}

接收消息

rongtong edited this page on 25 Dec 2019 · 1 revision

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876

注意:

請將上述示例配置中的127.0.0.1:9876替換成真實RocketMQ的NameServer地址與端口

編寫代碼

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String>{
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }
    
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}

事務消息

rongtong edited this page on 25 May · 2 revisions

修改application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

注意:

請將上述示例配置中的127.0.0.1:9876替換成真實RocketMQ的NameServer地址與端口

編寫代碼

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }

    public void run(String... args) throws Exception {
        try {
            // Build a SpringMessage for sending in transaction
            Message msg = MessageBuilder.withPayload(..)...;
            // In sendMessageInTransaction(), the first parameter transaction name ("test")
            // must be same with the @RocketMQTransactionListener's member field 'transName'
            rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
        } catch (MQClientException e) {
            e.printStackTrace(System.out);
        }
    }

    // Define transaction listener with the annotation @RocketMQTransactionListener
    @RocketMQTransactionListener
    class TransactionListenerImpl implements RocketMQLocalTransactionListener {
          @Override
          public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            // ... local transaction process, return bollback, commit or unknown
            return RocketMQLocalTransactionState.UNKNOWN;
          }

          @Override
          public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            // ... check transaction status and return bollback, commit or unknown
            return RocketMQLocalTransactionState.COMMIT;
          }
    }
}

消息軌跡

rongtong edited this page on 25 Dec 2019 · 1 revision

Producer 端要想使用消息軌跡,需要多配置兩個配置項:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.enable-msg-trace=true
rocketmq.producer.customized-trace-topic=my-trace-topic

Consumer 端消息軌跡的功能需要在 @RocketMQMessageListener 中進行配置對應的屬性:

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    enableMsgTrace = true,
    customizedTraceTopic = "my-trace-topic"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

注意:

默認情況下 Producer 和 Consumer 的消息軌跡功能是開啟的且 trace-topic 為 RMQ_SYS_TRACE_TOPIC Consumer 端的消息軌跡 trace-topic 可以在配置文件中配置 rocketmq.consumer.customized-trace-topic 配置項,不需要為在每個 @RocketMQMessageListener 配置。

ACL功能

rongtong edited this page on 25 Dec 2019 · 1 revision

Producer 端要想使用 ACL 功能,需要多配置兩個配置項:

## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.producer.group=my-group

rocketmq.producer.access-key=AK
rocketmq.producer.secret-key=SK

Consumer 端 ACL 功能需要在 @RocketMQMessageListener 中進行配置

@Service
@RocketMQMessageListener(
    topic = "test-topic-1", 
    consumerGroup = "my-consumer_test-topic-1",
    accessKey = "AK",
    secretKey = "SK"
)
public class MyConsumer implements RocketMQListener<String> {
    ...
}

注意:

可以不用為每個 @RocketMQMessageListener 注解配置 AK/SK,在配置文件中配置 rocketmq.consumer.access-keyrocketmq.consumer.secret-key 配置項,這兩個配置項的值就是默認值

請求 應答語義支持

rongtong edited this page on 21 Feb · 2 revisions

RocketMQ-Spring 提供 請求/應答 語義支持。

  • Producer端

發送Request消息使用SendAndReceive方法

注意

同步發送需要在方法的參數中指明返回值類型

異步發送需要在回調的接口中指明返回值類型

@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    
    public static void main(String[] args){
        SpringApplication.run(ProducerApplication.class, args);
    }
    
    public void run(String... args) throws Exception {
        // 同步發送request並且等待String類型的返回值
        String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);
        System.out.printf("send %s and receive %s %n", "request string", replyString);

        // 異步發送request並且等待User類型的返回值
        rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() {
            @Override public void onSuccess(User message) {
                System.out.printf("send user object and receive %s %n", message.toString());
            }

            @Override public void onException(Throwable e) {
                e.printStackTrace();
            }
        }, 5000);
    }
    
    @Data
    @AllArgsConstructor
    public class User implements Serializable{
        private String userName;
    		private Byte userAge;
    }
}
  • Consumer端

需要實現RocketMQReplyListener<T, R> 接口,其中T表示接收值的類型,R表示返回值的類型。

@SpringBootApplication
public class ConsumerApplication{
    
    public static void main(String[] args){
        SpringApplication.run(ConsumerApplication.class, args);
    }
    
    @Service
    @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
    public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
        @Override
        public String onMessage(String message) {
          System.out.printf("------- StringConsumerWithReplyString received: %s \n", message);
          return "reply string";
        }
      }
   
    @Service
    @RocketMQMessageListener(topic = "objectRequestTopic", consumerGroup = "objectRequestConsumer")
    public class ObjectConsumerWithReplyUser implements RocketMQReplyListener<User, User>{
        public void onMessage(User user) {
          	System.out.printf("------- ObjectConsumerWithReplyUser received: %s \n", user);
          	User replyUser = new User("replyUserName",(byte) 10);	
          	return replyUser;
        }
    }

    @Data
    @AllArgsConstructor
    public class User implements Serializable{
        private String userName;
    		private Byte userAge;
    }
}

常見問題

rongtong edited this page on 25 Dec 2019 · 1 revision

  1. 生產環境有多個nameserver該如何連接?

    rocketmq.name-server支持配置多個nameserver地址,采用;分隔即可。例如:172.19.0.1:9876;172.19.0.2:9876

  2. rocketMQTemplate在什么時候被銷毀?

    開發者在項目中使用rocketMQTemplate發送消息時,不需要手動執行rocketMQTemplate.destroy()方法, rocketMQTemplate會在spring容器銷毀時自動銷毀。

  3. 啟動報錯:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

    RocketMQ在設計時就不希望一個消費者同時處理多個類型的消息,因此同一個consumerGroup下的consumer職責應該是一樣的,不要干不同的事情(即消費多個topic)。建議consumerGrouptopic一一對應。

  4. 發送的消息內容體是如何被序列化與反序列化的?

    RocketMQ的消息體都是以byte[]方式存儲。當業務系統的消息內容體如果是java.lang.String類型時,統一按照utf-8編碼轉成byte[];如果業務系統的消息內容為非java.lang.String類型,則采用jackson-databind序列化成JSON格式的字符串之后,再統一按照utf-8編碼轉成byte[]

  5. 如何指定topic的tags?

    RocketMQ的最佳實踐中推薦:一個應用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應用自由設置。 在使用rocketMQTemplate發送消息時,通過設置發送方法的destination參數來設置消息的目的地,destination的格式為topicName:tagName:前面表示topic的名稱,后面表示tags名稱。

    注意:

    tags從命名來看像是一個復數,但發送消息時,目的地只能指定一個topic下的一個tag,不能指定多個。

  6. 發送消息時如何設置消息的key?

    可以通過重載的xxxSend(String destination, Message<?> msg, ...)方法來發送消息,指定msgheaders來完成。示例:

    Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
    rocketMQTemplate.send("topic-test", message);
    

    同理還可以根據上面的方式來設置消息的FLAGWAIT_STORE_MSG_OK以及一些用戶自定義的其它頭信息。

    注意:

    在將Spring的Message轉化為RocketMQ的Message時,為防止header信息與RocketMQ的系統屬性沖突,在所有header的名稱前面都統一添加了前綴USERS_。因此在消費時如果想獲取自定義的消息頭信息,請遍歷頭信息中以USERS_開頭的key即可。

  7. 消費消息時,除了獲取消息payload外,還想獲取RocketMQ消息的其它系統屬性,需要怎么做?

    消費者在實現RocketMQListener接口時,只需要起泛型為MessageExt即可,這樣在onMessage方法將接收到RocketMQ原生的MessageExt消息。

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer2 implements RocketMQListener<MessageExt>{
        public void onMessage(MessageExt messageExt) {
            log.info("received messageExt: {}", messageExt);
        }
    }
    
  8. 如何指定消費者從哪開始消費消息,或開始消費的位置?

    消費者默認開始消費的位置請參考:RocketMQ FAQ。 若想自定義消費者開始的消費位置,只需在消費者類添加一個RocketMQPushConsumerLifecycleListener接口的實現即可。 示例如下:

    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    
        @Override
        public void prepareStart(final DefaultMQPushConsumer consumer) {
            // set consumer consume message from now
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            	  consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        }
    }
    

    同理,任何關於DefaultMQPushConsumer的更多其它其它配置,都可以采用上述方式來完成。

  9. 如何發送事務消息?

    在客戶端,首先用戶需要實現RocketMQLocalTransactionListener接口,並在接口類上注解聲明@RocketMQTransactionListener,實現確認和回查方法;然后再使用資源模板RocketMQTemplate, 調用方法sendMessageInTransaction()來進行消息的發布。 注意:從RocketMQ-Spring 2.1.0版本之后,注解@RocketMQTransactionListener不能設置txProducerGroup、ak、sk,這些值均與對應的RocketMQTemplate保持一致

  10. 如何聲明不同name-server或者其他特定的屬性來定義非標的RocketMQTemplate?

    第一步: 定義非標的RocketMQTemplate使用你需要的屬性,可以定義與標准的RocketMQTemplate不同的nameserver、groupname等。如果不定義,它們取全局的配置屬性值或默認值。

    // 這個RocketMQTemplate的Spring Bean名是'extRocketMQTemplate', 與所定義的類名相同(但首字母小寫)
    @ExtRocketMQTemplateConfiguration(nameServer="127.0.0.1:9876"
       , ... // 定義其他屬性,如果有必要。
    )
    public class ExtRocketMQTemplate extends RocketMQTemplate {
      //類里面不需要做任何修改
    }
    

    第二步: 使用這個非標RocketMQTemplate

    @Resource(name = "extRocketMQTemplate") // 這里必須定義name屬性來指向上述具體的Spring Bean.
    private RocketMQTemplate extRocketMQTemplate; 
    

    接下來就可以正常使用這個extRocketMQTemplate了。

  11. 如何使用非標的RocketMQTemplate發送事務消息?

    首先用戶需要實現RocketMQLocalTransactionListener接口,並在接口類上注解聲明@RocketMQTransactionListener,注解字段的rocketMQTemplateBeanName指明為非標的RocketMQTemplate的Bean name(若不設置則默認為標准的RocketMQTemplate),比如非標的RocketMQTemplate Bean name為“extRocketMQTemplate",則代碼如下:

    @RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
        class TransactionListenerImpl implements RocketMQLocalTransactionListener {
              @Override
              public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                // ... local transaction process, return bollback, commit or unknown
                return RocketMQLocalTransactionState.UNKNOWN;
              }
    
              @Override
              public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
                // ... check transaction status and return bollback, commit or unknown
                return RocketMQLocalTransactionState.COMMIT;
              }
        }
    

    然后使用extRocketMQTemplate調用sendMessageInTransaction()來發送事務消息。

  12. MessageListener消費端,是否可以指定不同的name-server而不是使用全局定義的'rocketmq.name-server'屬性值 ?

    @Service
    @RocketMQMessageListener(
       nameServer = "NEW-NAMESERVER-LIST", // 可以使用這個optional屬性來指定不同的name-server
       topic = "test-topic-1", 
       consumerGroup = "my-consumer_test-topic-1",
       enableMsgTrace = true,
       customizedTraceTopic = "my-trace-topic"
    )
    public class MyNameServerConsumer implements RocketMQListener<String> {
       ...
    }
    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM