springcloud使用rocketmq


4.6、rocketmq

作用:

1、削峰填谷

2、功能解耦

3、順序執行

  • 4.6.1、生產消息

  • 4.6.2、消費消息

  • 4.6.3、場景使用說明

 

4.6.1、生產消息

4.6.1.1、同步生產消息

// SendResult syncSend("TOPIC:TAG", 數據對象),例如:
rocketMQTemplate.syncSend("Hsp_Topic_Manager_Order:Hsp_Tag_Manager_Order_Create", order);

 

4.6.1.2、異步生產消息

// void asyncSend("TOPIC:TAG", 數據對象, 回調函數),例如:
rocketMQTemplate.asyncSend("Hsp_Topic_Manager_Order:Hsp_Tag_Manager_Order_Create", order, new SendCallback() {

@Override
public void onSuccess(SendResult sendResult) {
log.info("消息發送成功:{}", sendResult);
}

@Override
public void onException(Throwable e) {
log.info("消息發送失敗", e);
}});

 

4.6.2、消費消息

注解解釋:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
  String consumerGroup();// 指定consumerGroup  

  String topic();// 指定消費的topic  

  SelectorType selectorType() default SelectorType.TAG; // 指定消費過濾方式: TAG, SQL92,默認TAG  

  String selectorExpress() default "*"; // 根據過濾方式,定義選擇表達式,默認所有

  ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; // 消費方式:並發,順序,默認並發

  MessageModel messageModel() default MessageModel.CLUSTERING; // 消費模式: 集群, 廣播,默認集群

  int consumeThreadMax() default 64; //消費的並發線程數

}

 

4.6.2.1、集群消費

@Component
@Slf4j
@RocketMQMessageListener(topic = "Hsp_Topic_Manager_Order", selectorExpression = "Hsp_Tag_Manager_Order_Create",
consumerGroup = "Hsp_Tag_Manager_Order_Create", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING)
public class ThreeConsumer implements RocketMQListener<MQQuery>{

@Override
public void onMessage(MQQuery message) {
log.info(message.getPayload());
}

}

// 指定為集群消費MessageModel.CLUSTERING,並發消費ConsumeMode.CONCURRENTLY,可以不指定,默認就是這樣。

 

 

4.6.2.2、廣播消費

@Component
@Slf4j
@RocketMQMessageListener(topic = "Hsp_Topic_Manager_Order", selectorExpression = "Hsp_Tag_Manager_Order_Create",
consumerGroup = "Hsp_Tag_Manager_Order_Create", consumeMode = ConsumeMode.CONCURRENTLY,messageModel = MessageModel.BROADCASTING)
public class BroadCastingTwoConsumer implements RocketMQListener<MQQuery>{

@Override
public void onMessage(MQQuery message) {
log.info("========廣播消費節點========");
log.info(message.toString());

}

}

 

4.6.2.3、順序消費

@Component
@Slf4j
@RocketMQMessageListener(topic = "Hsp_Topic_Manager_Order", selectorExpression = "Hsp_Tag_Manager_Order_Create",
consumerGroup = "Hsp_Tag_Manager_Order_Create", consumeMode = ConsumeMode.ORDERLY, messageModel = MessageModel.CLUSTERING)
public class ThreeConsumer implements RocketMQListener<MQQuery>{

@Override
public void onMessage(MQQuery message) {
log.info(message.getPayload());
}

}

 

4.6.3、場景使用說明

1、對於需要實時了解調用結果的情況使用同步發送。比如短信通知。(同步生產消息+集群消費)

2、對實時要求不高的接口使用異步發送,如日志記錄。(異步生產消息+集群消費)

3、對數據消費有順序要求時,采用順序消費,比如:訂單的多狀態變更 待支付-》支付中-》支付完成。(同步生產消息+順序消費)

4、對於需要廣播通知的消息,采用廣播消費,如修改了門店信息,需要通知到各個模塊檢查邏輯。(同步生產消息+廣播消費)

 

4.6.4、使用前提

  • 4.6.4.1、在業務模塊bootstrap.yml中添加rocketmq配置

  • 4.6.4.2、注入RocketMQTemplate即可使用

  • 4.6.4.3、group,topic,tag的定義

4.6.4.1、在業務模塊bootstrap.yml中添加rocketmq配置

 

 

hsp-rocketmq.yml配置如下

rocketmq:
name-server: http://localhost:9876
access-channel: CLOUD
producer:
  group: GID_HSP_LOG_GROUP
  send-message-timeout: 5000
  access-key: key
  secret-key: secret
consumer:
  access-key: key
  secret-key: secret

 

4.6.4.2、注入RocketMQTemplate即可使用

4.6.4.3、group,topic,tag的定義

group: 實現消息消費的負載均衡和消息容錯,主要用於集群環境下。

topic: 消息主題。比如訂單:

tag: 主題標簽

命名規則:

TOPIC: HSP_TOPIC_模塊_主題 如 HSP_TOPIC_MANAGER_ORDER

TAG: HSP_TAG_模塊_標簽 如 HSP_TAG_MANAGER_ORDER_CREATE, HSP_TAG_MANAGER_ORDER_UPDATE

GROUP: 建議與TAG名稱相同 如 HSP_TAG_MANAGER_ORDER_CREATE

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM