RabbitMQ系列(七)--批量消息和延時消息


批量消息發送模式

  批量消息是指把消息放到一個集合統一進行提交,這種方案設計思路是希望消息在一個會話里,比如放到ThreadLocal里的集合,擁有相同

的會話ID,帶有這次提交信息的size等屬性,最重要的是吧這一批消息進行合並。對於channel就是發送一次消息。這種方式也是希望消費端在消

費的時候,可以進行批量化的消費,針對一個原子業務的操作進行處理,但是不保證可靠性,需要進行補償機制。

圖例:

偽代碼思路:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class BatchMessage {
    private long sessionId;
    private List<String> messageHolder = new ArrayList<>();    //用來保存message的集合
    private int listSize;    //集合的條數
}

 

把message放到ThreadLocal里面使用,這些批量的消息需要同一個sessionId,如果要入庫,只是保存sessionId對應的消息集合,而不是每條消息

步驟:

  1、首先業務數據入庫

  2、將批量消息對應的BatchMessage入庫,狀態為發送中

  3、發送message到Broker

  4、返回confirm確認

  5、修改狀態為消費成功

  6、。。。。后面不講了,和之前博客思路一樣

延遲消息發送模式

使用場景:

  1、在電商平台買到的商品簽收后,不點擊確認支付,系統自動在一定時間進行支付操作

  2、自動超時作廢的場景,你的優惠券/紅包也有使用時限,也可以用延遲消息機制

實現:

  1、DLX和TTL:Consumer訂閱DLX,Message發送到原Queue,設置TTL為30分鍾。TTL到期,消息發送到DLX,然后被Consumer消費,就可以實現延遲隊列

  2、rabbitmq 3.5.7及以上的版本提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列功能

安裝、啟用插件

Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 30*60*1000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM